On Sun, Jan 5, 2025 at 1:15 AM Andrey M. Borodin <x4...@yandex-team.ru> wrote:
>
> > On 4 Jan 2025, at 10:24, John Naylor <johncnaylo...@gmail.com> wrote:
> >
> > v6-0001:
> >
> > +static int
> > +unique_cmp(const void *a, const void *b)
> > +{
> > + int32 aval = *((const int32 *) a);
> > + int32 bval = *((const int32 *) b);
> > +
> > + return pg_cmp_s32(aval, bval);
> > +}
> >
> > I'm not sure it makes sense to create a whole new function for this,
> > when the same patch removed:
> >
> > -int
> > -compASC(const void *a, const void *b)
> > -{
> > - return pg_cmp_s32(*(const int32 *) a, *(const int32 *) b);
> > -}
> >
> > ...which is in effect the exact same thing.
> >
> > Otherwise seems close to committable.
>
> I thought about it, but decided to rename the routine.
> Here's a version 7 with compASC().

I had the right idea, but the wrong function -- HEAD already had a
suitable comp function, and one that works well with inlined
specialized sorts: isort_cmp(). We just need to remove the extra
argument. Like some other patches in this series, this does have the
side effect of removing the ability to skip qunique() when the sort
found no duplicates, so that should be benchmarked (I can do that this
week unless you beat me to it). We can specialize qunique too, as
mentioned upthread, but I'm not sure we need to.

> And, just in case, if we already have ASC, why not keep DESC too instead of
> a newly invented cmp function :) PFA v8.

Those functions from common/int.h are probably not good when inlined
(see comment there). If we really want to keep the branch for asc/desc
out of the comparison, it makes more sense to add a function to
reverse the elements after sorting. That allows using the same cmp
function for everything, thus removing the apparent need for a global
wrapper around the static sort function.

I've done both ideas in v9, which also tries to improve patch
legibility by keeping things in the same place they were before. It
also removes the internal "n > 1" checks that you mentioned earlier --
after thinking about it that seems better than adding braces to one
macro to keep it functional.

--
John Naylor
Amazon Web Services
From 6edd6da21e38f0301f0a3fdf773381124ad26831 Mon Sep 17 00:00:00 2001
From: "Andrey M. Borodin" <x4mmm@night.local>
Date: Sat, 18 May 2024 23:02:50 +0500
Subject: [PATCH v9] Use specialized sort facilities

---
 contrib/intarray/_int.h      | 20 +++++------
 contrib/intarray/_int_tool.c | 65 +++++++++++++++++-------------------
 2 files changed, 39 insertions(+), 46 deletions(-)

diff --git a/contrib/intarray/_int.h b/contrib/intarray/_int.h
index 0352cbd368..5dbf0f66f2 100644
--- a/contrib/intarray/_int.h
+++ b/contrib/intarray/_int.h
@@ -41,17 +41,15 @@ typedef struct
 #define SORT(x) \
 	do { \
 		int		_nelems_ = ARRNELEMS(x); \
-		if (_nelems_ > 1) \
-			isort(ARRPTR(x), _nelems_); \
+		isort(ARRPTR(x), _nelems_); \
 	} while(0)
 
 /* sort the elements of the array and remove duplicates */
 #define PREPAREARR(x) \
 	do { \
 		int		_nelems_ = ARRNELEMS(x); \
-		if (_nelems_ > 1) \
-			if (isort(ARRPTR(x), _nelems_)) \
-				(x) = _int_unique(x); \
+		isort(ARRPTR(x), _nelems_); \
+		(x) = _int_unique(x); \
 	} while(0)
 
 /* "wish" function */
@@ -109,12 +107,13 @@ typedef struct
 /*
  * useful functions
  */
-bool		isort(int32 *a, int len);
+void		isort(int32 *a, size_t len);
 ArrayType  *new_intArrayType(int num);
 ArrayType  *copy_intArrayType(ArrayType *a);
 ArrayType  *resize_intArrayType(ArrayType *a, int num);
 int			internal_size(int *a, int len);
 ArrayType  *_int_unique(ArrayType *r);
+void		_int_reverse(ArrayType *r);
 int32		intarray_match_first(ArrayType *a, int32 elem);
 ArrayType  *intarray_add_elem(ArrayType *a, int32 elem);
 ArrayType  *intarray_concat_arrays(ArrayType *a, ArrayType *b);
@@ -176,16 +175,13 @@ bool		execconsistent(QUERYTYPE *query, ArrayType *array, bool calcnot);
 bool		gin_bool_consistent(QUERYTYPE *query, bool *check);
 bool		query_has_required_values(QUERYTYPE *query);
 
-int			compASC(const void *a, const void *b);
-int			compDESC(const void *a, const void *b);
-
 /* sort, either ascending or descending */
 #define QSORT(a, direction) \
 	do { \
 		int		_nelems_ = ARRNELEMS(a); \
-		if (_nelems_ > 1) \
-			qsort((void*) ARRPTR(a), _nelems_, sizeof(int32), \
-				  (direction) ? compASC : compDESC ); \
+		isort(ARRPTR(a), _nelems_); \
+		if ((direction) == 0) \
+			_int_reverse(a); \
 	} while(0)
 
 #endif							/* ___INT_H__ */
diff --git a/contrib/intarray/_int_tool.c b/contrib/intarray/_int_tool.c
index c85280c842..8d3c728c42 100644
--- a/contrib/intarray/_int_tool.c
+++ b/contrib/intarray/_int_tool.c
@@ -186,9 +186,9 @@ rt__int_size(ArrayType *a, float *size)
 	*size = (float) ARRNELEMS(a);
 }
 
-/* qsort_arg comparison function for isort() */
-static int
-isort_cmp(const void *a, const void *b, void *arg)
+/* comparison function for isort() and _int_unique() */
+static inline int
+isort_cmp(const void *a, const void *b)
 {
 	int32		aval = *((const int32 *) a);
 	int32		bval = *((const int32 *) b);
@@ -197,25 +197,16 @@ isort_cmp(const void *a, const void *b, void *arg)
 		return -1;
 	if (aval > bval)
 		return 1;
-
-	/*
-	 * Report if we have any duplicates.  If there are equal keys, qsort must
-	 * compare them at some point, else it wouldn't know whether one should go
-	 * before or after the other.
-	 */
-	*((bool *) arg) = true;
 	return 0;
 }
 
-/* Sort the given data (len >= 2).  Return true if any duplicates found */
-bool
-isort(int32 *a, int len)
-{
-	bool		r = false;
-
-	qsort_arg(a, len, sizeof(int32), isort_cmp, &r);
-	return r;
-}
+#define ST_SORT isort
+#define ST_ELEMENT_TYPE int32
+#define ST_COMPARE(a, b) isort_cmp(a, b)
+#define ST_SCOPE
+#define ST_DECLARE
+#define ST_DEFINE
+#include "lib/sort_template.h"
 
 /* Create a new int array with room for "num" elements */
 ArrayType *
@@ -311,14 +302,32 @@ ArrayType *
 _int_unique(ArrayType *r)
 {
 	int			num = ARRNELEMS(r);
-	bool		duplicates_found;	/* not used */
 
-	num = qunique_arg(ARRPTR(r), num, sizeof(int), isort_cmp,
-					  &duplicates_found);
+	num = qunique(ARRPTR(r), num, sizeof(int), isort_cmp);
 
 	return resize_intArrayType(r, num);
 }
 
+/* reverse elements of r in place; no-op when r has fewer than two elements */
+void
+_int_reverse(ArrayType *r)
+{
+	int		   *a = ARRPTR(r);
+	int			i = 0;
+	int			j = ARRNELEMS(r) - 1;	/* signed: -1 if empty, so the loop is skipped */
+
+	while (i < j)
+	{
+		int			tmp;
+
+		tmp = a[i];
+		a[i] = a[j];
+		a[j] = tmp;
+		i++;
+		j--;
+	}
+}
+
 void
 gensign(BITVECP sign, int *a, int len, int siglen)
 {
@@ -393,15 +402,3 @@ int_to_intset(int32 elem)
 	aa[0] = elem;
 	return result;
 }
-
-int
-compASC(const void *a, const void *b)
-{
-	return pg_cmp_s32(*(const int32 *) a, *(const int32 *) b);
-}
-
-int
-compDESC(const void *a, const void *b)
-{
-	return pg_cmp_s32(*(const int32 *) b, *(const int32 *) a);
-}
-- 
2.47.1

Reply via email to