Simon pointed out in a nearby thread [0] that the detail part of
partition-not-found error should show just the partition keys.  I posted a
patch on that thread [1], but to avoid confusion being caused by multitude
of patches over there I'm re-posting it here.

* What the patch does:

Currently we show the whole row in the detail part of the error.

CREATE TABLE measurement_year_month (
    logdate         date not null,
    peaktemp        int,
    unitsales       int
) PARTITION BY RANGE (EXTRACT(YEAR FROM logdate), EXTRACT(MONTH FROM
logdate));

# INSERT INTO measurement_year_month VALUES ('2016-12-02', 1, 1);
ERROR:  no partition of relation "measurement_year_month" found for row
DETAIL:  Failing row contains (2016-12-02, 1, 1).

Patch changes it look like the following:

# INSERT INTO measurement_year_month VALUES ('2016-12-02', 1, 1);
ERROR:  no partition of relation "measurement_year_month" found for row
DETAIL:  Partition key of the failing row contains
(date_part('year'::text, logdate), date_part('month'::text,
logdate))=(2016, 12).

It's similar to error detail shown when btree unique violation occurs:

-- just to be clear, using LIKE won't make measurement partitioned too
CREATE TABLE measurement (LIKE measurement_year_month);
CREATE UNIQUE INDEX ON measurement (EXTRACT(YEAR FROM logdate),
EXTRACT(MONTH FROM logdate))

# INSERT INTO measurement VALUES ('2016-12-02', 1, 1);
INSERT 0 1

# INSERT INTO measurement VALUES ('2016-12-02', 1, 1);
ERROR:  duplicate key value violates unique constraint
"measurement_date_part_date_part1_idx"
DETAIL:  Key (date_part('year'::text, logdate), date_part('month'::text,
logdate))=(2016, 12) already exists.

* Some of the implementation details of the patch here:

The rules about which columns to show or whether to show the DETAIL at all
are similar to those in BuildIndexValueDescription():

 - if user has SELECT privilege on the whole table, simply go ahead

 - if user doesn't have SELECT privilege on the table, check that they
   can see all the columns in the key (no point in showing partial key);
   however abort on finding an expression for which we don't try finding
   out privilege situation of whatever columns may be in the expression

Thanks,
Amit

[0]
https://www.postgresql.org/message-id/CANP8%2BjJBpWocfKrbJcaf3iBt9E3U%3DWPE_NC8YE6rye%2BYJ1sYnQ%40mail.gmail.com
[1]
https://www.postgresql.org/message-id/2f8df068-9a49-d74a-30af-7cd17bdee181%40lab.ntt.co.jp
>From b382d1ebb25ab3d577667d50e86430b34e57ab9a Mon Sep 17 00:00:00 2001
From: amit <amitlangot...@gmail.com>
Date: Mon, 13 Feb 2017 19:52:19 +0900
Subject: [PATCH] Show only the partition key upon failing to find a partition

Currently, the whole row is shown without column names.  Instead,
adopt a style similar to _bt_check_unique() in ExecFindPartition()
and show the failing key in format (key1, ...)=(val1, ...).

The key description shown in the error message is now built by the
new ExecBuildSlotPartitionKeyDescription(), which works along the
lines of BuildIndexValueDescription(), using similar rules about
what columns of the partition key to include depending on the user's
privileges to view the same.

A bunch of relevant tests are added.
---
 src/backend/catalog/partition.c      |  29 +++----
 src/backend/executor/execMain.c      | 147 ++++++++++++++++++++++++++++++-----
 src/backend/utils/adt/ruleutils.c    |  37 ++++++---
 src/include/catalog/partition.h      |  13 +++-
 src/include/utils/ruleutils.h        |   2 +
 src/test/regress/expected/insert.out |  38 ++++++++-
 src/test/regress/sql/insert.sql      |  30 +++++++
 7 files changed, 246 insertions(+), 50 deletions(-)

diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c
index 4bcef58763..710ce07a6f 100644
--- a/src/backend/catalog/partition.c
+++ b/src/backend/catalog/partition.c
@@ -140,13 +140,6 @@ static int partition_bound_bsearch(PartitionKey key,
 						PartitionBoundInfo boundinfo,
 						void *probe, bool probe_is_bound, bool *is_equal);
 
-/* Support get_partition_for_tuple() */
-static void FormPartitionKeyDatum(PartitionDispatch pd,
-					  TupleTableSlot *slot,
-					  EState *estate,
-					  Datum *values,
-					  bool *isnull);
-
 /*
  * RelationBuildPartitionDesc
  *		Form rel's partition descriptor
@@ -1608,7 +1601,7 @@ generate_partition_qual(Relation rel)
  * the heap tuple passed in.
  * ----------------
  */
-static void
+void
 FormPartitionKeyDatum(PartitionDispatch pd,
 					  TupleTableSlot *slot,
 					  EState *estate,
@@ -1672,7 +1665,7 @@ int
 get_partition_for_tuple(PartitionDispatch *pd,
 						TupleTableSlot *slot,
 						EState *estate,
-						Oid *failed_at)
+						GetPartitionFailureData *gpfd)
 {
 	PartitionDispatch parent;
 	Datum		values[PARTITION_MAX_KEYS];
@@ -1693,13 +1686,6 @@ get_partition_for_tuple(PartitionDispatch *pd,
 		TupleTableSlot *myslot = parent->tupslot;
 		TupleConversionMap *map = parent->tupmap;
 
-		/* Quick exit */
-		if (partdesc->nparts == 0)
-		{
-			*failed_at = RelationGetRelid(parent->reldesc);
-			return -1;
-		}
-
 		if (myslot != NULL && map != NULL)
 		{
 			HeapTuple	tuple = ExecFetchSlotTuple(slot);
@@ -1710,6 +1696,14 @@ get_partition_for_tuple(PartitionDispatch *pd,
 			slot = myslot;
 		}
 
+		/* Quick exit */
+		if (partdesc->nparts == 0)
+		{
+			gpfd->failed_at = parent;
+			gpfd->failed_slot = slot;
+			return -1;
+		}
+
 		/*
 		 * Extract partition key from tuple. Expression evaluation machinery
 		 * that FormPartitionKeyDatum() invokes expects ecxt_scantuple to
@@ -1774,7 +1768,8 @@ get_partition_for_tuple(PartitionDispatch *pd,
 		if (cur_index < 0)
 		{
 			result = -1;
-			*failed_at = RelationGetRelid(parent->reldesc);
+			gpfd->failed_at = parent;
+			gpfd->failed_slot = slot;
 			break;
 		}
 		else if (parent->indexes[cur_index] >= 0)
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index a66639178a..9f3b10e364 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -60,6 +60,7 @@
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/rls.h"
+#include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
 #include "utils/tqual.h"
 
@@ -95,6 +96,10 @@ static char *ExecBuildSlotValueDescription(Oid reloid,
 							  TupleDesc tupdesc,
 							  Bitmapset *modifiedCols,
 							  int maxfieldlen);
+static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
+									 Datum *values,
+									 bool *isnull,
+									 int maxfieldlen);
 static void EvalPlanQualStart(EPQState *epqstate, EState *parentestate,
 				  Plan *planTree);
 
@@ -3187,33 +3192,137 @@ ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
 				  TupleTableSlot *slot, EState *estate)
 {
 	int			result;
-	Oid			failed_at;
+	GetPartitionFailureData gpfd;
 
-	result = get_partition_for_tuple(pd, slot, estate, &failed_at);
+	result = get_partition_for_tuple(pd, slot, estate, &gpfd);
 	if (result < 0)
 	{
-		Relation	rel = resultRelInfo->ri_RelationDesc;
+		Relation	failed_rel;
+		Datum		key_values[PARTITION_MAX_KEYS];
+		bool		key_isnull[PARTITION_MAX_KEYS];
 		char	   *val_desc;
-		Bitmapset  *insertedCols,
-				   *updatedCols,
-				   *modifiedCols;
-		TupleDesc	tupDesc = RelationGetDescr(rel);
-
-		insertedCols = GetInsertedColumns(resultRelInfo, estate);
-		updatedCols = GetUpdatedColumns(resultRelInfo, estate);
-		modifiedCols = bms_union(insertedCols, updatedCols);
-		val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
-												 slot,
-												 tupDesc,
-												 modifiedCols,
-												 64);
-		Assert(OidIsValid(failed_at));
+		ExprContext *ecxt = GetPerTupleExprContext(estate);
+
+		failed_rel = gpfd.failed_at->reldesc;
+		ecxt->ecxt_scantuple = slot;
+		FormPartitionKeyDatum(gpfd.failed_at, gpfd.failed_slot, estate,
+							  key_values, key_isnull);
+		val_desc = ExecBuildSlotPartitionKeyDescription(failed_rel,
+														key_values,
+														key_isnull,
+														64);
+		Assert(OidIsValid(RelationGetRelid(failed_rel)));
 		ereport(ERROR,
 				(errcode(ERRCODE_CHECK_VIOLATION),
 				 errmsg("no partition of relation \"%s\" found for row",
-						get_rel_name(failed_at)),
-			val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
+						RelationGetRelationName(failed_rel)),
+			val_desc ? errdetail("Partition key of the failing row contains %s.", val_desc) : 0));
 	}
 
 	return result;
 }
+
+/*
+ * BuildSlotPartitinKeyDescription
+ *
+ * Construct a string describing the contents of the partition key in the
+ * form "(key_name, ...)=(key_value, ...)".  This is currently used for
+ * building error messages when ExecFindPartition() fails to find partition
+ * for a row.
+ *
+ * Note that if the user does not have permissions to view all of the
+ * columns involved then a NULL is returned.
+ */
+static char *
+ExecBuildSlotPartitionKeyDescription(Relation rel,
+									 Datum *values,
+									 bool *isnull,
+									 int maxfieldlen)
+{
+	StringInfoData	buf;
+	PartitionKey	key = RelationGetPartitionKey(rel);
+	int			partnatts = get_partition_natts(key);
+	int			i;
+	Oid			relid = RelationGetRelid(rel);
+	AclResult	aclresult;
+
+	/*
+	 * Check permissions- if the user does not have access to view all of the
+	 * key columns then return NULL to avoid leaking data.
+	 *
+	 * First check if RLS is enabled for the relation.  If so, return NULL to
+	 * avoid leaking data.
+	 *
+	 * Next we need to check table-level SELECT access and then, if there is
+	 * no access there, check column-level permissions.
+	 */
+
+	/* RLS check- if RLS is enabled then we don't return anything. */
+	if (check_enable_rls(relid, InvalidOid, true) == RLS_ENABLED)
+		return NULL;
+
+	/* Table-level SELECT is enough, if the user has it */
+	aclresult = pg_class_aclcheck(relid, GetUserId(), ACL_SELECT);
+	if (aclresult != ACLCHECK_OK)
+	{
+		/*
+		 * No table-level access, so step through the columns in the partition
+		 * key and make sure the user has SELECT rights on all of them.
+		 */
+		for (i = 0; i < partnatts; i++)
+		{
+			AttrNumber	attnum = get_partition_col_attnum(key, i);
+
+			/*
+			 * Note that if attnum == InvalidAttrNumber, then this partition
+			 * key column is an expression and we return no detail rather than
+			 * try to figure out what column(s) the expression includes and
+			 * if the user has SELECT rights on them.
+			 */
+			if (attnum == InvalidAttrNumber ||
+				pg_attribute_aclcheck(relid, attnum, GetUserId(),
+									  ACL_SELECT) != ACLCHECK_OK)
+				return NULL;
+		}
+	}
+
+	initStringInfo(&buf);
+	appendStringInfo(&buf, "(%s)=(",
+					 pg_get_partkeydef_columns(relid, true));
+
+	for (i = 0; i < partnatts; i++)
+	{
+		char	   *val;
+		int			vallen;
+
+		if (isnull[i])
+			val = "null";
+		else
+		{
+			Oid			foutoid;
+			bool		typisvarlena;
+
+			getTypeOutputInfo(get_partition_col_typid(key, i),
+							  &foutoid, &typisvarlena);
+			val = OidOutputFunctionCall(foutoid, values[i]);
+		}
+
+		if (i > 0)
+			appendStringInfoString(&buf, ", ");
+
+		/* truncate if needed */
+		vallen = strlen(val);
+		if (vallen <= maxfieldlen)
+			appendStringInfoString(&buf, val);
+		else
+		{
+			vallen = pg_mbcliplen(val, vallen, maxfieldlen);
+			appendBinaryStringInfo(&buf, val, vallen);
+			appendStringInfoString(&buf, "...");
+		}
+	}
+
+	appendStringInfoChar(&buf, ')');
+
+	return buf.data;
+}
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index f355954b53..0c4e28fd9b 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -317,7 +317,8 @@ static char *pg_get_indexdef_worker(Oid indexrelid, int colno,
 					   const Oid *excludeOps,
 					   bool attrsOnly, bool showTblSpc,
 					   int prettyFlags, bool missing_ok);
-static char *pg_get_partkeydef_worker(Oid relid, int prettyFlags);
+static char *pg_get_partkeydef_worker(Oid relid, int prettyFlags,
+						 bool attrsOnly);
 static char *pg_get_constraintdef_worker(Oid constraintId, bool fullCommand,
 							int prettyFlags, bool missing_ok);
 static text *pg_get_expr_worker(text *expr, Oid relid, const char *relname,
@@ -1431,14 +1432,26 @@ pg_get_partkeydef(PG_FUNCTION_ARGS)
 	Oid			relid = PG_GETARG_OID(0);
 
 	PG_RETURN_TEXT_P(string_to_text(pg_get_partkeydef_worker(relid,
-														PRETTYFLAG_INDENT)));
+														PRETTYFLAG_INDENT,
+															 false)));
+}
+
+/* Internal version that just reports the column definitions */
+char *
+pg_get_partkeydef_columns(Oid relid, bool pretty)
+{
+	int			prettyFlags;
+
+	prettyFlags = pretty ? PRETTYFLAG_PAREN | PRETTYFLAG_INDENT : PRETTYFLAG_INDENT;
+	return pg_get_partkeydef_worker(relid, prettyFlags, true);
 }
 
 /*
  * Internal workhorse to decompile a partition key definition.
  */
 static char *
-pg_get_partkeydef_worker(Oid relid, int prettyFlags)
+pg_get_partkeydef_worker(Oid relid, int prettyFlags,
+						 bool attrsOnly)
 {
 	Form_pg_partitioned_table form;
 	HeapTuple	tuple;
@@ -1508,17 +1521,20 @@ pg_get_partkeydef_worker(Oid relid, int prettyFlags)
 	switch (form->partstrat)
 	{
 		case PARTITION_STRATEGY_LIST:
-			appendStringInfo(&buf, "LIST");
+			if (!attrsOnly)
+				appendStringInfo(&buf, "LIST");
 			break;
 		case PARTITION_STRATEGY_RANGE:
-			appendStringInfo(&buf, "RANGE");
+			if (!attrsOnly)
+				appendStringInfo(&buf, "RANGE");
 			break;
 		default:
 			elog(ERROR, "unexpected partition strategy: %d",
 				 (int) form->partstrat);
 	}
 
-	appendStringInfo(&buf, " (");
+	if (!attrsOnly)
+		appendStringInfo(&buf, " (");
 	sep = "";
 	for (keyno = 0; keyno < form->partnatts; keyno++)
 	{
@@ -1561,14 +1577,17 @@ pg_get_partkeydef_worker(Oid relid, int prettyFlags)
 
 		/* Add collation, if not default for column */
 		partcoll = partcollation->values[keyno];
-		if (OidIsValid(partcoll) && partcoll != keycolcollation)
+		if (!attrsOnly && OidIsValid(partcoll) && partcoll != keycolcollation)
 			appendStringInfo(&buf, " COLLATE %s",
 							 generate_collation_name((partcoll)));
 
 		/* Add the operator class name, if not default */
-		get_opclass_name(partclass->values[keyno], keycoltype, &buf);
+		if (!attrsOnly)
+			get_opclass_name(partclass->values[keyno], keycoltype, &buf);
 	}
-	appendStringInfoChar(&buf, ')');
+
+	if (!attrsOnly)
+		appendStringInfoChar(&buf, ')');
 
 	/* Clean up */
 	ReleaseSysCache(tuple);
diff --git a/src/include/catalog/partition.h b/src/include/catalog/partition.h
index b195d1a5ab..698b388c46 100644
--- a/src/include/catalog/partition.h
+++ b/src/include/catalog/partition.h
@@ -70,6 +70,12 @@ typedef struct PartitionDispatchData
 
 typedef struct PartitionDispatchData *PartitionDispatch;
 
+typedef struct GetPartitionFailureData
+{
+	PartitionDispatch	failed_at;
+	TupleTableSlot	   *failed_slot;
+} GetPartitionFailureData;
+
 extern void RelationBuildPartitionDesc(Relation relation);
 extern bool partition_bounds_equal(PartitionKey key,
 					   PartitionBoundInfo p1, PartitionBoundInfo p2);
@@ -85,8 +91,13 @@ extern List *RelationGetPartitionQual(Relation rel);
 extern PartitionDispatch *RelationGetPartitionDispatchInfo(Relation rel,
 								 int lockmode, int *num_parted,
 								 List **leaf_part_oids);
+extern void FormPartitionKeyDatum(PartitionDispatch pd,
+					  TupleTableSlot *slot,
+					  EState *estate,
+					  Datum *values,
+					  bool *isnull);
 extern int get_partition_for_tuple(PartitionDispatch *pd,
 						TupleTableSlot *slot,
 						EState *estate,
-						Oid *failed_at);
+						GetPartitionFailureData *gpfd);
 #endif   /* PARTITION_H */
diff --git a/src/include/utils/ruleutils.h b/src/include/utils/ruleutils.h
index 3e8aad97e2..42fc872c4a 100644
--- a/src/include/utils/ruleutils.h
+++ b/src/include/utils/ruleutils.h
@@ -21,6 +21,8 @@
 extern char *pg_get_indexdef_string(Oid indexrelid);
 extern char *pg_get_indexdef_columns(Oid indexrelid, bool pretty);
 
+extern char *pg_get_partkeydef_columns(Oid relid, bool pretty);
+
 extern char *pg_get_constraintdef_command(Oid constraintId);
 extern char *deparse_expression(Node *expr, List *dpcontext,
 				   bool forceprefix, bool showimplicit);
diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out
index 81af3ef497..083d977a95 100644
--- a/src/test/regress/expected/insert.out
+++ b/src/test/regress/expected/insert.out
@@ -234,14 +234,14 @@ insert into part_ee_ff2 values ('ff', 11);
 -- fail
 insert into range_parted values ('a', 0);
 ERROR:  no partition of relation "range_parted" found for row
-DETAIL:  Failing row contains (a, 0).
+DETAIL:  Partition key of the failing row contains (a, (b + 0))=(a, 0).
 -- ok
 insert into range_parted values ('a', 1);
 insert into range_parted values ('a', 10);
 -- fail
 insert into range_parted values ('a', 20);
 ERROR:  no partition of relation "range_parted" found for row
-DETAIL:  Failing row contains (a, 20).
+DETAIL:  Partition key of the failing row contains (a, (b + 0))=(a, 20).
 -- ok
 insert into range_parted values ('b', 1);
 insert into range_parted values ('b', 10);
@@ -265,10 +265,10 @@ insert into list_parted (a) values ('aA');
 -- fail (partition of part_ee_ff not found in both cases)
 insert into list_parted values ('EE', 0);
 ERROR:  no partition of relation "part_ee_ff" found for row
-DETAIL:  Failing row contains (EE, 0).
+DETAIL:  Partition key of the failing row contains (b)=(0).
 insert into part_ee_ff values ('EE', 0);
 ERROR:  no partition of relation "part_ee_ff" found for row
-DETAIL:  Failing row contains (EE, 0).
+DETAIL:  Partition key of the failing row contains (b)=(0).
 -- ok
 insert into list_parted values ('EE', 1);
 insert into part_ee_ff values ('EE', 10);
@@ -351,6 +351,10 @@ select tableoid::regclass, * from p;
  p11      | 1 | 2
 (1 row)
 
+-- check that proper message is shown after failure to route through p1
+insert into p values (1, 5);
+ERROR:  no partition of relation "p1" found for row
+DETAIL:  Partition key of the failing row contains ((b + 0))=(1).
 truncate p;
 alter table p add constraint check_b check (b = 3);
 -- check that correct input row is shown when constraint check_b fails on p11
@@ -386,5 +390,31 @@ with ins (a, b, c) as
  p4  | 1 |  30 |  39
 (5 rows)
 
+-- check that message shown after failure to find a partition shows the
+-- appropriate key description (or none) in various situations
+create table key_desc (a int, b int) partition by list ((a+0));
+create table key_desc_1 partition of key_desc for values in (1) partition by range (b);
+create user someone_else;
+grant select (a) on key_desc_1 to someone_else;
+grant insert on key_desc to someone_else;
+set role someone_else;
+-- no key description is shown
+insert into key_desc values (1, 1);
+ERROR:  no partition of relation "key_desc_1" found for row
+reset role;
+grant select (b) on key_desc_1 to someone_else;
+set role someone_else;
+-- key description (b)=(1) is now shown
+insert into key_desc values (1, 1);
+ERROR:  no partition of relation "key_desc_1" found for row
+DETAIL:  Partition key of the failing row contains (b)=(1).
+-- key description is not shown if key contains expression
+insert into key_desc values (2, 1);
+ERROR:  no partition of relation "key_desc" found for row
+reset role;
+revoke all on key_desc from someone_else;
+revoke all on key_desc_1 from someone_else;
+drop role someone_else;
+drop table key_desc, key_desc_1;
 -- cleanup
 drop table p, p1, p11, p12, p2, p3, p4;
diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql
index 454e1ce2e7..3f982b6449 100644
--- a/src/test/regress/sql/insert.sql
+++ b/src/test/regress/sql/insert.sql
@@ -215,6 +215,9 @@ alter table p attach partition p1 for values from (1, 2) to (1, 10);
 insert into p values (1, 2);
 select tableoid::regclass, * from p;
 
+-- check that proper message is shown after failure to route through p1
+insert into p values (1, 5);
+
 truncate p;
 alter table p add constraint check_b check (b = 3);
 -- check that correct input row is shown when constraint check_b fails on p11
@@ -240,5 +243,32 @@ with ins (a, b, c) as
   (insert into p (b, a) select s.a, 1 from generate_series(2, 39) s(a) returning tableoid::regclass, *)
   select a, b, min(c), max(c) from ins group by a, b order by 1;
 
+-- check that message shown after failure to find a partition shows the
+-- appropriate key description (or none) in various situations
+create table key_desc (a int, b int) partition by list ((a+0));
+create table key_desc_1 partition of key_desc for values in (1) partition by range (b);
+
+create user someone_else;
+grant select (a) on key_desc_1 to someone_else;
+grant insert on key_desc to someone_else;
+
+set role someone_else;
+-- no key description is shown
+insert into key_desc values (1, 1);
+
+reset role;
+grant select (b) on key_desc_1 to someone_else;
+set role someone_else;
+-- key description (b)=(1) is now shown
+insert into key_desc values (1, 1);
+
+-- key description is not shown if key contains expression
+insert into key_desc values (2, 1);
+reset role;
+revoke all on key_desc from someone_else;
+revoke all on key_desc_1 from someone_else;
+drop role someone_else;
+drop table key_desc, key_desc_1;
+
 -- cleanup
 drop table p, p1, p11, p12, p2, p3, p4;
-- 
2.11.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to