Folks,

Please find attached a patch for $Subject.

Motivation:

When people are doing keyset pagination, the simple cases redound to
adding a WHERE that looks like

    (a, b, c) > (most_recent_a, most_recent_b, most_recent_c)

which corresponds to an ORDER BY clause that looks like

    ORDER BY a, b, c

The fun starts when there are mixes of ASC and DESC in the ORDER BY
clause. Reverse collations make this simpler by inverting the meaning
of > (or similar), which makes the rowtypes still sortable in a new
dictionary order, so the pagination would look like:


    (a, b, c) > (most_recent_a, most_recent_b COLLATE "C_backwards", 
most_recent_c)

with an ORDER BY like:

    ORDER BY a, b DESC, c

What say?

Best,
David.
-- 
David Fetter <david(at)fetter(dot)org> http://fetter.org/
Phone: +1 415 235 3778

Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate
>From cd603421da0d8e4fc19401f24f46da8a26d88d3c Mon Sep 17 00:00:00 2001
From: David Fetter <da...@fetter.org>
Date: Sat, 16 Nov 2019 09:29:14 -0800
Subject: [PATCH v1] Reverse collations
To: hackers
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="------------2.23.0"

This is a multi-part message in MIME format.
--------------2.23.0
Content-Type: text/plain; charset=UTF-8; format=fixed
Content-Transfer-Encoding: 8bit


Make it possible to define collations which reverse the usual meanings
of <, <=, >, and >= for their corresponding collations.

This in turn makes it easier to do keyset pagination on text with mixes
of ASC and DESC in the ORDER BY.

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 55694c4368..3086936515 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -2106,6 +2106,13 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       <entry>Is the collation deterministic?</entry>
      </row>
 
+     <row>
+      <entry><structfield>collisreverse</structfield></entry>
+      <entry><type>bool</type></entry>
+      <entry></entry>
+      <entry>Is the collation reversed?</entry>
+     </row>
+
      <row>
       <entry><structfield>collencoding</structfield></entry>
       <entry><type>int4</type></entry>
diff --git a/doc/src/sgml/ref/create_collation.sgml b/doc/src/sgml/ref/create_collation.sgml
index def4dda6e8..cb913871b7 100644
--- a/doc/src/sgml/ref/create_collation.sgml
+++ b/doc/src/sgml/ref/create_collation.sgml
@@ -24,7 +24,8 @@ CREATE COLLATION [ IF NOT EXISTS ] <replaceable>name</replaceable> (
     [ LC_CTYPE = <replaceable>lc_ctype</replaceable>, ]
     [ PROVIDER = <replaceable>provider</replaceable>, ]
     [ DETERMINISTIC = <replaceable>boolean</replaceable>, ]
-    [ VERSION = <replaceable>version</replaceable> ]
+    [ VERSION = <replaceable>version</replaceable>, ]
+    [ REVERSE = <replaceable>boolean</replaceable> ]
 )
 CREATE COLLATION [ IF NOT EXISTS ] <replaceable>name</replaceable> FROM <replaceable>existing_collation</replaceable>
 </synopsis>
@@ -166,6 +167,16 @@ CREATE COLLATION [ IF NOT EXISTS ] <replaceable>name</replaceable> FROM <replace
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><literal>REVERSE</literal></term>
+
+     <listitem>
+      <para>
+       Specifies whether the collation should sort in reverse. Defaults to false.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><replaceable>existing_collation</replaceable></term>
 
@@ -225,6 +236,13 @@ CREATE COLLATION german_phonebook (provider = icu, locale = 'de-u-co-phonebk');
 </programlisting>
   </para>
 
+  <para>
+   To create a collation from the <literal>C</literal> locale that sorts backwards:
+<programlisting>
+CREATE COLLATION C_sdrawckab (locale = 'C', reverse = true);
+</programlisting>
+  </para>
+
   <para>
    To create a collation from an existing collation:
 <programlisting>
diff --git a/src/backend/catalog/pg_collation.c b/src/backend/catalog/pg_collation.c
index dd99d53547..6198f77f36 100644
--- a/src/backend/catalog/pg_collation.c
+++ b/src/backend/catalog/pg_collation.c
@@ -47,6 +47,7 @@ CollationCreate(const char *collname, Oid collnamespace,
 				Oid collowner,
 				char collprovider,
 				bool collisdeterministic,
+				bool collisreverse,
 				int32 collencoding,
 				const char *collcollate, const char *collctype,
 				const char *collversion,
@@ -162,6 +163,7 @@ CollationCreate(const char *collname, Oid collnamespace,
 	values[Anum_pg_collation_collowner - 1] = ObjectIdGetDatum(collowner);
 	values[Anum_pg_collation_collprovider - 1] = CharGetDatum(collprovider);
 	values[Anum_pg_collation_collisdeterministic - 1] = BoolGetDatum(collisdeterministic);
+	values[Anum_pg_collation_collisreverse - 1] = BoolGetDatum(collisreverse);
 	values[Anum_pg_collation_collencoding - 1] = Int32GetDatum(collencoding);
 	namestrcpy(&name_collate, collcollate);
 	values[Anum_pg_collation_collcollate - 1] = NameGetDatum(&name_collate);
diff --git a/src/backend/commands/collationcmds.c b/src/backend/commands/collationcmds.c
index 919e092483..ab37705b27 100644
--- a/src/backend/commands/collationcmds.c
+++ b/src/backend/commands/collationcmds.c
@@ -60,11 +60,13 @@ DefineCollation(ParseState *pstate, List *names, List *parameters, bool if_not_e
 	DefElem    *lcctypeEl = NULL;
 	DefElem    *providerEl = NULL;
 	DefElem    *deterministicEl = NULL;
+	DefElem    *reverseEl = NULL;
 	DefElem    *versionEl = NULL;
 	char	   *collcollate = NULL;
 	char	   *collctype = NULL;
 	char	   *collproviderstr = NULL;
 	bool		collisdeterministic = true;
+	bool		collisreverse = false;
 	int			collencoding = 0;
 	char		collprovider = 0;
 	char	   *collversion = NULL;
@@ -95,6 +97,8 @@ DefineCollation(ParseState *pstate, List *names, List *parameters, bool if_not_e
 			defelp = &providerEl;
 		else if (strcmp(defel->defname, "deterministic") == 0)
 			defelp = &deterministicEl;
+		else if (strcmp(defel->defname, "reverse") == 0)
+			defelp = &reverseEl;
 		else if (strcmp(defel->defname, "version") == 0)
 			defelp = &versionEl;
 		else
@@ -130,6 +134,7 @@ DefineCollation(ParseState *pstate, List *names, List *parameters, bool if_not_e
 		collctype = pstrdup(NameStr(((Form_pg_collation) GETSTRUCT(tp))->collctype));
 		collprovider = ((Form_pg_collation) GETSTRUCT(tp))->collprovider;
 		collisdeterministic = ((Form_pg_collation) GETSTRUCT(tp))->collisdeterministic;
+		collisreverse = ((Form_pg_collation) GETSTRUCT(tp))->collisreverse;
 		collencoding = ((Form_pg_collation) GETSTRUCT(tp))->collencoding;
 
 		ReleaseSysCache(tp);
@@ -165,6 +170,9 @@ DefineCollation(ParseState *pstate, List *names, List *parameters, bool if_not_e
 	if (deterministicEl)
 		collisdeterministic = defGetBoolean(deterministicEl);
 
+	if (reverseEl)
+		collisreverse = defGetBoolean(reverseEl);
+
 	if (versionEl)
 		collversion = defGetString(versionEl);
 
@@ -222,6 +230,7 @@ DefineCollation(ParseState *pstate, List *names, List *parameters, bool if_not_e
 							 GetUserId(),
 							 collprovider,
 							 collisdeterministic,
+							 collisreverse,
 							 collencoding,
 							 collcollate,
 							 collctype,
@@ -605,7 +614,7 @@ pg_import_system_collations(PG_FUNCTION_ARGS)
 			 * about existing ones.
 			 */
 			collid = CollationCreate(localebuf, nspid, GetUserId(),
-									 COLLPROVIDER_LIBC, true, enc,
+									 COLLPROVIDER_LIBC, true, false, enc,
 									 localebuf, localebuf,
 									 get_collation_actual_version(COLLPROVIDER_LIBC, localebuf),
 									 true, true);
@@ -666,7 +675,7 @@ pg_import_system_collations(PG_FUNCTION_ARGS)
 			int			enc = aliases[i].enc;
 
 			collid = CollationCreate(alias, nspid, GetUserId(),
-									 COLLPROVIDER_LIBC, true, enc,
+									 COLLPROVIDER_LIBC, true, false, enc,
 									 locale, locale,
 									 get_collation_actual_version(COLLPROVIDER_LIBC, locale),
 									 true, true);
@@ -728,7 +737,7 @@ pg_import_system_collations(PG_FUNCTION_ARGS)
 
 			collid = CollationCreate(psprintf("%s-x-icu", langtag),
 									 nspid, GetUserId(),
-									 COLLPROVIDER_ICU, true, -1,
+									 COLLPROVIDER_ICU, true, false, -1,
 									 collcollate, collcollate,
 									 get_collation_actual_version(COLLPROVIDER_ICU, collcollate),
 									 true, true);
diff --git a/src/backend/utils/adt/pg_locale.c b/src/backend/utils/adt/pg_locale.c
index fcdbaae37b..f9e47c8bff 100644
--- a/src/backend/utils/adt/pg_locale.c
+++ b/src/backend/utils/adt/pg_locale.c
@@ -1155,7 +1155,8 @@ lookup_collation_cache(Oid collation, bool set_flags)
 		collcollate = NameStr(collform->collcollate);
 		collctype = NameStr(collform->collctype);
 
-		cache_entry->collate_is_c = ((strcmp(collcollate, "C") == 0) ||
+		cache_entry->collate_is_c = !collform->collisreverse &&
+									((strcmp(collcollate, "C") == 0) ||
 									 (strcmp(collcollate, "POSIX") == 0));
 		cache_entry->ctype_is_c = ((strcmp(collctype, "C") == 0) ||
 								   (strcmp(collctype, "POSIX") == 0));
@@ -1357,6 +1358,7 @@ pg_newlocale_from_collation(Oid collid)
 		memset(&result, 0, sizeof(result));
 		result.provider = collform->collprovider;
 		result.deterministic = collform->collisdeterministic;
+		result.reverse = collform->collisreverse;
 
 		if (collform->collprovider == COLLPROVIDER_LIBC)
 		{
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index 69165eb311..02cbcbd23d 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -1603,6 +1603,9 @@ varstr_cmp(const char *arg1, int len1, const char *arg2, int len2, Oid collid)
 			if (a2p != a2buf)
 				pfree(a2p);
 
+			if (collid != DEFAULT_COLLATION_OID && mylocale->reverse)
+				INVERT_COMPARE_RESULT(result);
+
 			return result;
 		}
 #endif							/* WIN32 */
@@ -1681,6 +1684,9 @@ varstr_cmp(const char *arg1, int len1, const char *arg2, int len2, Oid collid)
 			(!mylocale || mylocale->deterministic))
 			result = strcmp(a1p, a2p);
 
+		if (collid != DEFAULT_COLLATION_OID && mylocale->reverse)
+			INVERT_COMPARE_RESULT(result);
+
 		if (a1p != a1buf)
 			pfree(a1p);
 		if (a2p != a2buf)
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index 27602fa49c..61daf205d8 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -954,6 +954,22 @@ get_collation_isdeterministic(Oid colloid)
 	return result;
 }
 
+bool
+get_collation_isreverse(Oid colloid)
+{
+	HeapTuple	tp;
+	Form_pg_collation	colltup;
+	bool		result;
+
+	tp = SearchSysCache1(COLLOID, ObjectIdGetDatum(colloid));
+	if (!HeapTupleIsValid(tp))
+		elog(ERROR, "cache lookup failed for collation %u", colloid);
+	colltup = (Form_pg_collation) GETSTRUCT(tp);
+	result = colltup->collisreverse;
+	ReleaseSysCache(tp);
+	return result;
+}
+
 /*				---------- CONSTRAINT CACHE ----------					 */
 
 /*
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 88a261d9bd..33e357c300 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -1712,8 +1712,8 @@ setup_collation(FILE *cmdfd)
 	 * in pg_collation.h.  But add it before reading system collations, so
 	 * that it wins if libc defines a locale named ucs_basic.
 	 */
-	PG_CMD_PRINTF("INSERT INTO pg_collation (oid, collname, collnamespace, collowner, collprovider, collisdeterministic, collencoding, collcollate, collctype)"
-				   "VALUES (pg_nextoid('pg_catalog.pg_collation', 'oid', 'pg_catalog.pg_collation_oid_index'), 'ucs_basic', 'pg_catalog'::regnamespace, %u, '%c', true, %d, 'C', 'C');\n\n",
+	PG_CMD_PRINTF("INSERT INTO pg_collation (oid, collname, collnamespace, collowner, collprovider, collisdeterministic, collisreverse, collencoding, collcollate, collctype)"
+				   "VALUES (pg_nextoid('pg_catalog.pg_collation', 'oid', 'pg_catalog.pg_collation_oid_index'), 'ucs_basic', 'pg_catalog'::regnamespace, %u, '%c', true, false, %d, 'C', 'C');\n\n",
 				   BOOTSTRAP_SUPERUSERID, COLLPROVIDER_LIBC, PG_UTF8);
 
 	/* Now import all collations we can find in the operating system */
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index bf69adc2f4..32801744b9 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -13466,6 +13466,7 @@ dumpCollation(Archive *fout, CollInfo *collinfo)
 	PGresult   *res;
 	int			i_collprovider;
 	int			i_collisdeterministic;
+	int			i_collisreverse;
 	int			i_collcollate;
 	int			i_collctype;
 	const char *collprovider;
@@ -13501,6 +13502,13 @@ dumpCollation(Archive *fout, CollInfo *collinfo)
 		appendPQExpBufferStr(query,
 							 "true AS collisdeterministic, ");
 
+	if (fout->remoteVersion >= 130000)
+		appendPQExpBufferStr(query,
+							 "collisreverse, ");
+	else
+		appendPQExpBufferStr(query,
+							 "false as collisreverse, ");
+
 	appendPQExpBuffer(query,
 					  "collcollate, "
 					  "collctype "
@@ -13512,6 +13520,7 @@ dumpCollation(Archive *fout, CollInfo *collinfo)
 
 	i_collprovider = PQfnumber(res, "collprovider");
 	i_collisdeterministic = PQfnumber(res, "collisdeterministic");
+	i_collisreverse = PQfnumber(res, "collisreverse");
 	i_collcollate = PQfnumber(res, "collcollate");
 	i_collctype = PQfnumber(res, "collctype");
 
@@ -13540,6 +13549,9 @@ dumpCollation(Archive *fout, CollInfo *collinfo)
 	if (strcmp(PQgetvalue(res, 0, i_collisdeterministic), "f") == 0)
 		appendPQExpBufferStr(q, ", deterministic = false");
 
+	if (strcmp(PQgetvalue(res, 0, i_collisreverse), "t") == 0)
+		appendPQExpBufferStr(q, ", reverse = true");
+
 	if (strcmp(collcollate, collctype) == 0)
 	{
 		appendPQExpBufferStr(q, ", locale = ");
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index b3b9313b36..d87aa96915 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -4525,6 +4525,17 @@ listCollations(const char *pattern, bool verbose, bool showSystem)
 						  gettext_noop("yes"),
 						  gettext_noop("Deterministic?"));
 
+	if (pset.sversion >= 130000)
+		appendPQExpBuffer(&buf,
+						  ",\n       CASE WHEN c.collisreverse THEN '%s' ELSE '%s' END AS \"%s\"",
+						  gettext_noop("yes"), gettext_noop("no"),
+						  gettext_noop("Reverse?"));
+	else
+		appendPQExpBuffer(&buf,
+						  ",\n       '%s' AS \"%s\"",
+						  gettext_noop("no"),
+						  gettext_noop("Reverse?"));
+
 	if (verbose)
 		appendPQExpBuffer(&buf,
 						  ",\n       pg_catalog.obj_description(c.oid, 'pg_collation') AS \"%s\"",
diff --git a/src/include/catalog/pg_collation.h b/src/include/catalog/pg_collation.h
index d3366f361d..929d521d05 100644
--- a/src/include/catalog/pg_collation.h
+++ b/src/include/catalog/pg_collation.h
@@ -34,6 +34,7 @@ CATALOG(pg_collation,3456,CollationRelationId)
 	Oid			collowner;		/* owner of collation */
 	char		collprovider;	/* see constants below */
 	bool		collisdeterministic BKI_DEFAULT(t);
+	bool		collisreverse BKI_DEFAULT(f);
 	int32		collencoding;	/* encoding for this collation; -1 = "all" */
 	NameData	collcollate;	/* LC_COLLATE setting */
 	NameData	collctype;		/* LC_CTYPE setting */
@@ -63,6 +64,7 @@ extern Oid	CollationCreate(const char *collname, Oid collnamespace,
 							Oid collowner,
 							char collprovider,
 							bool collisdeterministic,
+							bool collisreverse,
 							int32 collencoding,
 							const char *collcollate, const char *collctype,
 							const char *collversion,
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index c8df5bff9f..90e1fbc33a 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -92,6 +92,7 @@ extern void get_atttypetypmodcoll(Oid relid, AttrNumber attnum,
 								  Oid *typid, int32 *typmod, Oid *collid);
 extern char *get_collation_name(Oid colloid);
 extern bool get_collation_isdeterministic(Oid colloid);
+extern bool get_collation_isreverse(Oid colloid);
 extern char *get_constraint_name(Oid conoid);
 extern char *get_language_name(Oid langoid, bool missing_ok);
 extern Oid	get_opclass_family(Oid opclass);
diff --git a/src/include/utils/pg_locale.h b/src/include/utils/pg_locale.h
index b4b3aa5843..d571eab4c6 100644
--- a/src/include/utils/pg_locale.h
+++ b/src/include/utils/pg_locale.h
@@ -83,6 +83,7 @@ struct pg_locale_struct
 {
 	char		provider;
 	bool		deterministic;
+	bool		reverse;
 	union
 	{
 #ifdef HAVE_LOCALE_T
diff --git a/src/test/regress/expected/collate.out b/src/test/regress/expected/collate.out
index 0dee7d783a..2e51dde931 100644
--- a/src/test/regress/expected/collate.out
+++ b/src/test/regress/expected/collate.out
@@ -11,6 +11,7 @@
  */
 CREATE SCHEMA collate_tests;
 SET search_path = collate_tests;
+CREATE COLLATION "C_sdrawckab" (locale = 'C', reverse = true);
 CREATE TABLE collate_test1 (
     a int,
     b text COLLATE "C" NOT NULL
@@ -66,6 +67,14 @@ SELECT * FROM collate_test1 WHERE b COLLATE "C" >= 'abc' COLLATE "C";
  3 | bbc
 (2 rows)
 
+SELECT * FROM collate_test1 WHERE b COLLATE "C_sdrawckab" >= 'abc' COLLATE "C_sdrawckab";
+ a |  b  
+---+-----
+ 1 | abc
+ 2 | Abc
+ 4 | ABD
+(3 rows)
+
 SELECT * FROM collate_test1 WHERE b COLLATE "C" >= 'bbc' COLLATE "POSIX"; -- fail
 ERROR:  collation mismatch between explicit collations "C" and "POSIX"
 LINE 1: ...* FROM collate_test1 WHERE b COLLATE "C" >= 'bbc' COLLATE "P...
@@ -682,8 +691,9 @@ SELECT collation for ((SELECT b FROM collate_test1 LIMIT 1));
 -- must get rid of them.
 --
 DROP SCHEMA collate_tests CASCADE;
-NOTICE:  drop cascades to 17 other objects
-DETAIL:  drop cascades to table collate_test1
+NOTICE:  drop cascades to 18 other objects
+DETAIL:  drop cascades to collation "C_sdrawckab"
+drop cascades to table collate_test1
 drop cascades to table collate_test_like
 drop cascades to table collate_test2
 drop cascades to type testdomain_p
diff --git a/src/test/regress/sql/collate.sql b/src/test/regress/sql/collate.sql
index 89de26a227..1317ad4733 100644
--- a/src/test/regress/sql/collate.sql
+++ b/src/test/regress/sql/collate.sql
@@ -13,6 +13,8 @@
 CREATE SCHEMA collate_tests;
 SET search_path = collate_tests;
 
+CREATE COLLATION "C_sdrawckab" (locale = 'C', reverse = true);
+
 CREATE TABLE collate_test1 (
     a int,
     b text COLLATE "C" NOT NULL
@@ -42,6 +44,7 @@ INSERT INTO collate_test2 SELECT * FROM collate_test1;
 SELECT * FROM collate_test1 WHERE b COLLATE "C" >= 'abc';
 SELECT * FROM collate_test1 WHERE b >= 'abc' COLLATE "C";
 SELECT * FROM collate_test1 WHERE b COLLATE "C" >= 'abc' COLLATE "C";
+SELECT * FROM collate_test1 WHERE b COLLATE "C_sdrawckab" >= 'abc' COLLATE "C_sdrawckab";
 SELECT * FROM collate_test1 WHERE b COLLATE "C" >= 'bbc' COLLATE "POSIX"; -- fail
 
 CREATE DOMAIN testdomain_p AS text COLLATE "POSIX";

--------------2.23.0--


Reply via email to