Forwarding this patch to pgsql-hackers.
Here is the fixed patch. Jaime, could you please take a look at it?
Thank You
Pavel
2009/7/30 Tom Lane <[email protected]>:
> Jaime Casanova <[email protected]> writes:
>>> On Mon, Jul 20, 2009 at 10:09 AM, Alvaro
>>>> Getting rid of the check on natts was "ungood" ... it needs to compare
>>>> the number of undropped columns of both tupdescs.
>
>> patch attached
>
> This patch is *still* introducing more bugs than it fixes. The reason
> is that it has modified validate_tupdesc_compat to allow the tupdescs to
> be somewhat different, but has fixed only one of the several call sites
> to deal with the consequences of that. The others will now become crash
> risks if we apply it as-is.
>
> What I would suggest as a suitable plan for a fix is to modify
> validate_tupdesc_compat so that it returns a flag indicating whether the
> tupdesc compatibility is exact or requires translation. If translation
> is required, provide another routine that does that --- probably using a
> mapping data structure set up by validate_tupdesc_compat, since in some
> of these cases we'll be processing many tuples. Then the callers just
> have to know enough to call the tuple-translation function when
> validate_tupdesc_compat tells them to.
>
> There are a number of other places in the system with similar
> requirements, although none of them seem to be exact matches.
> In particular ExecEvalConvertRowtype() provides a good template for
> efficient translation logic, but it's using column name matching
> rather than positional matching so you couldn't share the setup logic.
> I'm not sure if it's worth moving all this code into the core so that
> it can be shared with other future uses (maybe in other PLs), but it's
> worth considering that. access/common/heaptuple.c or tupdesc.c might
> be a good place for it if we decide to do that.
>
> The executor's junkfilter code is pretty nearly related as well, and
> perhaps the Right Thing is to make all of this stuff into applications
> of junkfilters with different setup routines for the different
> requirements. However the junkfilter code is designed to work with
> tuples that are in TupleTableSlots, which isn't particularly helpful for
> plpgsql's uses, so maybe trying to unify with that code is more trouble
> than it's worth.
>
> I'm marking this patch as Waiting on Author again, although perhaps
> Returned with Feedback would be better since my suggestions amount
> to wholesale rewrites.
>
> regards, tom lane
>
*** ./src/pl/plpgsql/src/pl_exec.c.orig 2009-07-22 04:31:38.000000000 +0200
--- ./src/pl/plpgsql/src/pl_exec.c 2009-08-03 21:21:42.196893375 +0200
***************
*** 190,196 ****
Oid reqtype, int32 reqtypmod,
bool isnull);
static void exec_init_tuple_store(PLpgSQL_execstate *estate);
! static void validate_tupdesc_compat(TupleDesc expected, TupleDesc returned,
const char *msg);
static void exec_set_found(PLpgSQL_execstate *estate, bool state);
static void plpgsql_create_econtext(PLpgSQL_execstate *estate);
--- 190,200 ----
Oid reqtype, int32 reqtypmod,
bool isnull);
static void exec_init_tuple_store(PLpgSQL_execstate *estate);
! static HeapTuple convert_tuple(TupleDesc dest_desc, TupleDesc src_desc,
! int *conversion_map,
! HeapTuple tuple);
! static int *validate_tupdesc_compat(TupleDesc expected, TupleDesc returned,
! bool *needs_conversion,
const char *msg);
static void exec_set_found(PLpgSQL_execstate *estate, bool state);
static void plpgsql_create_econtext(PLpgSQL_execstate *estate);
***************
*** 216,221 ****
--- 220,227 ----
ErrorContextCallback plerrcontext;
int i;
int rc;
+ int *conversion_map = NULL;
+ bool needs_conversion = false;
/*
* Setup the execution state
***************
*** 388,394 ****
{
case TYPEFUNC_COMPOSITE:
/* got the expected result rowtype, now check it */
! validate_tupdesc_compat(tupdesc, estate.rettupdesc,
"returned record type does not match expected record type");
break;
case TYPEFUNC_RECORD:
--- 394,400 ----
{
case TYPEFUNC_COMPOSITE:
/* got the expected result rowtype, now check it */
! conversion_map = validate_tupdesc_compat(tupdesc, estate.rettupdesc, &needs_conversion,
"returned record type does not match expected record type");
break;
case TYPEFUNC_RECORD:
***************
*** 414,422 ****
* Copy tuple to upper executor memory, as a tuple Datum. Make
* sure it is labeled with the caller-supplied tuple type.
*/
! estate.retval =
! PointerGetDatum(SPI_returntuple((HeapTuple) DatumGetPointer(estate.retval),
tupdesc));
}
else
{
--- 420,437 ----
* Copy tuple to upper executor memory, as a tuple Datum. Make
* sure it is labeled with the caller-supplied tuple type.
*/
! if (!needs_conversion)
! estate.retval =
! PointerGetDatum(SPI_returntuple((HeapTuple) DatumGetPointer(estate.retval),
tupdesc));
+ else
+ {
+ HeapTuple tuple = (HeapTuple) DatumGetPointer(estate.retval);
+ tuple = convert_tuple(tupdesc, estate.rettupdesc, conversion_map, tuple);
+ estate.retval =
+ PointerGetDatum(SPI_returntuple(tuple, tupdesc));
+ pfree(conversion_map);
+ }
}
else
{
***************
*** 486,491 ****
--- 501,508 ----
PLpgSQL_rec *rec_new,
*rec_old;
HeapTuple rettup;
+ int *conversion_map;
+ bool needs_conversion;
/*
* Setup the execution state
***************
*** 705,715 ****
rettup = NULL;
else
{
! validate_tupdesc_compat(trigdata->tg_relation->rd_att,
! estate.rettupdesc,
"returned row structure does not match the structure of the triggering table");
/* Copy tuple to upper executor memory */
! rettup = SPI_copytuple((HeapTuple) DatumGetPointer(estate.retval));
}
/*
--- 722,741 ----
rettup = NULL;
else
{
! conversion_map = validate_tupdesc_compat(trigdata->tg_relation->rd_att,
! estate.rettupdesc, &needs_conversion,
"returned row structure does not match the structure of the triggering table");
/* Copy tuple to upper executor memory */
! if (!needs_conversion)
! rettup = SPI_copytuple((HeapTuple) DatumGetPointer(estate.retval));
! else
! {
! HeapTuple tuple = (HeapTuple) DatumGetPointer(estate.retval);
! tuple = convert_tuple(trigdata->tg_relation->rd_att,
! estate.rettupdesc, conversion_map, tuple);
! rettup = SPI_copytuple(tuple);
! pfree(conversion_map);
! }
}
/*
***************
*** 2191,2196 ****
--- 2217,2224 ----
case PLPGSQL_DTYPE_REC:
{
PLpgSQL_rec *rec = (PLpgSQL_rec *) retvar;
+ int *conversion_map;
+ bool needs_conversion;
if (!HeapTupleIsValid(rec->tup))
ereport(ERROR,
***************
*** 2199,2207 ****
rec->refname),
errdetail("The tuple structure of a not-yet-assigned"
" record is indeterminate.")));
! validate_tupdesc_compat(tupdesc, rec->tupdesc,
! "wrong record type supplied in RETURN NEXT");
! tuple = rec->tup;
}
break;
--- 2227,2241 ----
rec->refname),
errdetail("The tuple structure of a not-yet-assigned"
" record is indeterminate.")));
! conversion_map = validate_tupdesc_compat(tupdesc, rec->tupdesc, &needs_conversion,
! "wrong record type supplied in RETURN NEXT");
! if (!needs_conversion)
! tuple = rec->tup;
! else
! {
! tuple = convert_tuple(tupdesc, rec->tupdesc, conversion_map, rec->tup);
! pfree(conversion_map);
! }
}
break;
***************
*** 2285,2290 ****
--- 2319,2332 ----
{
Portal portal;
uint32 processed = 0;
+ int *conversion_map;
+ bool needs_conversion;
+ int returned_natts;
+ int expected_natts;
+ Datum *returned_values;
+ Datum *expected_values;
+ bool *returned_nulls;
+ bool *expected_nulls;
if (!estate->retisset)
ereport(ERROR,
***************
*** 2307,2340 ****
stmt->params);
}
! validate_tupdesc_compat(estate->rettupdesc, portal->tupDesc,
"structure of query does not match function result type");
while (true)
{
MemoryContext old_cxt;
- int i;
SPI_cursor_fetch(portal, true, 50);
if (SPI_processed == 0)
break;
! old_cxt = MemoryContextSwitchTo(estate->tuple_store_cxt);
! for (i = 0; i < SPI_processed; i++)
! {
! HeapTuple tuple = SPI_tuptable->vals[i];
! tuplestore_puttuple(estate->tuple_store, tuple);
! processed++;
}
- MemoryContextSwitchTo(old_cxt);
-
- SPI_freetuptable(SPI_tuptable);
}
SPI_freetuptable(SPI_tuptable);
SPI_cursor_close(portal);
estate->eval_processed = processed;
exec_set_found(estate, processed != 0);
--- 2349,2442 ----
stmt->params);
}
! conversion_map = validate_tupdesc_compat(estate->rettupdesc, portal->tupDesc, &needs_conversion,
"structure of query does not match function result type");
+
+ /*
+ * Allocate work arrays for tuple conversion when it is needed, i.e. when
+ * the expected or the returned tuple descriptor has dropped columns.
+ */
+ if (needs_conversion)
+ {
+ int i;
+
+ returned_natts= portal->tupDesc->natts;
+ expected_natts = estate->rettupdesc->natts;
+
+ returned_values = (Datum *) palloc(returned_natts * sizeof(Datum));
+ returned_nulls = (bool *) palloc(returned_natts * sizeof(bool));
+ expected_values = (Datum *) palloc(expected_natts * sizeof(Datum));
+ expected_nulls = (bool *) palloc(expected_natts * sizeof(bool));
+
+ for (i = 0; i < expected_natts; i++)
+ expected_nulls[i] = true;
+ }
while (true)
{
MemoryContext old_cxt;
SPI_cursor_fetch(portal, true, 50);
if (SPI_processed == 0)
break;
! CHECK_FOR_INTERRUPTS();
! /* when conversion is not necessary, then store tuples in tuple store */
! if (!needs_conversion)
! {
! int i;
!
! old_cxt = MemoryContextSwitchTo(estate->tuple_store_cxt);
! for (i = 0; i < SPI_processed; i++)
! {
! tuplestore_puttuple(estate->tuple_store, SPI_tuptable->vals[i]);
! processed++;
! }
! MemoryContextSwitchTo(old_cxt);
! }
! else
! {
! int i;
! int j;
! HeapTuple tuple;
!
! /* convert tuple and store it in tuple store */
! for (i = 0; i < SPI_processed; i++)
! {
! /* Deconstruct the tuple */
! heap_deform_tuple(SPI_tuptable->vals[i], portal->tupDesc,
! returned_values,
! returned_nulls);
! for (j = 0; j < expected_natts; j++)
! if (conversion_map[j] != 0)
! {
! expected_values[j] = returned_values[conversion_map[j] - 1];
! expected_nulls[j] = returned_nulls[conversion_map[j] - 1];
! }
! /* expected_nulls entries for dropped atts always stay true */
!
! tuple = heap_form_tuple(estate->rettupdesc, expected_values, expected_nulls);
! old_cxt = MemoryContextSwitchTo(estate->tuple_store_cxt);
! tuplestore_puttuple(estate->tuple_store, tuple);
! MemoryContextSwitchTo(old_cxt);
! processed++;
! }
}
}
SPI_freetuptable(SPI_tuptable);
SPI_cursor_close(portal);
+ if (needs_conversion)
+ {
+ pfree(expected_values);
+ pfree(expected_nulls);
+ pfree(returned_values);
+ pfree(returned_nulls);
+ pfree(conversion_map);
+ }
+
estate->eval_processed = processed;
exec_set_found(estate, processed != 0);
***************
*** 5121,5162 ****
}
/*
! * Validates compatibility of supplied TupleDesc pair by checking number and type
! * of attributes.
*/
! static void
! validate_tupdesc_compat(TupleDesc expected, TupleDesc returned, const char *msg)
{
! int i;
! const char *dropped_column_type = gettext_noop("N/A (dropped column)");
if (!expected || !returned)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("%s", _(msg))));
! if (expected->natts != returned->natts)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("%s", _(msg)),
errdetail("Number of returned columns (%d) does not match "
"expected column count (%d).",
! returned->natts, expected->natts)));
! for (i = 0; i < expected->natts; i++)
! if (expected->attrs[i]->atttypid != returned->attrs[i]->atttypid)
! ereport(ERROR,
! (errcode(ERRCODE_DATATYPE_MISMATCH),
! errmsg("%s", _(msg)),
! errdetail("Returned type %s does not match expected type "
! "%s in column \"%s\".",
! OidIsValid(returned->attrs[i]->atttypid) ?
! format_type_be(returned->attrs[i]->atttypid) :
! _(dropped_column_type),
! OidIsValid(expected->attrs[i]->atttypid) ?
! format_type_be(expected->attrs[i]->atttypid) :
! _(dropped_column_type),
! NameStr(expected->attrs[i]->attname))));
}
/* ----------
--- 5223,5360 ----
}
/*
! * convert_tuple transforms a tuple from the format described by src_desc to
! * the format described by dest_desc. The descriptors must be compatible; the
! * only allowed difference between them is dropped attributes.
*/
! static HeapTuple
! convert_tuple(TupleDesc dest_desc, TupleDesc src_desc, int *conversion_map, HeapTuple tuple)
{
! int snatts = src_desc->natts;
! int dnatts = dest_desc->natts;
! Datum *src_values;
! bool *src_nulls;
! Datum *dest_values;
! bool *dest_nulls;
! int i;
! HeapTuple result;
!
! src_values = (Datum *) palloc0 (snatts * sizeof(Datum));
! src_nulls = (bool *) palloc(snatts * sizeof(bool));
! dest_values = (Datum *) palloc0 (dnatts * sizeof(Datum));
! dest_nulls = (bool *) palloc(dnatts * sizeof(Datum));
!
! heap_deform_tuple(tuple, src_desc, src_values, src_nulls);
! for (i = 0; i < dnatts; i++)
! {
! if (conversion_map[i] != 0)
! {
! dest_values[i] = src_values[conversion_map[i] - 1];
! dest_nulls[i] = src_nulls[conversion_map[i] - 1];
! }
! else
! dest_nulls[i] = true;
! }
!
! result = heap_form_tuple(dest_desc, dest_values, dest_nulls);
!
! pfree(src_values);
! pfree(src_nulls);
! pfree(dest_values);
! pfree(dest_nulls);
!
! return result;
! }
+
+ /*
+ * Validates compatibility of supplied TupleDesc pair by checking number and type
+ * of attributes. When either TupleDesc has dropped columns, *needs_conversion is
+ * set to true and the function returns a palloc'd array that maps each destination
+ * attribute to its 1-based source position. Zero means NULL for a dropped attribute.
+ */
+ static int *
+ validate_tupdesc_compat(TupleDesc expected, TupleDesc returned,
+ bool *needs_conversion,
+ const char *msg)
+ {
+ int i;
+ int j = 0;
+ int *result = NULL;
+ int valid_expected_natts;
+ int valid_returned_natts;
+ int conversion_map[FUNC_MAX_ARGS];
+
if (!expected || !returned)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("%s", _(msg))));
! valid_expected_natts = expected->natts;
! valid_returned_natts = returned->natts;
!
! for (i = 0; i < expected->natts; i++)
! {
! if (!expected->attrs[i]->attisdropped)
! {
! /* find first valid returned att */
! while (j < returned->natts && returned->attrs[j]->attisdropped)
! {
! valid_returned_natts--;
! j++;
! }
! /* validate att pair only when j is valid */
! if (j < returned->natts)
! {
! Assert(!expected->attrs[i]->attisdropped);
! Assert(!returned->attrs[j]->attisdropped);
!
! if (expected->attrs[i]->atttypid != returned->attrs[j]->atttypid)
! ereport(ERROR,
! (errcode(ERRCODE_DATATYPE_MISMATCH),
! errmsg("%s", _(msg)),
! errdetail("Returned type %s does not match expected type "
! "%s in column \"%s\".",
! format_type_be(returned->attrs[j]->atttypid),
! format_type_be(expected->attrs[i]->atttypid),
! NameStr(expected->attrs[i]->attname))));
! conversion_map[i] = j+1;
! j++;
! }
! /* Cannot break out of this loop - valid_expected_natts must be correct for the error message */
! }
! else
! {
! valid_expected_natts--;
! conversion_map[i] = 0;
! }
! }
!
! /* complete valid_returned_natts evaluation */
! while (j < returned->natts)
! {
! if (returned->attrs[j++]->attisdropped)
! valid_returned_natts--;
! }
!
! if (valid_expected_natts != valid_returned_natts)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("%s", _(msg)),
errdetail("Number of returned columns (%d) does not match "
"expected column count (%d).",
! valid_returned_natts, valid_expected_natts)));
! if (valid_expected_natts != expected->natts || valid_returned_natts != returned->natts)
! {
! *needs_conversion = true;
! result = (int *) palloc(expected->natts * sizeof(int));
! memcpy(result, conversion_map, expected->natts * sizeof(int));
! }
! else
! *needs_conversion = false;
!
! return result;
}
/* ----------
*** ./src/test/regress/expected/plpgsql.out.orig 2009-08-02 14:56:35.000000000 +0200
--- ./src/test/regress/expected/plpgsql.out 2009-08-03 21:39:40.000000000 +0200
***************
*** 3287,3292 ****
--- 3287,3368 ----
(4 rows)
drop function return_dquery();
+ -- fix return query and dropped columns bug
+ create table tabwithcols(a int, b int, c int, d int);
+ insert into tabwithcols values(10,20,30,40),(50,60,70,80);
+ create or replace function returnqueryf()
+ returns setof tabwithcols as $$
+ begin
+ return query select * from tabwithcols;
+ return query execute 'select * from tabwithcols';
+ end;
+ $$ language plpgsql;
+ select * from returnqueryf();
+ a | b | c | d
+ ----+----+----+----
+ 10 | 20 | 30 | 40
+ 50 | 60 | 70 | 80
+ 10 | 20 | 30 | 40
+ 50 | 60 | 70 | 80
+ (4 rows)
+
+ alter table tabwithcols drop column b;
+ alter table tabwithcols drop column c;
+ select * from returnqueryf();
+ a | d
+ ----+----
+ 10 | 40
+ 50 | 80
+ 10 | 40
+ 50 | 80
+ (4 rows)
+
+ alter table tabwithcols drop column d;
+ select * from returnqueryf();
+ a
+ ----
+ 10
+ 50
+ 10
+ 50
+ (4 rows)
+
+ alter table tabwithcols add column d int;
+ select * from returnqueryf();
+ a | d
+ ----+---
+ 10 |
+ 50 |
+ 10 |
+ 50 |
+ (4 rows)
+
+ -- better to be sure we don't break anything else
+ alter table tabwithcols add column b date;
+ create function trigger_function() returns trigger as $$
+ begin
+ new.b = to_date('2009-08-03','YYYY-MM-DD') + new.a;
+ return new;
+ end;
+ $$language plpgsql;
+ create trigger trg_ins_tabwithcols before insert on tabwithcols
+ for each row execute procedure trigger_function();
+ insert into tabwithcols(a, d) values(30,30),(60,70);
+ select * from returnqueryf();
+ a | d | b
+ ----+----+------------
+ 10 | |
+ 50 | |
+ 30 | 30 | 09-02-2009
+ 60 | 70 | 10-02-2009
+ 10 | |
+ 50 | |
+ 30 | 30 | 09-02-2009
+ 60 | 70 | 10-02-2009
+ (8 rows)
+
+ drop function returnqueryf();
+ drop table tabwithcols;
-- Tests for 8.4's new RAISE features
create or replace function raise_test() returns void as $$
begin
*** ./src/test/regress/sql/plpgsql.sql.orig 2009-08-02 15:54:00.402438910 +0200
--- ./src/test/regress/sql/plpgsql.sql 2009-08-03 21:38:25.753894296 +0200
***************
*** 2684,2689 ****
--- 2684,2735 ----
drop function return_dquery();
+ -- fix return query and dropped columns bug
+ create table tabwithcols(a int, b int, c int, d int);
+ insert into tabwithcols values(10,20,30,40),(50,60,70,80);
+
+ create or replace function returnqueryf()
+ returns setof tabwithcols as $$
+ begin
+ return query select * from tabwithcols;
+ return query execute 'select * from tabwithcols';
+ end;
+ $$ language plpgsql;
+
+ select * from returnqueryf();
+
+ alter table tabwithcols drop column b;
+ alter table tabwithcols drop column c;
+
+ select * from returnqueryf();
+
+ alter table tabwithcols drop column d;
+
+ select * from returnqueryf();
+
+ alter table tabwithcols add column d int;
+
+ select * from returnqueryf();
+
+ -- better to be sure we don't break anything else
+ alter table tabwithcols add column b date;
+
+ create function trigger_function() returns trigger as $$
+ begin
+ new.b = to_date('2009-08-03','YYYY-MM-DD') + new.a;
+ return new;
+ end;
+ $$language plpgsql;
+
+ create trigger trg_ins_tabwithcols before insert on tabwithcols
+ for each row execute procedure trigger_function();
+ insert into tabwithcols(a, d) values(30,30),(60,70);
+
+ select * from returnqueryf();
+
+ drop function returnqueryf();
+ drop table tabwithcols;
+
-- Tests for 8.4's new RAISE features
create or replace function raise_test() returns void as $$
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers