I realise that this has already been done, by Joe Conway I think. Indeed I was
looking at this just before beta1 when I happened to notice the post giving the
plpgsql function. However, as I had started work on it and I was interested in
seeing how things should be done I continued, only not in so much of a rush.
In the interests on finding out if I have approached this the right way, or the
way a more experienced backend programmer would, I'd appreciate any comments on
the attached .c file. In particular, I'm not sure what I'm doing with regard to
memory contexts, I think I may have one unnecessary switch in there, and in
general I seem to be doing a lot of work just to find out tidbits of
information.
I based this on, i.e. started by editing, Joe Conway's tablefunc.c but I think
there's very little of the original left in there.
I've also attached the .h, Makefile and .sql.in files to make this work if
anyone is interested in giving it a run. The .sql.in shows the usage. I did
this in a directory called pggrouping, for the sake of a better name, under the
contrib directory in my tree, so that's probably the best place to build it.
Thanks, and sorry for adding to people's email and work load.
--
Nigel J. Andrews
Director
---
Logictree Systems Limited
Computer Consultants
/*
* Derived from tablefunc.c, a sample to demonstrate C functions which
* return setof scalar and setof composite by Joe Conway <[EMAIL PROTECTED]>
*
* Copyright 2002 by PostgreSQL Global Development Group
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
* is hereby granted, provided that the above copyright notice and this
* paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
* DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
* ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*/
#include <stdlib.h>
#include <math.h>
#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "executor/spi.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "pggrouping.h"
typedef struct unpack_array_fctx
{
SPITupleTable *spi_tuptable; /* sql results from user query */
TupleDesc tupdesc; /* TupleDesc for results */
int unpack_attrnum; /* attribute number to be unpacked */
int lastcall_cntr; /* previous call_cntr, invlaid = -1 */
int lastindex; /* index of the last array item sent, invalid < 1 */
} unpack_array_fctx;
#define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
static Datum expandArray_SRF(FunctionCallInfo info,
char *sql,
int unpackAttrNum);
static Datum expandArray_SRF_FirstCall(FunctionCallInfo fcinfo,
FuncCallContext *funcctx,
const char *sql,
const int unpackAttrNum);
static Datum expandArray_SRF_GetTuple(FunctionCallInfo fcinfo,
FuncCallContext *funcctx);
static TupleDesc makeUnpackedTupleDesc(TupleDesc src_tupdesc,
int unpack_attrnum);
static bool similarTupleDescs(TupleDesc ret_tupdesc,
TupleDesc sql_tupdesc);
/*
* pg_group_expandusers
*
* Return pg_group where each tuple has grolist attribute of int4[] type
* changed to be of type int4 and to hold only one user id.
*/
PG_FUNCTION_INFO_V1(pg_group_long);
Datum
pg_group_long(PG_FUNCTION_ARGS)
{
return expandArray_SRF(fcinfo,
"select groname,grosysid,grolist from pg_group",
3);
}
/*
* expand_array_srf
*
* Return tuples such that the elements of an array attribute are
* extracted in turn and placed into the output instead of the array.
* Declared to fmgr as:
* CREATE FUNCTION the_name(text,integer) RETURNS SETOF RECORD ...
*
* where the text argument is the query string to obtain the source
* data and the integer argument gives the column number of the array
* to expand.
*
* Note, despite checking number of arguments this is in no way safe
* from some one creating a fmgr function which uses wrong argument types.
*/
PG_FUNCTION_INFO_V1(expand_array_srf);
Datum
expand_array_srf(PG_FUNCTION_ARGS)
{
if (fcinfo->nargs != 2)
elog(ERROR, "Wrong number of arguments specified for function");
return expandArray_SRF(fcinfo,
GET_STR(PG_GETARG_TEXT_P(0)),
PG_GETARG_INT32(1));
}
/*
* expandArray_SRF
*
* This is not the user invoked function. It is supposed to
* reused by all such functions for doing the work.
* Objects if it's asked to unpack multidimensional arrays.
*
* Nice idea but the use of fcinfo in the return macros requiring
* that value to be passed around makes this split into nice
* manageable code portions more messy and worse, dependent on
* the calling convention of fmgr [and SRF] functions, not to
* mention the SPI_ disconnection stuck in the middle of nowhere;
* otherwise known as: at the end, completely out of touch with the
* rest of the use of SPI.
*/
static
Datum
expandArray_SRF(FunctionCallInfo fcinfo, char *sql, int unpackAttrNum)
{
FuncCallContext *funcctx;
Datum retdatum;
if(SRF_IS_FIRSTCALL())
{
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
retdatum = expandArray_SRF_FirstCall(fcinfo, funcctx, sql, unpackAttrNum);
/* cheat to detect no usable tuples */
if (((ReturnSetInfo *)(fcinfo->resultinfo))->isDone == ExprEndResult)
return retdatum;
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
if (funcctx->call_cntr < funcctx->max_calls)
return expandArray_SRF_GetTuple(fcinfo, funcctx);
/* release SPI related resources
not too nice what with SPI only reference in FirstCall() */
SPI_finish();
SRF_RETURN_DONE(funcctx);
}
/*
* expandArray_SRF_FirstCall
*
* Do the 'on first call' stuff.
*/
static
Datum
expandArray_SRF_FirstCall(FunctionCallInfo fcinfo,
FuncCallContext *funcctx,
const char *sql,
const int unpackcolnum)
{
struct unpack_array_fctx *fctx;
MemoryContext oldcontext;
int ret;
int proc;
Oid funcid = fcinfo->flinfo->fn_oid;
Oid functypeid;
char functyptype;
TupleDesc tupdesc = NULL;
SPITupleTable *spi_tuptable = NULL;
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* Do the query */
if ((ret = SPI_connect()) < 0)
elog(ERROR, "expandArray: SPI_connect returned %d", ret);
ret = SPI_exec((char *)sql, 0);
proc = SPI_processed;
if ((ret != SPI_OK_SELECT) || (proc == 0))
{
/* no tuples */
SPI_finish();
SRF_RETURN_DONE(funcctx);
}
spi_tuptable = SPI_tuptable;
/* SPI switches context on us, so reset it */
MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* Determine and verify return types */
/* desired return type */
tupdesc = makeUnpackedTupleDesc(spi_tuptable->tupdesc, unpackcolnum);
/* function return type */
functypeid = get_func_rettype(funcid);
functyptype = get_typtype(functypeid);
if (functyptype == 'c')
{
/* Return type is fully known (predetermined) so check for
compatibility to query */
TupleDesc fret_tupdesc;
fret_tupdesc = TypeGetTupleDesc(functypeid, NIL);
if (!similarTupleDescs(fret_tupdesc, tupdesc))
elog(ERROR, "Query and return tuple descriptions are incompatible");
pfree(fret_tupdesc);
}
else if (functyptype == 'p' && functypeid == RECORDOID)
{
/* We have the freedom to specify what we are returning */
tupdesc = tupdesc;
}
else if (functyptype == 'b')
elog(ERROR, "Invalid kind of return type specified for function");
else
elog(ERROR, "Unknown kind of return type specified for function");
/* allocate a slot for a tuple with this tupdesc */
funcctx->slot = TupleDescGetSlot(tupdesc);
/* initialise our persistent storage */
fctx = (struct unpack_array_fctx *) palloc(sizeof(struct unpack_array_fctx));
fctx->spi_tuptable = spi_tuptable;
fctx->lastcall_cntr = -1;
fctx->lastindex = 0;
fctx->unpack_attrnum = unpackcolnum;
fctx->tupdesc = tupdesc;
funcctx->user_fctx = fctx;
/* total 'notional' number of tuples to be returned */
funcctx->max_calls = proc;
MemoryContextSwitchTo(oldcontext);
/* dummy return */
return Int32GetDatum(0);
}
/*
* expandArray_SRF_GetTuple
*
* Get a tuple.
*/
static
Datum
expandArray_SRF_GetTuple(FunctionCallInfo fcinfo, FuncCallContext *funcctx)
{
int call_cntr = funcctx->call_cntr;
TupleTableSlot *slot = funcctx->slot;
struct unpack_array_fctx *fctx = (struct unpack_array_fctx *) funcctx->user_fctx;
int nextarrayind = fctx->lastindex + 1;
int unpackcolnum = fctx->unpack_attrnum;
HeapTuple spi_tuple = fctx->spi_tuptable->vals[call_cntr];
TupleDesc spi_tupdesc = fctx->spi_tuptable->tupdesc;
HeapTuple tuple;
TupleDesc tupdesc = fctx->tupdesc;
MemoryContext oldcontext;
Datum *values;
char *nulls;
int i;
int cattrindex = unpackcolnum - 1;
Datum result;
/* paranoia test */
#ifdef ALWAYS_PARANOID
if (spi_tuple->t_data->t_natts != tupdesc->natts
|| spi_tupdesc->natts != tupdesc->natts)
elog(ERROR, "Mismatch in tuple and tupledesc attribute counts");
#else
Assert(spi_tuple->t_data->t_natts == tupdesc->natts);
Assert(spi_tupdesc->natts == tupdesc->natts);
#endif
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
values = (Datum *)palloc(sizeof(Datum) * tupdesc->natts);
nulls = (char *)palloc(sizeof(char) * tupdesc->natts);
/* heap_deformtuple() replacement */
for (i = 0; i < tupdesc->natts; i++)
{
bool isNull;
*(values + i) = heap_getattr(spi_tuple,
i + 1,
spi_tupdesc,
&isNull);
*(nulls + i) = (isNull) ? 'n' : ' ';
}
if (*(nulls + cattrindex) != 'n')
{
/* Replace the offending array with an element from it or null it */
ArrayType *earray = DatumGetArrayTypeP(*(values + cattrindex));
Datum eelement = PointerGetDatum(NULL);
bool eelementisnull = true;
int earrayind = fctx->lastcall_cntr == call_cntr
? nextarrayind
: ARR_LBOUND(earray)[0];
/* Quick check for 1-dimensionality */
if (ARR_NDIM(earray) != 1)
elog(ERROR, "Array is multidimensional, only 1-d arrays are supported");
/* empty the entry */
*(values + cattrindex) = eelement;
*(nulls + cattrindex) = 'n';
if (earrayind <= ARR_DIMS(earray)[0])
{
Form_pg_attribute elemattr = tupdesc->attrs[cattrindex];
/* Get the element */
eelement = array_ref(earray,
1,
&earrayind,
0, /* > 0 means fixed length array */
elemattr->attlen,
elemattr->attbyval,
elemattr->attalign,
&eelementisnull);
/*
eelement = fetch_att(((Datum)(ARR_DATA_PTR(earray)) + earrayind),
tupdesc->attrs[cattrindex]->attbyval,
tupdesc->attrs[cattrindex]->attlen);
*/
}
if (earrayind >= ARR_DIMS(earray)[0])
/* Indicate end of array reached */
nextarrayind = 0;
if (!eelementisnull /*PointerIsValid(DatumGetPointer(eelement))*/)
{
*(values + cattrindex) = eelement;
*(nulls + cattrindex) = ' ';
}
}
else
/* Indicate end of array since no array present */
nextarrayind = 0;
MemoryContextSwitchTo(oldcontext);
/* Copies datums into a new tuple */
tuple = heap_formtuple(tupdesc, values, nulls);
/*
* Make the tuple into a datum
* - comment on this macro implies tuple should be in our context
* and possibly freed after here
*/
result = TupleGetDatum(slot, tuple);
MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
pfree(values);
pfree(nulls);
/* Set array index and previous call counter for next call */
fctx->lastindex = nextarrayind;
fctx->lastcall_cntr = call_cntr;
if (nextarrayind > 0)
/* -- call counter since API will ++ but we haven't finished */
call_cntr = --funcctx->call_cntr;
MemoryContextSwitchTo(oldcontext);
SRF_RETURN_NEXT(funcctx, result);
}
/*
* Check if two tupdescs match in type of attributes
* The standard equalTupleDescs() compares names, something
* which may be a problem, so we use a much loser check
* of sameness.
*/
static bool
similarTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
{
int i;
if (ret_tupdesc->natts != sql_tupdesc->natts)
return false;
for (i = 0; i < ret_tupdesc->natts; i++)
if (ret_tupdesc->attrs[i]->atttypid != sql_tupdesc->attrs[i]->atttypid)
return false;
return true;
}
/*
* make_unpacked_tupledesc
* Copy the given tupledesc changing the given column type from
* array to the type contained in the array
*/
static TupleDesc
makeUnpackedTupleDesc(TupleDesc src_tupdesc, int unpack_attrnum)
{
HeapTuple typtuple;
Form_pg_attribute attr;
Form_pg_type type;
Oid atttypid;
TupleDesc tupdesc;
int natts;
/* char attname[NAMEDATALEN+1]; */
natts = src_tupdesc->natts;
tupdesc = CreateTupleDescCopy(src_tupdesc);
/* safety check */
if (unpack_attrnum < 1 || unpack_attrnum > natts)
elog(ERROR, "No such attribute number %d", unpack_attrnum);
attr = tupdesc->attrs[unpack_attrnum-1];
#if 0
/*
* This doesn't work because attndims = 0 in the test case
* of pg_group.grolist
* Since the easiest way to find the dimensions seems to be
* to query an array instance this task is left for the
* tuple fetch section.
*/
if (attr->attndims != 1)
elog(ERROR,
"Attribute number %d not an array or multidimensional",
unpack_attrnum);
#endif
atttypid = attr->atttypid;
/* Find information about type of array element */
/* First about the array */
typtuple = SearchSysCache(TYPEOID,
ObjectIdGetDatum(atttypid),
0, 0, 0);
if (!HeapTupleIsValid(typtuple))
elog(ERROR, "Unable to look up type id %u", atttypid);
type = (Form_pg_type) GETSTRUCT(typtuple);
/*
* I'm confused we can check this is an array type above
* using attr->attndims (like what we are doing) can't we?
* - test case says 'obviously not'.
*/
if (type->typelem == 0 || type->typlen != -1)
elog(ERROR, "Column %d is not an array type", unpack_attrnum);
atttypid = type->typelem;
ReleaseSysCache(typtuple);
/* Now about the element */
typtuple = SearchSysCache(TYPEOID,
ObjectIdGetDatum(atttypid),
0, 0, 0);
if (!HeapTupleIsValid(typtuple))
elog(ERROR, "Unable to look up type id %u", atttypid);
/* type = (Form_pg_type) GETSTRUCT(typtuple); */
atttypid = HeapTupleGetOid(typtuple); /* Why this? */
/* Override the array attribute in tupledesc */
tupdesc->attrs[unpack_attrnum-1] = NULL;
TupleDescInitEntry(
tupdesc,
(AttrNumber)unpack_attrnum,
NameStr(attr->attname),
atttypid,
-1, /* atttypmod */
0, /* attndims - only 1-d arrays supported at moment */
false /* attisset */
);
ReleaseSysCache(typtuple);
/* TupleDescInitEntry() allocates new storage for this */
pfree(attr);
return tupdesc;
}
/*
* Copyright 2002 by PostgreSQL Global Development Group
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
* is hereby granted, provided that the above copyright notice and this
* paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
* DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
* ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*/
#ifndef PGGROUPING_H
#define PGGROUPING_H
/*
* External declarations
*/
extern Datum pg_group_long(PG_FUNCTION_ARGS);
extern Datum expand_array_srf(PG_FUNCTION_ARGS);
#endif /* PGGROUPING_H */
subdir = contrib/pggrouping
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
MODULES = pggrouping
DATA_built = pggrouping.sql
DOCS = README.pggrouping
include $(top_srcdir)/contrib/contrib-global.mk
CREATE OR REPLACE VIEW pg_group_long AS
SELECT ''::name AS groname
, 1::integer AS grosysid
, 1::integer AS usesysid;
CREATE OR REPLACE FUNCTION pg_group_long()
RETURNS setof pg_group_long
AS 'MODULE_PATHNAME','pg_group_long' LANGUAGE 'c' VOLATILE;
CREATE OR REPLACE FUNCTION expand_array(text,int)
RETURNS setof record
AS 'MODULE_PATHNAME','expand_array_srf' LANGUAGE 'c' VOLATILE;
--
-- examples of use on pg_group
--
select * from pg_group;
select * from pg_group_long();
select * from expand_array('select groname, grosysid, grolist from pg_group', 3) AS
grps(grp name, grpid int, useid int);
---------------------------(end of broadcast)---------------------------
TIP 6: Have you searched our list archives?
http://archives.postgresql.org