On Fri, Jul 30, 2021 at 7:11 PM Peter Geoghegan <p...@bowt.ie> wrote:
> If you're going to specialize the sort routine for unsigned integer
> style abbreviated keys then you might as well cover all relevant
> opclasses/types. Almost all abbreviated key schemes produce
> conditioned datums that are designed to use simple 3-way unsigned int
> comparator. It's not just text. (Actually, the only abbreviated key
> scheme that doesn't do it that way is numeric.)

Right, that was the plan, but this was just experimenting with an
idea.  Looks like John's also seeing evidence that it may be worth
pursuing.

(Re numeric, I guess it must be possible to rearrange things so it can
use ssup_datum_signed_cmp; maybe something like NaN -> INT64_MAX, +inf
-> INT64_MAX - 1, -inf -> INT64_MIN, and then -1 - (whatever we're
doing now for normal values).)

> Offhand I know that UUID, macaddr, and inet all have abbreviated keys
> that can use your new ssup_datum_binary_cmp() comparator instead of
> their own duplicated comparator (which will make them use the
> corresponding specialized sort routine inside tuplesort.c).

Thanks, I've added these ones, and also gist_bbox_zorder_cmp_abbrev.

I also renamed that function to ssup_datum_unsigned_cmp(), because
"binary" was misleading.
From a1ca8e9ca989de5f61f1b8f067cb9785034ce9cc Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 3 Jul 2021 19:02:10 +1200
Subject: [PATCH v3] WIP: Accelerate tuple sorting for common types.

Several data types have their own fast comparator functions that can be
replaced with common binary or signed comparator functions.  These
functions can then be recognized by tuplesort.c and used to dispatch to
corresponding specialized sort functions, to accelerate sorting.

XXX WIP, experiment grade only, many open questions... such as:

XXX Can we avoid repeating the null-handling logic?
XXX Is it worth specializing for reverse sort?
XXX Is it worth specializing for nulls first, nulls last, not null?
XXX Should we have separate cases for "result is authoritative", "need
XXX tiebreaker for atts 1..n (= abbrev case)", "need tie breaker for
XXX atts 2..n"?
XXX If we did all of the above, that'd give:
XXX  {nulls_first, nulls_last, not_null} x
XXX  {asc, desc} x
XXX  {tiebreak_none, tiebreak_first, tiebreak_rest} x
XXX  {unsigned, signed, int32}
XXX  = 54 specializations!  Seems a little over the top.
XXX You could use a smaller SortTuple for some cases.
XXX How should we handle numeric?
---
 src/backend/access/gist/gistproc.c     |  18 +--
 src/backend/access/nbtree/nbtcompare.c |  22 ++--
 src/backend/utils/adt/date.c           |  15 +--
 src/backend/utils/adt/mac.c            |  23 +---
 src/backend/utils/adt/network.c        |  17 +--
 src/backend/utils/adt/timestamp.c      |  11 ++
 src/backend/utils/adt/uuid.c           |  25 +---
 src/backend/utils/adt/varlena.c        |  34 +-----
 src/backend/utils/sort/tuplesort.c     | 160 ++++++++++++++++++++++++-
 src/include/utils/sortsupport.h        | 115 ++++++++++++++++++
 10 files changed, 308 insertions(+), 132 deletions(-)

diff --git a/src/backend/access/gist/gistproc.c b/src/backend/access/gist/gistproc.c
index d474612b77..20135ea0ec 100644
--- a/src/backend/access/gist/gistproc.c
+++ b/src/backend/access/gist/gistproc.c
@@ -37,7 +37,6 @@ static uint64 part_bits32_by2(uint32 x);
 static uint32 ieee_float32_to_uint32(float f);
 static int	gist_bbox_zorder_cmp(Datum a, Datum b, SortSupport ssup);
 static Datum gist_bbox_zorder_abbrev_convert(Datum original, SortSupport ssup);
-static int	gist_bbox_zorder_cmp_abbrev(Datum z1, Datum z2, SortSupport ssup);
 static bool gist_bbox_zorder_abbrev_abort(int memtupcount, SortSupport ssup);
 
 
@@ -1726,21 +1725,6 @@ gist_bbox_zorder_abbrev_convert(Datum original, SortSupport ssup)
 #endif
 }
 
-static int
-gist_bbox_zorder_cmp_abbrev(Datum z1, Datum z2, SortSupport ssup)
-{
-	/*
-	 * Compare the pre-computed Z-orders as unsigned integers. Datum is a
-	 * typedef for 'uintptr_t', so no casting is required.
-	 */
-	if (z1 > z2)
-		return 1;
-	else if (z1 < z2)
-		return -1;
-	else
-		return 0;
-}
-
 /*
  * We never consider aborting the abbreviation.
  *
@@ -1764,7 +1748,7 @@ gist_point_sortsupport(PG_FUNCTION_ARGS)
 
 	if (ssup->abbreviate)
 	{
-		ssup->comparator = gist_bbox_zorder_cmp_abbrev;
+		ssup->comparator = ssup_datum_unsigned_cmp;
 		ssup->abbrev_converter = gist_bbox_zorder_abbrev_convert;
 		ssup->abbrev_abort = gist_bbox_zorder_abbrev_abort;
 		ssup->abbrev_full_comparator = gist_bbox_zorder_cmp;
diff --git a/src/backend/access/nbtree/nbtcompare.c b/src/backend/access/nbtree/nbtcompare.c
index 7ac73cb8c2..204cf778fb 100644
--- a/src/backend/access/nbtree/nbtcompare.c
+++ b/src/backend/access/nbtree/nbtcompare.c
@@ -119,26 +119,12 @@ btint4cmp(PG_FUNCTION_ARGS)
 		PG_RETURN_INT32(A_LESS_THAN_B);
 }
 
-static int
-btint4fastcmp(Datum x, Datum y, SortSupport ssup)
-{
-	int32		a = DatumGetInt32(x);
-	int32		b = DatumGetInt32(y);
-
-	if (a > b)
-		return A_GREATER_THAN_B;
-	else if (a == b)
-		return 0;
-	else
-		return A_LESS_THAN_B;
-}
-
 Datum
 btint4sortsupport(PG_FUNCTION_ARGS)
 {
 	SortSupport ssup = (SortSupport) PG_GETARG_POINTER(0);
 
-	ssup->comparator = btint4fastcmp;
+	ssup->comparator = ssup_datum_int32_cmp;
 	PG_RETURN_VOID();
 }
 
@@ -156,6 +142,7 @@ btint8cmp(PG_FUNCTION_ARGS)
 		PG_RETURN_INT32(A_LESS_THAN_B);
 }
 
+#ifndef USE_FLOAT8_BYVAL
 static int
 btint8fastcmp(Datum x, Datum y, SortSupport ssup)
 {
@@ -169,13 +156,18 @@ btint8fastcmp(Datum x, Datum y, SortSupport ssup)
 	else
 		return A_LESS_THAN_B;
 }
+#endif
 
 Datum
 btint8sortsupport(PG_FUNCTION_ARGS)
 {
 	SortSupport ssup = (SortSupport) PG_GETARG_POINTER(0);
 
+#ifdef USE_FLOAT8_BYVAL
+	ssup->comparator = ssup_datum_signed_cmp;
+#else
 	ssup->comparator = btint8fastcmp;
+#endif
 	PG_RETURN_VOID();
 }
 
diff --git a/src/backend/utils/adt/date.c b/src/backend/utils/adt/date.c
index 0bea16cb67..350c662d50 100644
--- a/src/backend/utils/adt/date.c
+++ b/src/backend/utils/adt/date.c
@@ -438,25 +438,12 @@ date_cmp(PG_FUNCTION_ARGS)
 	PG_RETURN_INT32(0);
 }
 
-static int
-date_fastcmp(Datum x, Datum y, SortSupport ssup)
-{
-	DateADT		a = DatumGetDateADT(x);
-	DateADT		b = DatumGetDateADT(y);
-
-	if (a < b)
-		return -1;
-	else if (a > b)
-		return 1;
-	return 0;
-}
-
 Datum
 date_sortsupport(PG_FUNCTION_ARGS)
 {
 	SortSupport ssup = (SortSupport) PG_GETARG_POINTER(0);
 
-	ssup->comparator = date_fastcmp;
+	ssup->comparator = ssup_datum_int32_cmp;
 	PG_RETURN_VOID();
 }
 
diff --git a/src/backend/utils/adt/mac.c b/src/backend/utils/adt/mac.c
index 844d8814e6..9016c6bf94 100644
--- a/src/backend/utils/adt/mac.c
+++ b/src/backend/utils/adt/mac.c
@@ -44,7 +44,6 @@ typedef struct
 
 static int	macaddr_cmp_internal(macaddr *a1, macaddr *a2);
 static int	macaddr_fast_cmp(Datum x, Datum y, SortSupport ssup);
-static int	macaddr_cmp_abbrev(Datum x, Datum y, SortSupport ssup);
 static bool macaddr_abbrev_abort(int memtupcount, SortSupport ssup);
 static Datum macaddr_abbrev_convert(Datum original, SortSupport ssup);
 
@@ -381,7 +380,7 @@ macaddr_sortsupport(PG_FUNCTION_ARGS)
 
 		ssup->ssup_extra = uss;
 
-		ssup->comparator = macaddr_cmp_abbrev;
+		ssup->comparator = ssup_datum_unsigned_cmp;
 		ssup->abbrev_converter = macaddr_abbrev_convert;
 		ssup->abbrev_abort = macaddr_abbrev_abort;
 		ssup->abbrev_full_comparator = macaddr_fast_cmp;
@@ -405,22 +404,6 @@ macaddr_fast_cmp(Datum x, Datum y, SortSupport ssup)
 	return macaddr_cmp_internal(arg1, arg2);
 }
 
-/*
- * SortSupport abbreviated key comparison function. Compares two MAC addresses
- * quickly by treating them like integers, and without having to go to the
- * heap.
- */
-static int
-macaddr_cmp_abbrev(Datum x, Datum y, SortSupport ssup)
-{
-	if (x > y)
-		return 1;
-	else if (x == y)
-		return 0;
-	else
-		return -1;
-}
-
 /*
  * Callback for estimating effectiveness of abbreviated key optimization.
  *
@@ -537,8 +520,8 @@ macaddr_abbrev_convert(Datum original, SortSupport ssup)
 	/*
 	 * Byteswap on little-endian machines.
 	 *
-	 * This is needed so that macaddr_cmp_abbrev() (an unsigned integer 3-way
-	 * comparator) works correctly on all platforms. Without this, the
+	 * This is needed so that ssup_datum_unsigned_cmp() (an unsigned integer
+	 * 3-way comparator) works correctly on all platforms. Without this, the
 	 * comparator would have to call memcmp() with a pair of pointers to the
 	 * first byte of each abbreviated key, which is slower.
 	 */
diff --git a/src/backend/utils/adt/network.c b/src/backend/utils/adt/network.c
index 0ab54316f8..ea1c7390d0 100644
--- a/src/backend/utils/adt/network.c
+++ b/src/backend/utils/adt/network.c
@@ -53,7 +53,6 @@ typedef struct
 
 static int32 network_cmp_internal(inet *a1, inet *a2);
 static int	network_fast_cmp(Datum x, Datum y, SortSupport ssup);
-static int	network_cmp_abbrev(Datum x, Datum y, SortSupport ssup);
 static bool network_abbrev_abort(int memtupcount, SortSupport ssup);
 static Datum network_abbrev_convert(Datum original, SortSupport ssup);
 static List *match_network_function(Node *leftop,
@@ -456,7 +455,7 @@ network_sortsupport(PG_FUNCTION_ARGS)
 
 		ssup->ssup_extra = uss;
 
-		ssup->comparator = network_cmp_abbrev;
+		ssup->comparator = ssup_datum_unsigned_cmp;
 		ssup->abbrev_converter = network_abbrev_convert;
 		ssup->abbrev_abort = network_abbrev_abort;
 		ssup->abbrev_full_comparator = network_fast_cmp;
@@ -479,20 +478,6 @@ network_fast_cmp(Datum x, Datum y, SortSupport ssup)
 	return network_cmp_internal(arg1, arg2);
 }
 
-/*
- * Abbreviated key comparison func
- */
-static int
-network_cmp_abbrev(Datum x, Datum y, SortSupport ssup)
-{
-	if (x > y)
-		return 1;
-	else if (x == y)
-		return 0;
-	else
-		return -1;
-}
-
 /*
  * Callback for estimating effectiveness of abbreviated key optimization.
  *
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index 1c0bf0aa5c..410b39b44e 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -37,6 +37,7 @@
 #include "utils/datetime.h"
 #include "utils/float.h"
 #include "utils/numeric.h"
+#include "utils/sortsupport.h"
 
 /*
  * gcc's -ffast-math switch breaks routines that expect exact results from
@@ -2155,6 +2156,7 @@ timestamp_cmp(PG_FUNCTION_ARGS)
 	PG_RETURN_INT32(timestamp_cmp_internal(dt1, dt2));
 }
 
+#ifndef USE_FLOAT8_BYVAL
 /* note: this is used for timestamptz also */
 static int
 timestamp_fastcmp(Datum x, Datum y, SortSupport ssup)
@@ -2164,13 +2166,22 @@ timestamp_fastcmp(Datum x, Datum y, SortSupport ssup)
 
 	return timestamp_cmp_internal(a, b);
 }
+#endif
 
 Datum
 timestamp_sortsupport(PG_FUNCTION_ARGS)
 {
 	SortSupport ssup = (SortSupport) PG_GETARG_POINTER(0);
 
+#ifdef USE_FLOAT8_BYVAL
+	/*
+	 * If this build has pass-by-value timestamps, then we can use a standard
+	 * comparator function.
+	 */
+	ssup->comparator = ssup_datum_signed_cmp;
+#else
 	ssup->comparator = timestamp_fastcmp;
+#endif
 	PG_RETURN_VOID();
 }
 
diff --git a/src/backend/utils/adt/uuid.c b/src/backend/utils/adt/uuid.c
index b02c9fcf98..eaf0a6bbdf 100644
--- a/src/backend/utils/adt/uuid.c
+++ b/src/backend/utils/adt/uuid.c
@@ -34,7 +34,6 @@ typedef struct
 static void string_to_uuid(const char *source, pg_uuid_t *uuid);
 static int	uuid_internal_cmp(const pg_uuid_t *arg1, const pg_uuid_t *arg2);
 static int	uuid_fast_cmp(Datum x, Datum y, SortSupport ssup);
-static int	uuid_cmp_abbrev(Datum x, Datum y, SortSupport ssup);
 static bool uuid_abbrev_abort(int memtupcount, SortSupport ssup);
 static Datum uuid_abbrev_convert(Datum original, SortSupport ssup);
 
@@ -255,7 +254,7 @@ uuid_sortsupport(PG_FUNCTION_ARGS)
 
 		ssup->ssup_extra = uss;
 
-		ssup->comparator = uuid_cmp_abbrev;
+		ssup->comparator = ssup_datum_unsigned_cmp;
 		ssup->abbrev_converter = uuid_abbrev_convert;
 		ssup->abbrev_abort = uuid_abbrev_abort;
 		ssup->abbrev_full_comparator = uuid_fast_cmp;
@@ -278,20 +277,6 @@ uuid_fast_cmp(Datum x, Datum y, SortSupport ssup)
 	return uuid_internal_cmp(arg1, arg2);
 }
 
-/*
- * Abbreviated key comparison func
- */
-static int
-uuid_cmp_abbrev(Datum x, Datum y, SortSupport ssup)
-{
-	if (x > y)
-		return 1;
-	else if (x == y)
-		return 0;
-	else
-		return -1;
-}
-
 /*
  * Callback for estimating effectiveness of abbreviated key optimization.
  *
@@ -390,10 +375,10 @@ uuid_abbrev_convert(Datum original, SortSupport ssup)
 	/*
 	 * Byteswap on little-endian machines.
 	 *
-	 * This is needed so that uuid_cmp_abbrev() (an unsigned integer 3-way
-	 * comparator) works correctly on all platforms.  If we didn't do this,
-	 * the comparator would have to call memcmp() with a pair of pointers to
-	 * the first byte of each abbreviated key, which is slower.
+	 * This is needed so that ssup_datum_unsigned_cmp() (an unsigned integer
+	 * 3-way comparator) works correctly on all platforms.  If we didn't do
+	 * this, the comparator would have to call memcmp() with a pair of pointers
+	 * to the first byte of each abbreviated key, which is slower.
 	 */
 	res = DatumBigEndianToNative(res);
 
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index d2a11b1b5d..0a91de6992 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -127,7 +127,6 @@ static int	namefastcmp_c(Datum x, Datum y, SortSupport ssup);
 static int	varlenafastcmp_locale(Datum x, Datum y, SortSupport ssup);
 static int	namefastcmp_locale(Datum x, Datum y, SortSupport ssup);
 static int	varstrfastcmp_locale(char *a1p, int len1, char *a2p, int len2, SortSupport ssup);
-static int	varstrcmp_abbrev(Datum x, Datum y, SortSupport ssup);
 static Datum varstr_abbrev_convert(Datum original, SortSupport ssup);
 static bool varstr_abbrev_abort(int memtupcount, SortSupport ssup);
 static int32 text_length(Datum str);
@@ -2175,7 +2174,7 @@ varstr_sortsupport(SortSupport ssup, Oid typid, Oid collid)
 			initHyperLogLog(&sss->abbr_card, 10);
 			initHyperLogLog(&sss->full_card, 10);
 			ssup->abbrev_full_comparator = ssup->comparator;
-			ssup->comparator = varstrcmp_abbrev;
+			ssup->comparator = ssup_datum_unsigned_cmp;
 			ssup->abbrev_converter = varstr_abbrev_convert;
 			ssup->abbrev_abort = varstr_abbrev_abort;
 		}
@@ -2461,27 +2460,6 @@ varstrfastcmp_locale(char *a1p, int len1, char *a2p, int len2, SortSupport ssup)
 	return result;
 }
 
-/*
- * Abbreviated key comparison func
- */
-static int
-varstrcmp_abbrev(Datum x, Datum y, SortSupport ssup)
-{
-	/*
-	 * When 0 is returned, the core system will call varstrfastcmp_c()
-	 * (bpcharfastcmp_c() in BpChar case) or varlenafastcmp_locale().  Even a
-	 * strcmp() on two non-truncated strxfrm() blobs cannot indicate *equality*
-	 * authoritatively, for the same reason that there is a strcoll()
-	 * tie-breaker call to strcmp() in varstr_cmp().
-	 */
-	if (x > y)
-		return 1;
-	else if (x == y)
-		return 0;
-	else
-		return -1;
-}
-
 /*
  * Conversion routine for sortsupport.  Converts original to abbreviated key
  * representation.  Our encoding strategy is simple -- pack the first 8 bytes
@@ -2520,7 +2498,7 @@ varstr_abbrev_convert(Datum original, SortSupport ssup)
 	 * strings may contain NUL bytes.  Besides, this should be faster, too.
 	 *
 	 * More generally, it's okay that bytea callers can have NUL bytes in
-	 * strings because varstrcmp_abbrev() need not make a distinction between
+	 * strings because abbreviated cmp need not make a distinction between
 	 * terminating NUL bytes, and NUL bytes representing actual NULs in the
 	 * authoritative representation.  Hopefully a comparison at or past one
 	 * abbreviated key's terminating NUL byte will resolve the comparison
@@ -2710,10 +2688,10 @@ done:
 	/*
 	 * Byteswap on little-endian machines.
 	 *
-	 * This is needed so that varstrcmp_abbrev() (an unsigned integer 3-way
-	 * comparator) works correctly on all platforms.  If we didn't do this,
-	 * the comparator would have to call memcmp() with a pair of pointers to
-	 * the first byte of each abbreviated key, which is slower.
+	 * This is needed so that ssup_datum_unsigned_cmp() (an unsigned integer
+	 * 3-way comparator) works correctly on all platforms.  If we didn't do
+	 * this, the comparator would have to call memcmp() with a pair of pointers
+	 * to the first byte of each abbreviated key, which is slower.
 	 */
 	res = DatumBigEndianToNative(res);
 
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index b17347b214..32cd072c82 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -669,14 +669,101 @@ static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
 static void tuplesort_free(Tuplesortstate *state);
 static void tuplesort_updatemax(Tuplesortstate *state);
 
+/*
+ * Specialized comparators that we can inline into specialized sorts.  The goal
+ * is to try to sort two tuples without having to follow the pointers to the
+ * comparator or the tuple.
+ *
+ * XXX: For now, these fall back to comparator functions that will compare the
+ * leading datum a second time.
+ *
+ * XXX: For now, there is no specialization for cases where datum1 is
+ * authoritative and we don't even need to fall back to a callback at all (that
+ * would be true for types like int4/int8/timestamp/date, but not true for
+ * abbreviations of text or multi-key sorts.  There could be!  Is it worth it?
+ */
+
+/* Used if first key's comparator is ssup_datum_unsigned_compare */
+static pg_attribute_always_inline int
+qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
+{
+	int			compare;
+
+	compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
+										  b->datum1, b->isnull1,
+										  &state->sortKeys[0]);
+	if (compare != 0)
+		return compare;
+
+	return state->comparetup(a, b, state);
+}
+
+/* Used if first key's comparator is ssup_datum_signed_compare */
+static pg_attribute_always_inline int
+qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
+{
+	int			compare;
+
+	compare = ApplySignedSortComparator(a->datum1, a->isnull1,
+										b->datum1, b->isnull1,
+										&state->sortKeys[0]);
+	if (compare != 0)
+		return compare;
+
+	return state->comparetup(a, b, state);
+}
+
+/* Used if first key's comparator is ssup_datum_int32_compare */
+static pg_attribute_always_inline int
+qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
+{
+	int			compare;
+
+	compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
+										b->datum1, b->isnull1,
+										&state->sortKeys[0]);
+	if (compare != 0)
+		return compare;
+
+	return state->comparetup(a, b, state);
+}
+
 /*
  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
  * any variant of SortTuples, using the appropriate comparetup function.
  * qsort_ssup() is specialized for the case where the comparetup function
  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
- * and Datum sorts.
+ * and Datum sorts.  qsort_tuple_{unsigned,signed,int32} are specialized for
+ * common comparison functions on pass-by-value leading datums.
  */
 
+#define ST_SORT qsort_tuple_unsigned
+#define ST_ELEMENT_TYPE SortTuple
+#define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
+#define ST_COMPARE_ARG_TYPE Tuplesortstate
+#define ST_CHECK_FOR_INTERRUPTS
+#define ST_SCOPE static
+#define ST_DEFINE
+#include "lib/sort_template.h"
+
+#define ST_SORT qsort_tuple_signed
+#define ST_ELEMENT_TYPE SortTuple
+#define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
+#define ST_COMPARE_ARG_TYPE Tuplesortstate
+#define ST_CHECK_FOR_INTERRUPTS
+#define ST_SCOPE static
+#define ST_DEFINE
+#include "lib/sort_template.h"
+
+#define ST_SORT qsort_tuple_int32
+#define ST_ELEMENT_TYPE SortTuple
+#define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
+#define ST_COMPARE_ARG_TYPE Tuplesortstate
+#define ST_CHECK_FOR_INTERRUPTS
+#define ST_SCOPE static
+#define ST_DEFINE
+#include "lib/sort_template.h"
+
 #define ST_SORT qsort_tuple
 #define ST_ELEMENT_TYPE SortTuple
 #define ST_COMPARE_RUNTIME_POINTER
@@ -3558,15 +3645,40 @@ tuplesort_sort_memtuples(Tuplesortstate *state)
 
 	if (state->memtupcount > 1)
 	{
+		/* Do we have a specialization for the leading column's comparator? */
+		if (state->sortKeys &&
+			state->sortKeys[0].comparator == ssup_datum_unsigned_cmp)
+		{
+			elog(DEBUG1, "qsort_tuple_unsigned");
+			qsort_tuple_unsigned(state->memtuples, state->memtupcount, state);
+		}
+		else if (state->sortKeys &&
+				 state->sortKeys[0].comparator == ssup_datum_signed_cmp)
+		{
+			elog(DEBUG1, "qsort_tuple_signed");
+			qsort_tuple_signed(state->memtuples, state->memtupcount, state);
+		}
+		else if (state->sortKeys &&
+				 state->sortKeys[0].comparator == ssup_datum_int32_cmp)
+		{
+			elog(DEBUG1, "qsort_tuple_int32");
+			qsort_tuple_int32(state->memtuples, state->memtupcount, state);
+		}
 		/* Can we use the single-key sort function? */
-		if (state->onlyKey != NULL)
+		else if (state->onlyKey != NULL)
+		{
+			elog(DEBUG1, "qsort_ssup");
 			qsort_ssup(state->memtuples, state->memtupcount,
 					   state->onlyKey);
+		}
 		else
+		{
+			elog(DEBUG1, "qsort_tuple");
 			qsort_tuple(state->memtuples,
 						state->memtupcount,
 						state->comparetup,
 						state);
+		}
 	}
 }
 
@@ -4780,3 +4892,47 @@ free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
 		stup->tuple = NULL;
 	}
 }
+
+int
+ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
+{
+	if (x < y)
+		return -1;
+	else if (x > y)
+		return 1;
+	else
+		return 0;
+}
+
+int
+ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
+{
+#if SIZEOF_DATUM == 8
+	int64		xx = (int64) x;
+	int64		yy = (int64) y;
+#else
+	int32		xx = (int32) x;
+	int32		yy = (int32) y;
+#endif
+
+	if (xx < yy)
+		return -1;
+	else if (xx > yy)
+		return 1;
+	else
+		return 0;
+}
+
+int
+ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
+{
+	int32		xx = (int32) x;
+	int32		yy = (int32) y;
+
+	if (xx < yy)
+		return -1;
+	else if (xx > yy)
+		return 1;
+	else
+		return 0;
+}
diff --git a/src/include/utils/sortsupport.h b/src/include/utils/sortsupport.h
index 2f12a8b8eb..99eac089b3 100644
--- a/src/include/utils/sortsupport.h
+++ b/src/include/utils/sortsupport.h
@@ -229,6 +229,112 @@ ApplySortComparator(Datum datum1, bool isNull1,
 	return compare;
 }
 
+static inline int
+ApplyUnsignedSortComparator(Datum datum1, bool isNull1,
+							Datum datum2, bool isNull2,
+							SortSupport ssup)
+{
+	int			compare;
+
+	if (isNull1)
+	{
+		if (isNull2)
+			compare = 0;		/* NULL "=" NULL */
+		else if (ssup->ssup_nulls_first)
+			compare = -1;		/* NULL "<" NOT_NULL */
+		else
+			compare = 1;		/* NULL ">" NOT_NULL */
+	}
+	else if (isNull2)
+	{
+		if (ssup->ssup_nulls_first)
+			compare = 1;		/* NOT_NULL ">" NULL */
+		else
+			compare = -1;		/* NOT_NULL "<" NULL */
+	}
+	else
+	{
+		compare = datum1 < datum2 ? -1 : datum1 > datum2 ? 1 : 0;
+		if (ssup->ssup_reverse)
+			INVERT_COMPARE_RESULT(compare);
+	}
+
+	return compare;
+}
+
+static inline int
+ApplySignedSortComparator(Datum datum1, bool isNull1,
+						  Datum datum2, bool isNull2,
+						  SortSupport ssup)
+{
+	int			compare;
+
+	if (isNull1)
+	{
+		if (isNull2)
+			compare = 0;		/* NULL "=" NULL */
+		else if (ssup->ssup_nulls_first)
+			compare = -1;		/* NULL "<" NOT_NULL */
+		else
+			compare = 1;		/* NULL ">" NOT_NULL */
+	}
+	else if (isNull2)
+	{
+		if (ssup->ssup_nulls_first)
+			compare = 1;		/* NOT_NULL ">" NULL */
+		else
+			compare = -1;		/* NOT_NULL "<" NULL */
+	}
+	else
+	{
+#if SIZEOF_DATUM == 8
+		compare = (int64) datum1 < (int64) datum2 ? -1 :
+			(int64) datum1 > (int64) datum2 ? 1 : 0;
+#else
+		compare = (int32) datum1 < (int32) datum2 ? -1 :
+			(int32) datum1 > (int32) datum2 ? 1 : 0;
+#endif
+		if (ssup->ssup_reverse)
+			INVERT_COMPARE_RESULT(compare);
+	}
+
+	return compare;
+}
+
+static inline int
+ApplyInt32SortComparator(Datum datum1, bool isNull1,
+						 Datum datum2, bool isNull2,
+						 SortSupport ssup)
+{
+	int			compare;
+
+	if (isNull1)
+	{
+		if (isNull2)
+			compare = 0;		/* NULL "=" NULL */
+		else if (ssup->ssup_nulls_first)
+			compare = -1;		/* NULL "<" NOT_NULL */
+		else
+			compare = 1;		/* NULL ">" NOT_NULL */
+	}
+	else if (isNull2)
+	{
+		if (ssup->ssup_nulls_first)
+			compare = 1;		/* NOT_NULL ">" NULL */
+		else
+			compare = -1;		/* NOT_NULL "<" NULL */
+	}
+	else
+	{
+		compare = (int32) datum1 < (int32) datum2 ? -1 :
+			(int32) datum1 > (int32) datum2 ? 1 : 0;
+		if (ssup->ssup_reverse)
+			INVERT_COMPARE_RESULT(compare);
+	}
+
+	return compare;
+}
+
 /*
  * Apply a sort comparator function and return a 3-way comparison using full,
  * authoritative comparator.  This takes care of handling reverse-sort and
@@ -267,6 +373,15 @@ ApplySortAbbrevFullComparator(Datum datum1, bool isNull1,
 	return compare;
 }
 
+/*
+ * Datum comparison functions that we have specialized sort routines for.
+ * Datatypes that install these as their comparator or abbrevated comparator
+ * are eligible for faster sorting.
+ */
+extern int ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup);
+extern int ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup);
+extern int ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup);
+
 /* Other functions in utils/sort/sortsupport.c */
 extern void PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup);
 extern void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup);
-- 
2.30.2

Reply via email to