Hi I wrote a proof concept for the support window function from plpgsql.
Window function API - functions named WinFuncArg* are polymorphic and it is not easy to wrap these functions for usage from SQL level. I wrote an enhancement of the GET statement - for this case GET WINDOW_CONTEXT, that allows safe and fast access to the result of these functions. Custom variant of row_number can look like: create or replace function pl_row_number() returns bigint as $$ declare pos int8; begin pos := get_current_position(windowobject); pos := pos + 1; perform set_mark_position(windowobject, pos); return pos; end $$ language plpgsql window; Custom variant of lag function can look like: create or replace function pl_lag(numeric) returns numeric as $$ declare v numeric; begin perform get_input_value_in_partition(windowobject, 1, -1, 'seek_current', false); get pg_window_context v = PG_INPUT_VALUE; return v; end; $$ language plpgsql window; Custom window functions can be used for generating missing data in time series create table test_missing_values(id int, v integer); insert into test_missing_values values(1,10),(2,11),(3,12),(4,null),(5,null),(6,15),(7,16); create or replace function pl_pcontext_test(numeric) returns numeric as $$ declare n numeric; v numeric; begin perform get_input_value_for_row(windowobject, 1); get pg_window_context v = PG_INPUT_VALUE; if v is null then v := get_partition_context_value(windowobject, null::numeric); else perform set_partition_context_value(windowobject, v); end if; return v; end $$ language plpgsql window; select id, v, pl_pcontext_test(v) over (order by id) from test_missing_values; id | v | pl_pcontext_test. ----+----+------------------ 1 | 10 | 10 2 | 11 | 11 3 | 12 | 12 4 | | 12 5 | | 12 6 | 15 | 15 7 | 16 | 16 (7 rows) I think about another variant for WinFuncArg functions where polymorphic argument is used similarly like in get_partition_context_value - this patch is prototype, but it works and I think so support of custom window functions in PL languages is possible and probably useful. Comments, notes, ideas, objections? Regards Pavel
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index ba5a23ac25..fdc364c05e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1418,6 +1418,18 @@ LANGUAGE internal STRICT IMMUTABLE PARALLEL SAFE AS 'unicode_is_normalized'; +CREATE OR REPLACE FUNCTION + get_partition_context_value(windowobjectproxy, anyelement, int4 DEFAULT NULL) + RETURNS anyelement +LANGUAGE internal +AS 'windowobject_get_partition_context_value'; + +CREATE OR REPLACE FUNCTION + set_partition_context_value(windowobjectproxy, anyelement, int4 DEFAULT NULL) + RETURNS void +LANGUAGE internal +AS 'windowobject_set_partition_context_value'; + -- -- The default permissions for functions mean that anyone can execute them. -- A number of functions shouldn't be executable by just anyone, but rather diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c index 3d6b2f9093..ebb2a16572 100644 --- a/src/backend/utils/adt/pseudotypes.c +++ b/src/backend/utils/adt/pseudotypes.c @@ -334,6 +334,17 @@ pg_node_tree_send(PG_FUNCTION_ARGS) PSEUDOTYPE_DUMMY_IO_FUNCS(pg_ddl_command); PSEUDOTYPE_DUMMY_BINARY_IO_FUNCS(pg_ddl_command); +/* + * windowobjectproxy + * + * This type is pointer to WindowObjectProxyData. It is communication + * mechanism between PL environment and WinFuncArgs functions. Due + * performance reason I prefer using indirect result processing against + * using function returning polymorphic composite value. The indirect + * mechanism is implemented with proxy object represented by type + * WindowObjectProxyData. + */ +PSEUDOTYPE_DUMMY_IO_FUNCS(windowobjectproxy); /* * Dummy I/O functions for various other pseudotypes. diff --git a/src/backend/utils/adt/windowfuncs.c b/src/backend/utils/adt/windowfuncs.c index f0c8ae686d..f18495b228 100644 --- a/src/backend/utils/adt/windowfuncs.c +++ b/src/backend/utils/adt/windowfuncs.c @@ -14,6 +14,8 @@ #include "postgres.h" #include "utils/builtins.h" +#include "utils/datum.h" +#include "utils/lsyscache.h" #include "windowapi.h" /* @@ -35,6 +37,20 @@ typedef struct int64 remainder; /* (total rows) % (bucket num) */ } ntile_context; +#define PROXY_CONTEXT_MAGIC 19730715 + +typedef struct +{ + int magic; + Oid typid; + int16 typlen; + bool typbyval; + int allocsize; + bool isnull; + Datum value; + char data[FLEXIBLE_ARRAY_MEMBER]; +} proxy_context; + static bool rank_up(WindowObject winobj); static Datum leadlag_common(FunctionCallInfo fcinfo, bool forward, bool withoffset, bool withdefault); @@ -472,3 +488,485 @@ window_nth_value(PG_FUNCTION_ARGS) PG_RETURN_DATUM(result); } + +/* + * High level access function. These functions are wrappers for windows API + * for PL languages based on usage WindowObjectProxy. + */ +Datum +windowobject_get_current_position(PG_FUNCTION_ARGS) +{ + WindowObjectProxy wop; + WindowObject winobj; + int64 pos; + + wop = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0)); + + wop->isvalid = false; + + winobj = wop->winobj; + Assert(WindowObjectIsValid(winobj)); + + pos = WinGetCurrentPosition(winobj); + + PG_RETURN_INT64(pos); +} + +Datum +windowobject_set_mark_position(PG_FUNCTION_ARGS) +{ + WindowObjectProxy wop; + WindowObject winobj; + int64 pos; + + wop = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0)); + + wop->isvalid = false; + + winobj = wop->winobj; + Assert(WindowObjectIsValid(winobj)); + + pos = PG_GETARG_INT64(1); + + WinSetMarkPosition(winobj, pos); + + PG_RETURN_VOID(); +} + +Datum +windowobject_get_partition_rowcount(PG_FUNCTION_ARGS) +{ + WindowObjectProxy wop; + WindowObject winobj; + int64 rc; + + wop = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0)); + + wop->isvalid = false; + + winobj = wop->winobj; + Assert(WindowObjectIsValid(winobj)); + + rc = WinGetPartitionRowCount(winobj); + + PG_RETURN_INT64(rc); +} + +Datum +windowobject_rows_are_peers(PG_FUNCTION_ARGS) +{ + WindowObjectProxy wop; + WindowObject winobj; + int64 pos1, + pos2; + + wop = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0)); + + wop->isvalid = false; + + winobj = wop->winobj; + Assert(WindowObjectIsValid(winobj)); + + pos1 = PG_GETARG_INT64(1); + pos2 = PG_GETARG_INT64(2); + + PG_RETURN_BOOL(WinRowsArePeers(winobj, pos1, pos2)); +} + +#define SEEK_CURRENT_STR "seek_current" +#define SEEK_HEAD_STR "seek_head" +#define SEEK_TAIL_STR "seek_tail" + +#define STRLEN(s) (sizeof(s) - 1) + +static int +get_seek_type(text *seektype) +{ + char *str; + int len; + int result; + + str = VARDATA_ANY(seektype); + len = VARSIZE_ANY_EXHDR(seektype); + + if (len == STRLEN(SEEK_CURRENT_STR) && strncmp(str, SEEK_CURRENT_STR, len) == 0) + result = WINDOW_SEEK_CURRENT; + else if (len == STRLEN(SEEK_HEAD_STR) && strncmp(str, SEEK_HEAD_STR, len) == 0) + result = WINDOW_SEEK_HEAD; + else if (len == STRLEN(SEEK_TAIL_STR) && strncmp(str, SEEK_TAIL_STR, len) == 0) + result = WINDOW_SEEK_TAIL; + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("seek type value must be \"seek_current\", \"seek_head\" or \"seek_tail\""))); + + return result; +} + +/* + * Value should be copied to persistent memory context. + * GET PG_WINDOW_CONTEXT is second statement, and we should + * not to lost data. + */ +static void +copy_datum_to_winobj_proxy(WindowObjectProxy wop, + int argno, + Datum value, + bool isnull) +{ + MemoryContext oldcxt; + + if (argno != wop->last_argno) + { + Oid argtypid = get_fn_expr_argtype(wop->fcinfo->flinfo, argno); + + argtypid = getBaseType(argtypid); + get_typlenbyval(argtypid, &wop->typlen, &wop->typbyval); + wop->last_argno = argno; + } + + if (wop->freeval) + { + pfree(DatumGetPointer(wop->value)); + wop->freeval = false; + } + + if (!isnull) + { + oldcxt = MemoryContextSwitchTo(wop->proxy_cxt); + + wop->value = datumCopy(value, wop->typbyval, wop->typlen); + wop->freeval = !wop->typbyval; + wop->isnull = false; + + MemoryContextSwitchTo(oldcxt); + } + else + wop->isnull = true; + + wop->argno = argno; + wop->isvalid = true; +} + +Datum +windowobject_get_func_arg_partition(PG_FUNCTION_ARGS) +{ + WindowObjectProxy wop; + WindowObject winobj; + int argno; + int relpos; + int seektype; + bool set_mark; + Datum value; + bool isnull; + + wop = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0)); + + wop->isvalid = false; + wop->argno = -1; + + winobj = wop->winobj; + Assert(WindowObjectIsValid(winobj)); + + argno = PG_GETARG_INT32(1); + if (argno < 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("arg number less than one"))); + + argno -= 1; + + relpos = PG_GETARG_INT32(2); + seektype = get_seek_type(PG_GETARG_TEXT_P(3)); + set_mark = PG_GETARG_BOOL(4); + + value = WinGetFuncArgInPartition(winobj, + argno, + relpos, + seektype, + set_mark, + &isnull, + &wop->isout); + + copy_datum_to_winobj_proxy(wop, argno, value, isnull); + + PG_RETURN_VOID(); +} + +Datum +windowobject_get_func_arg_frame(PG_FUNCTION_ARGS) +{ + WindowObjectProxy wop; + WindowObject winobj; + int argno; + int relpos; + int seektype; + bool set_mark; + Datum value; + bool isnull; + + wop = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0)); + + wop->isvalid = false; + wop->argno = -1; + + winobj = wop->winobj; + Assert(WindowObjectIsValid(winobj)); + + argno = PG_GETARG_INT32(1); + relpos = PG_GETARG_INT32(2); + seektype = get_seek_type(PG_GETARG_TEXT_P(3)); + set_mark = PG_GETARG_BOOL(4); + + if (argno < 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("arg number less than one"))); + + argno -= 1; + + value = WinGetFuncArgInFrame(winobj, + argno, + relpos, + seektype, + set_mark, + &isnull, + &wop->isout); + + copy_datum_to_winobj_proxy(wop, argno, value, isnull); + + PG_RETURN_VOID(); +} + +Datum +windowobject_get_func_arg_current(PG_FUNCTION_ARGS) +{ + WindowObjectProxy wop; + WindowObject winobj; + int argno; + Datum value; + bool isnull; + + wop = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0)); + + wop->isvalid = false; + wop->argno = -1; + + winobj = wop->winobj; + + Assert(WindowObjectIsValid(winobj)); + + argno = PG_GETARG_INT32(1); + + if (argno < 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("arg number less than one"))); + + argno -= 1; + + value = WinGetFuncArgCurrent(winobj, argno, &isnull); + copy_datum_to_winobj_proxy(wop, argno, value, isnull); + + wop->isout = false; + + PG_RETURN_VOID(); +} + +static void +copy_datum_to_partition_context(proxy_context *pcontext, + Datum value, + bool isnull) +{ + if (!isnull) + { + if (pcontext->typbyval) + pcontext->value = value; + else if (pcontext->typlen == -1) + { + struct varlena *s = (struct varlena *) DatumGetPointer(value); + + memcpy(pcontext->data, s, VARSIZE_ANY(s)); + pcontext->value = PointerGetDatum(pcontext->data); + } + else + { + memcpy(pcontext->data, DatumGetPointer(value), pcontext->typlen); + pcontext->value = PointerGetDatum(pcontext->data); + } + + pcontext->isnull = false; + } + else + { + pcontext->value = (Datum) 0; + pcontext->isnull = true; + } +} + +/* + * Returns estimated size of windowobject partition context + */ +static int +estimate_partition_context_size(Datum value, + bool isnull, + int16 typlen, + int16 minsize, + int *realsize) +{ + if(typlen != -1) + { + if (typlen < sizeof(Datum)) + { + *realsize = offsetof(proxy_context, data); + + return *realsize; + } + + if (typlen > 1024) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("size of value is greather than limit (1024 bytes)"))); + + *realsize = offsetof(proxy_context, data) + typlen; + + return *realsize; + } + else + { + if (!isnull) + { + int size = VARSIZE_ANY_EXHDR(DatumGetPointer(value)); + + if (size > 1024) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("size of value is greather than limit (1024 bytes)"))); + + *realsize = size; + + size += size / 3; + + return offsetof(proxy_context, data) + + MAXALIGN(size > minsize ? size : minsize); + } + else + { + /* by default we allocate 30 bytes */ + *realsize = 0; + + return offsetof(proxy_context, data) + MAXALIGN(minsize); + } + } +} + +#define VARLENA_MINSIZE 32 + +static proxy_context * +get_partition_context(FunctionCallInfo fcinfo, bool write_mode) +{ + WindowObjectProxy wop; + WindowObject winobj; + Oid typid; + int16 typlen; + bool typbyval; + Datum value = (Datum) 0; + bool isnull = true; + int allocsize; + int minsize; + int realsize; + proxy_context *pcontext; + + if (PG_ARGISNULL(0)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("windowobject is NULL"))); + + wop = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0)); + winobj = wop->winobj; + Assert(WindowObjectIsValid(winobj)); + + if (PG_ARGISNULL(2)) + minsize = VARLENA_MINSIZE; + else + minsize = PG_GETARG_INT32(2); + + if (!PG_ARGISNULL(1)) + { + value = PG_GETARG_DATUM(1); + isnull = false; + } + + typid = get_fn_expr_argtype(fcinfo->flinfo, 1); + if (!OidIsValid(typid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot detect type of context value"))); + + typid = getBaseType(typid); + get_typlenbyval(typid, &typlen, &typbyval); + + Assert(typlen != -2); + + allocsize = estimate_partition_context_size(value, + isnull, + typlen, + minsize, + &realsize); + + pcontext = (proxy_context *) WinGetPartitionLocalMemory(winobj, allocsize); + + /* fresh pcontext has zeroed memory */ + Assert(pcontext->magic == 0 || pcontext->magic == PROXY_CONTEXT_MAGIC); + + if (pcontext->allocsize > 0) + { + if (realsize > pcontext->allocsize) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("the value cannot be saved to allocated buffer"), + errhint("Try to increase the minsize argument."))); + + if (pcontext->typid != typid) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("partition context was initialized for different type"))); + + if (write_mode) + copy_datum_to_partition_context(pcontext, value, isnull); + + } + else + { + pcontext->magic = PROXY_CONTEXT_MAGIC; + pcontext->typid = typid; + pcontext->typlen = typlen; + pcontext->typbyval = typbyval; + pcontext->allocsize = allocsize; + + copy_datum_to_partition_context(pcontext, value, isnull); + } + + return pcontext; +} + +Datum +windowobject_set_partition_context_value(PG_FUNCTION_ARGS) +{ + (void) get_partition_context(fcinfo, true); + + PG_RETURN_VOID(); +} + +Datum +windowobject_get_partition_context_value(PG_FUNCTION_ARGS) +{ + proxy_context *pcontext; + + pcontext = get_partition_context(fcinfo, false); + + if (pcontext->isnull) + PG_RETURN_NULL(); + + PG_RETURN_DATUM(pcontext->value); +} diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 27989971db..0393bd66c4 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -7130,6 +7130,12 @@ { oid => '2305', descr => 'I/O', proname => 'internal_out', prorettype => 'cstring', proargtypes => 'internal', prosrc => 'internal_out' }, +{ oid => '9553', descr => 'I/O', + proname => 'windowobjectproxy_in', proisstrict => 'f', prorettype => 'windowobjectproxy', + proargtypes => 'cstring', prosrc => 'windowobjectproxy_in' }, +{ oid => '9554', descr => 'I/O', + proname => 'windowobjectproxy_out', prorettype => 'cstring', proargtypes => 'windowobjectproxy', + prosrc => 'windowobjectproxy_out' }, { oid => '2312', descr => 'I/O', proname => 'anyelement_in', prorettype => 'anyelement', proargtypes => 'cstring', prosrc => 'anyelement_in' }, @@ -9742,7 +9748,36 @@ { oid => '3114', descr => 'fetch the Nth row value', proname => 'nth_value', prokind => 'w', prorettype => 'anyelement', proargtypes => 'anyelement int4', prosrc => 'window_nth_value' }, - +{ oid => '9555', descr => 'get current position from window object', + proname => 'get_current_position', prokind => 'f', prorettype => 'int8', + proargtypes => 'windowobjectproxy', prosrc => 'windowobject_get_current_position' }, +{ oid => '9556', descr => 'set current position in window object', + proname => 'set_mark_position', prokind => 'f', prorettype => 'void', + proargtypes => 'windowobjectproxy int8', prosrc => 'windowobject_set_mark_position' }, +{ oid => '9560', descr => 'get partition row count', + proname => 'get_partition_rowcount', prokind => 'f', prorettype => 'int8', + proargtypes => 'windowobjectproxy', prosrc => 'windowobject_get_partition_rowcount' }, +{ oid => '9561', descr => 'returns true if two positions are peers', + proname => 'rows_are_peers', prokind => 'f', prorettype => 'bool', + proargtypes => 'windowobjectproxy int8 int8', prosrc => 'windowobject_rows_are_peers' }, +{ oid => '9562', descr => 'returns argument of window function against to partition', + proname => 'get_input_value_in_partition', prokind => 'f', prorettype => 'void', + proargtypes => 'windowobjectproxy int4 int4 text bool', prosrc => 'windowobject_get_func_arg_partition' }, +{ oid => '9563', descr => 'returns argument of window function against to frame', + proname => 'get_input_value_in_frame', prokind => 'f', prorettype => 'void', + proargtypes => 'windowobjectproxy int4 int4 text bool', prosrc => 'windowobject_get_func_arg_frame' }, +{ oid => '9564', descr => 'returns argument of window function against to current row', + proname => 'get_input_value_for_row', prokind => 'f', prorettype => 'void', + proargtypes => 'windowobjectproxy int4', prosrc => 'windowobject_get_func_arg_current' }, +{ oid => '9565', descr => 'returns a value stored in a partition context', + proname => 'get_partition_context_value', prokind => 'f', prorettype => 'anyelement', + proargtypes => 'windowobjectproxy anyelement int4', + prosrc => 'windowobject_get_partition_context_value', proisstrict => 'f' }, +{ oid => '9566', descr => 'store a value to partition context', + proname => 'set_partition_context_value', prokind => 'f', prorettype => 'void', + proargtypes => 'windowobjectproxy anyelement int4', + prosrc => 'windowobject_set_partition_context_value', + proisstrict => 'f' }, # functions for range types { oid => '3832', descr => 'I/O', proname => 'anyrange_in', provolatile => 's', prorettype => 'anyrange', diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index b2cec07416..78b166ecb1 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -560,6 +560,12 @@ typtype => 'p', typcategory => 'P', typinput => 'internal_in', typoutput => 'internal_out', typreceive => '-', typsend => '-', typalign => 'ALIGNOF_POINTER' }, +{ oid => '9552', + descr => 'pseudo-type representing an pointer to WindowObjectProxy structure', + typname => 'windowobjectproxy', typlen => '-1', typbyval => 't', + typtype => 'p', typcategory => 'P', typinput => 'windowobjectproxy_in', + typoutput => 'windowobjectproxy_out', typreceive => '-', typsend => '-', + typalign => 'd' }, { oid => '2283', descr => 'pseudo-type representing a polymorphic base type', typname => 'anyelement', typlen => '4', typbyval => 't', typtype => 'p', typcategory => 'P', typinput => 'anyelement_in', diff --git a/src/include/windowapi.h b/src/include/windowapi.h index e8c9fc54d8..0ec6b82412 100644 --- a/src/include/windowapi.h +++ b/src/include/windowapi.h @@ -36,6 +36,33 @@ /* this struct is private in nodeWindowAgg.c */ typedef struct WindowObjectData *WindowObject; +/* + * This type is used as proxy between PL variants of WinFuncArg + * functions and PL environment. + */ +typedef struct WindowObjectProxyData +{ + int32 vl_len; /* varlena header */ + WindowObject winobj; + + bool isout; + int argno; + bool isvalid; + + Datum value; + bool isnull; + bool freeval; + + int last_argno; + int16 typlen; + bool typbyval; + + MemoryContext proxy_cxt; + FunctionCallInfo fcinfo; +} WindowObjectProxyData; + +typedef WindowObjectProxyData *WindowObjectProxy; + #define PG_WINDOW_OBJECT() ((WindowObject) fcinfo->context) #define WindowObjectIsValid(winobj) \ diff --git a/src/pl/plpgsql/src/Makefile b/src/pl/plpgsql/src/Makefile index 193df8a010..ba6edfd42e 100644 --- a/src/pl/plpgsql/src/Makefile +++ b/src/pl/plpgsql/src/Makefile @@ -34,7 +34,7 @@ REGRESS_OPTS = --dbname=$(PL_TESTDB) REGRESS = plpgsql_call plpgsql_control plpgsql_copy plpgsql_domain \ plpgsql_record plpgsql_cache plpgsql_simple plpgsql_transaction \ - plpgsql_trap plpgsql_trigger plpgsql_varprops + plpgsql_trap plpgsql_trigger plpgsql_varprops plpgsql_window # where to find gen_keywordlist.pl and subsidiary files TOOLSDIR = $(top_srcdir)/src/tools diff --git a/src/pl/plpgsql/src/expected/plpgsql_window.out b/src/pl/plpgsql/src/expected/plpgsql_window.out new file mode 100644 index 0000000000..9ee8189eb5 --- /dev/null +++ b/src/pl/plpgsql/src/expected/plpgsql_window.out @@ -0,0 +1,237 @@ +create or replace function pl_row_number() +returns bigint as $$ +declare pos int8; +begin + pos := get_current_position(windowobject); + pos := pos + 1; + perform set_mark_position(windowobject, pos); + return pos; +end +$$ +language plpgsql window; +select pl_row_number() over (), v from (values(10),(20),(30)) v(v); + pl_row_number | v +---------------+---- + 1 | 10 + 2 | 20 + 3 | 30 +(3 rows) + +create or replace function pl_round_value(numeric) +returns int as $$ +declare + num numeric; +begin + perform get_input_value_for_row(windowobject, 1); + get pg_window_context num = PG_INPUT_VALUE; + return round(num); +end +$$ language plpgsql window; +select pl_round_value(v) over(order by v desc) from generate_series(0.1, 1.0, 0.1) g(v); + pl_round_value +---------------- + 1 + 1 + 1 + 1 + 1 + 1 + 0 + 0 + 0 + 0 +(10 rows) + +select pl_round_value(v + 1) over(order by v desc) from generate_series(0.1, 1.0, 0.1) g(v); + pl_round_value +---------------- + 2 + 2 + 2 + 2 + 2 + 2 + 1 + 1 + 1 + 1 +(10 rows) + +create table test_table(v numeric); +insert into test_table values(1),(3),(6),(6),(8),(7),(6),(5),(4); +create or replace function pl_lag(numeric) +returns numeric as $$ +declare + v numeric; +begin + perform get_input_value_in_partition(windowobject, 1, -1, 'seek_current', false); + get pg_window_context v = PG_INPUT_VALUE; + return v; +end; +$$ language plpgsql window; +select pl_lag(v) over (), lag(v) over () from test_table; + pl_lag | lag +--------+----- + | + 1 | 1 + 3 | 3 + 6 | 6 + 6 | 6 + 8 | 8 + 7 | 7 + 6 | 6 + 5 | 5 +(9 rows) + +drop table test_table; +create table test_table(v integer); +insert into test_table values(1),(3),(6),(6),(8),(7),(6),(5),(4); +select pl_lag(v) over (), lag(v) over () from test_table; + pl_lag | lag +--------+----- + | + 1 | 1 + 3 | 3 + 6 | 6 + 6 | 6 + 8 | 8 + 7 | 7 + 6 | 6 + 5 | 5 +(9 rows) + +create or replace function pl_moving_avg(numeric) +returns numeric as $$ +declare + s numeric default 0.0; + v numeric; + c numeric default 0.0; +begin + -- look before + perform get_input_value_in_partition(windowobject, 1, -1, 'seek_current', false); + get pg_window_context v = PG_INPUT_VALUE; + if v is not null then + s := s + v; + c := c + 1.0; + end if; + + -- look after + perform get_input_value_in_partition(windowobject, 1, 0, 'seek_current', false); + get pg_window_context v = PG_INPUT_VALUE; + if v is not null then + s := s + v; + c := c + 1.0; + end if; + + perform get_input_value_in_partition(windowobject, 1, 1, 'seek_current', false); + get pg_window_context v = PG_INPUT_VALUE; + if v is not null then + s := s + v; + c := c + 1.0; + end if; + + return trim_scale(s / c); +end +$$ language plpgsql window; +select pl_moving_avg(v) over (), v from test_table; + pl_moving_avg | v +--------------------+--- + 2 | 1 + 3.3333333333333333 | 3 + 5 | 6 + 6.6666666666666667 | 6 + 7 | 8 + 7 | 7 + 6 | 6 + 5 | 5 + 4.5 | 4 +(9 rows) + +create or replace function pl_lag_polymorphic(anyelement) +returns anyelement as $$ +declare + v $0%type; +begin + perform get_input_value_in_partition(windowobject, 1, -1, 'seek_current', false); + get pg_window_context v = PG_INPUT_VALUE; + return v; +end; +$$ language plpgsql window; +select pl_lag_polymorphic(v) over (), lag(v) over () from test_table; + pl_lag_polymorphic | lag +--------------------+----- + | + 1 | 1 + 3 | 3 + 6 | 6 + 6 | 6 + 8 | 8 + 7 | 7 + 6 | 6 + 5 | 5 +(9 rows) + +create or replace function pl_pcontext_test(numeric) +returns numeric as $$ +declare + n numeric; + v numeric; +begin + n := get_partition_context_value(windowobject, null::numeric); + + perform get_input_value_for_row(windowobject, 1); + get pg_window_context v = PG_INPUT_VALUE; + + perform set_partition_context_value(windowobject, v); + return n; +end +$$ +language plpgsql window; +select v, pl_pcontext_test(v) over () from generate_series(0.1, 1.0, 0.1) g(v); + v | pl_pcontext_test +-----+------------------ + 0.1 | + 0.2 | 0.1 + 0.3 | 0.2 + 0.4 | 0.3 + 0.5 | 0.4 + 0.6 | 0.5 + 0.7 | 0.6 + 0.8 | 0.7 + 0.9 | 0.8 + 1.0 | 0.9 +(10 rows) + +create table test_missing_values(id int, v integer); +insert into test_missing_values values(1,10),(2,11),(3,12),(4,null),(5,null),(6,15),(7,16); +create or replace function pl_pcontext_test(numeric) +returns numeric as $$ +declare + n numeric; + v numeric; +begin + perform get_input_value_for_row(windowobject, 1); + get pg_window_context v = PG_INPUT_VALUE; + + if v is null then + v := get_partition_context_value(windowobject, null::numeric); + else + perform set_partition_context_value(windowobject, v); + end if; + + return v; +end +$$ +language plpgsql window; +select id, v, pl_pcontext_test(v) over (order by id) from test_missing_values; + id | v | pl_pcontext_test +----+----+------------------ + 1 | 10 | 10 + 2 | 11 | 11 + 3 | 12 | 12 + 4 | | 12 + 5 | | 12 + 6 | 15 | 15 + 7 | 16 | 16 +(7 rows) + diff --git a/src/pl/plpgsql/src/pl_comp.c b/src/pl/plpgsql/src/pl_comp.c index e7f4a5f291..ee4bbb560e 100644 --- a/src/pl/plpgsql/src/pl_comp.c +++ b/src/pl/plpgsql/src/pl_comp.c @@ -582,6 +582,41 @@ do_compile(FunctionCallInfo fcinfo, true); } + if (function->fn_prokind == PROKIND_WINDOW) + { + PLpgSQL_type *dtype; + PLpgSQL_var *var; + + /* + * Add the promise variable windowobject with windowobjectproxy type + * + * Pseudotypes are disallowed for custom variables. It is checked + * in plpgsql_build_variable, so instead call this function, build + * promise variable here. + */ + + dtype = plpgsql_build_datatype(WINDOWOBJECTPROXYOID, + -1, + function->fn_input_collation, + NULL); + + /* this should be pseudotype */ + Assert(dtype->ttype == PLPGSQL_TTYPE_PSEUDO); + + var = palloc0(sizeof(PLpgSQL_var)); + + var->dtype = PLPGSQL_DTYPE_PROMISE; + var->promise = PLPGSQL_PROMISE_WINDOWOBJECT; + + var->refname = pstrdup("windowobject"); + var->datatype = dtype; + + plpgsql_adddatum((PLpgSQL_datum *) var); + plpgsql_ns_additem(PLPGSQL_NSTYPE_VAR, + var->dno, + var->refname); + } + ReleaseSysCache(typeTup); break; diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c index d4a3d58daa..25858ed815 100644 --- a/src/pl/plpgsql/src/pl_exec.c +++ b/src/pl/plpgsql/src/pl_exec.c @@ -320,6 +320,8 @@ static int exec_stmt_rollback(PLpgSQL_execstate *estate, PLpgSQL_stmt_rollback *stmt); static int exec_stmt_set(PLpgSQL_execstate *estate, PLpgSQL_stmt_set *stmt); +static int exec_stmt_getwincxt(PLpgSQL_execstate *estate, + PLpgSQL_stmt_getwincxt *stmt); static void plpgsql_estate_setup(PLpgSQL_execstate *estate, PLpgSQL_function *func, @@ -593,6 +595,42 @@ plpgsql_exec_function(PLpgSQL_function *func, FunctionCallInfo fcinfo, */ exec_set_found(&estate, false); + /* + * Initialize promise winobject + */ + if (func->fn_prokind == PROKIND_WINDOW) + { + /* fcinfo is available in this function too */ + WindowObjectProxy wop; + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(estate.datum_context); + + wop = palloc(sizeof(WindowObjectProxyData)); + SET_VARSIZE(wop, sizeof(WindowObjectProxy)); + + wop->winobj = PG_WINDOW_OBJECT(); + + Assert(WindowObjectIsValid(wop->winobj)); + + wop->value = (Datum) 0; + wop->isnull = true; + wop->isout = false; + wop->freeval = false; + wop->argno = -1; + wop->isvalid = false; + wop->last_argno = -1; + + wop->proxy_cxt = estate.datum_context; + wop->fcinfo = fcinfo; + + MemoryContextSwitchTo(oldcontext); + + estate.winobjproxy = wop; + } + else + estate.winobjproxy = NULL; + /* * Let the instrumentation plugin peek at this function */ @@ -915,6 +953,11 @@ plpgsql_exec_trigger(PLpgSQL_function *func, plpgsql_estate_setup(&estate, func, NULL, NULL, NULL); estate.trigdata = trigdata; + /* + * Trigger function cannot be WINDOW function + */ + estate.winobjproxy = NULL; + /* * Setup error traceback support for ereport() */ @@ -1293,11 +1336,13 @@ copy_plpgsql_datums(PLpgSQL_execstate *estate, PLpgSQL_datum *indatum = indatums[i]; PLpgSQL_datum *outdatum; + /* This must agree with plpgsql_finish_datums on what is copiable */ switch (indatum->dtype) { case PLPGSQL_DTYPE_VAR: case PLPGSQL_DTYPE_PROMISE: + outdatum = (PLpgSQL_datum *) ws_next; memcpy(outdatum, indatum, sizeof(PLpgSQL_var)); ws_next += MAXALIGN(sizeof(PLpgSQL_var)); @@ -1486,6 +1531,17 @@ plpgsql_fulfill_promise(PLpgSQL_execstate *estate, assign_text_var(estate, var, GetCommandTagName(estate->evtrigdata->tag)); break; + case PLPGSQL_PROMISE_WINDOWOBJECT: + if (!estate->winobjproxy) + elog(ERROR, "windowobject promise is not in a window function"); + + assign_simple_var(estate, + var, + PointerGetDatum(estate->winobjproxy), + false, + false); + break; + default: elog(ERROR, "unrecognized promise type: %d", var->promise); } @@ -1995,6 +2051,10 @@ exec_stmts(PLpgSQL_execstate *estate, List *stmts) rc = exec_stmt_getdiag(estate, (PLpgSQL_stmt_getdiag *) stmt); break; + case PLPGSQL_STMT_GETWINCXT: + rc = exec_stmt_getwincxt(estate, (PLpgSQL_stmt_getwincxt *) stmt); + break; + case PLPGSQL_STMT_IF: rc = exec_stmt_if(estate, (PLpgSQL_stmt_if *) stmt); break; @@ -2486,6 +2546,72 @@ exec_stmt_getdiag(PLpgSQL_execstate *estate, PLpgSQL_stmt_getdiag *stmt) return PLPGSQL_RC_OK; } +/* ---------- + * exec_stmt_getwincxt + * + * Inside this statement we can do safe cast from window_result type + * to some target variable. We should to ensure so target variable + * has same datatype as result type. Then we can safely copy + * referenced datum to target variable. + * + * ---------- + */ +static int +exec_stmt_getwincxt(PLpgSQL_execstate *estate, PLpgSQL_stmt_getwincxt *stmt) +{ + WindowObjectProxy wop; + ListCell *lc; + + wop = estate->winobjproxy; + + if (!wop) + elog(ERROR, "GET WINDOWS_CONTEXT is not in windows function"); + + + if (!wop->isvalid) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("windows context has not valid data"))); + + foreach(lc, stmt->items) + { + PLpgSQL_wincxt_item *item = (PLpgSQL_wincxt_item *) lfirst(lc); + + if (item->kind == PLPGSQL_GETWINCXT_INPUT_VALUE) + { + if (wop->argno >= 0) + exec_assign_value(estate, + estate->datums[item->target], + wop->value, + wop->isnull, + get_fn_expr_argtype(wop->fcinfo->flinfo, wop->argno), + -1); + else + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("windows context has not requested data"))); + } + else if (item->kind == PLPGSQL_GETWINCXT_IS_OUT_OF_INPUT) + { + if (wop->argno >= 0) + exec_assign_value(estate, + estate->datums[item->target], + BoolGetDatum(wop->isout), + false, + BOOLOID, + -1); + else + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("windows context has not requested data"))); + } + else + elog(ERROR, "unsupported PLpgSQL_getwincxt_kind"); + } + + return PLPGSQL_RC_OK; +} + /* ---------- * exec_stmt_if Evaluate a bool expression and * execute the true or false body diff --git a/src/pl/plpgsql/src/pl_funcs.c b/src/pl/plpgsql/src/pl_funcs.c index ee60ced583..7080b5c6da 100644 --- a/src/pl/plpgsql/src/pl_funcs.c +++ b/src/pl/plpgsql/src/pl_funcs.c @@ -274,6 +274,8 @@ plpgsql_stmt_typename(PLpgSQL_stmt *stmt) case PLPGSQL_STMT_GETDIAG: return ((PLpgSQL_stmt_getdiag *) stmt)->is_stacked ? "GET STACKED DIAGNOSTICS" : "GET DIAGNOSTICS"; + case PLPGSQL_STMT_GETWINCXT: + return "GET PG_WINDOW_CONTEXT"; case PLPGSQL_STMT_OPEN: return "OPEN"; case PLPGSQL_STMT_FETCH: @@ -363,6 +365,7 @@ static void free_execsql(PLpgSQL_stmt_execsql *stmt); static void free_dynexecute(PLpgSQL_stmt_dynexecute *stmt); static void free_dynfors(PLpgSQL_stmt_dynfors *stmt); static void free_getdiag(PLpgSQL_stmt_getdiag *stmt); +static void free_getwincxt(PLpgSQL_stmt_getwincxt *stmt); static void free_open(PLpgSQL_stmt_open *stmt); static void free_fetch(PLpgSQL_stmt_fetch *stmt); static void free_close(PLpgSQL_stmt_close *stmt); @@ -439,6 +442,9 @@ free_stmt(PLpgSQL_stmt *stmt) case PLPGSQL_STMT_GETDIAG: free_getdiag((PLpgSQL_stmt_getdiag *) stmt); break; + case PLPGSQL_STMT_GETWINCXT: + free_getwincxt((PLpgSQL_stmt_getwincxt *) stmt); + break; case PLPGSQL_STMT_OPEN: free_open((PLpgSQL_stmt_open *) stmt); break; @@ -723,6 +729,11 @@ free_getdiag(PLpgSQL_stmt_getdiag *stmt) { } +static void +free_getwincxt(PLpgSQL_stmt_getwincxt *stmt) +{ +} + static void free_expr(PLpgSQL_expr *expr) { @@ -820,6 +831,7 @@ static void dump_execsql(PLpgSQL_stmt_execsql *stmt); static void dump_dynexecute(PLpgSQL_stmt_dynexecute *stmt); static void dump_dynfors(PLpgSQL_stmt_dynfors *stmt); static void dump_getdiag(PLpgSQL_stmt_getdiag *stmt); +static void dump_getwincxt(PLpgSQL_stmt_getwincxt *stmt); static void dump_open(PLpgSQL_stmt_open *stmt); static void dump_fetch(PLpgSQL_stmt_fetch *stmt); static void dump_cursor_direction(PLpgSQL_stmt_fetch *stmt); @@ -907,6 +919,9 @@ dump_stmt(PLpgSQL_stmt *stmt) case PLPGSQL_STMT_GETDIAG: dump_getdiag((PLpgSQL_stmt_getdiag *) stmt); break; + case PLPGSQL_STMT_GETWINCXT: + dump_getwincxt((PLpgSQL_stmt_getwincxt *) stmt); + break; case PLPGSQL_STMT_OPEN: dump_open((PLpgSQL_stmt_open *) stmt); break; @@ -1614,6 +1629,41 @@ dump_getdiag(PLpgSQL_stmt_getdiag *stmt) printf("\n"); } +static const char * +getwincxt_kindname(PLpgSQL_getwincxt_kind kind) +{ + switch (kind) + { + case PLPGSQL_GETWINCXT_INPUT_VALUE: + return "PG_INPUT_VALUE"; + case PLPGSQL_GETWINCXT_IS_OUT_OF_INPUT: + return "PG_IS_OUT_OF_INPUT"; + default: + elog(ERROR, "unknown PLpgSQL_getwincxt_kind value"); + } +} + +static void +dump_getwincxt(PLpgSQL_stmt_getwincxt *stmt) +{ + ListCell *lc; + + dump_ind(); + + printf("GET PG_WINDOW_CONTEXT "); + foreach(lc, stmt->items) + { + PLpgSQL_wincxt_item *item = (PLpgSQL_wincxt_item *) lfirst(lc); + + if (lc != list_head(stmt->items)) + printf(", "); + + printf("{var %d} = %s", item->target, + getwincxt_kindname(item->kind)); + } + printf("\n"); +} + static void dump_expr(PLpgSQL_expr *expr) { diff --git a/src/pl/plpgsql/src/pl_gram.y b/src/pl/plpgsql/src/pl_gram.y index 5a7e1a4444..143b8a86ab 100644 --- a/src/pl/plpgsql/src/pl_gram.y +++ b/src/pl/plpgsql/src/pl_gram.y @@ -166,6 +166,7 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt); PLpgSQL_diag_item *diagitem; PLpgSQL_stmt_fetch *fetch; PLpgSQL_case_when *casewhen; + PLpgSQL_wincxt_item *wincxtitem; } %type <declhdr> decl_sect @@ -201,6 +202,9 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt); %type <stmt> stmt_open stmt_fetch stmt_move stmt_close stmt_null %type <stmt> stmt_commit stmt_rollback stmt_set %type <stmt> stmt_case stmt_foreach_a +%type <stmt> stmt_getwincxt +%type <list> get_wincxt_items +%type <wincxtitem> get_wincxt_item %type <list> proc_exceptions %type <exception_block> exception_sect @@ -325,6 +329,9 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt); %token <keyword> K_PG_EXCEPTION_CONTEXT %token <keyword> K_PG_EXCEPTION_DETAIL %token <keyword> K_PG_EXCEPTION_HINT +%token <keyword> K_PG_INPUT_VALUE +%token <keyword> K_PG_IS_OUT_OF_INPUT +%token <keyword> K_PG_WINDOW_CONTEXT %token <keyword> K_PRINT_STRICT_PARAMS %token <keyword> K_PRIOR %token <keyword> K_QUERY @@ -886,6 +893,8 @@ proc_stmt : pl_block ';' { $$ = $1; } | stmt_getdiag { $$ = $1; } + | stmt_getwincxt + { $$ = $1; } | stmt_open { $$ = $1; } | stmt_fetch @@ -1153,6 +1162,56 @@ assign_var : T_DATUM } ; +stmt_getwincxt : K_GET K_PG_WINDOW_CONTEXT get_wincxt_items ';' + { + PLpgSQL_stmt_getwincxt *new = palloc0(sizeof(PLpgSQL_stmt_getwincxt)); + + new->cmd_type = PLPGSQL_STMT_GETWINCXT; + + /* Allow this statement only inside window function */ + if (plpgsql_curr_compile->fn_prokind != PROKIND_WINDOW) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("GET WINDOW_FUNCTION_ARGUMENT statement can be used only in WINDOW function"))); + + new->lineno = plpgsql_location_to_lineno(@1); + new->stmtid = ++plpgsql_curr_compile->nstatements; + new->items = $3; + + $$ = (PLpgSQL_stmt *) new; + } + ; + +get_wincxt_items : get_wincxt_items ',' get_wincxt_item + { + $$ = lappend($1, $3); + } + | get_wincxt_item + { + $$ = list_make1($1); + } + ; + +get_wincxt_item: T_DATUM '=' K_PG_INPUT_VALUE + { + PLpgSQL_wincxt_item *item = palloc(sizeof(PLpgSQL_wincxt_item)); + + check_assignable($1.datum, @1); + + item->target = $1.datum->dno; + item->kind = PLPGSQL_GETWINCXT_INPUT_VALUE; + } + | T_DATUM '=' K_PG_IS_OUT_OF_INPUT + { + PLpgSQL_wincxt_item *item = palloc(sizeof(PLpgSQL_wincxt_item)); + + check_assignable($1.datum, @1); + + item->target = $1.datum->dno; + item->kind = PLPGSQL_GETWINCXT_IS_OUT_OF_INPUT; + } + ; + stmt_if : K_IF expr_until_then proc_sect stmt_elsifs stmt_else K_END K_IF ';' { PLpgSQL_stmt_if *new; diff --git a/src/pl/plpgsql/src/pl_unreserved_kwlist.h b/src/pl/plpgsql/src/pl_unreserved_kwlist.h index 99b3cf7d8a..67cb8b7203 100644 --- a/src/pl/plpgsql/src/pl_unreserved_kwlist.h +++ b/src/pl/plpgsql/src/pl_unreserved_kwlist.h @@ -84,6 +84,9 @@ PG_KEYWORD("pg_datatype_name", K_PG_DATATYPE_NAME) PG_KEYWORD("pg_exception_context", K_PG_EXCEPTION_CONTEXT) PG_KEYWORD("pg_exception_detail", K_PG_EXCEPTION_DETAIL) PG_KEYWORD("pg_exception_hint", K_PG_EXCEPTION_HINT) +PG_KEYWORD("pg_input_value", K_PG_INPUT_VALUE) +PG_KEYWORD("pg_is_out_of_input", K_PG_IS_OUT_OF_INPUT) +PG_KEYWORD("pg_window_context", K_PG_WINDOW_CONTEXT) PG_KEYWORD("print_strict_params", K_PRINT_STRICT_PARAMS) PG_KEYWORD("prior", K_PRIOR) PG_KEYWORD("query", K_QUERY) diff --git a/src/pl/plpgsql/src/plpgsql.h b/src/pl/plpgsql/src/plpgsql.h index 0c3d30fb13..16c898c4ef 100644 --- a/src/pl/plpgsql/src/plpgsql.h +++ b/src/pl/plpgsql/src/plpgsql.h @@ -23,6 +23,8 @@ #include "utils/expandedrecord.h" #include "utils/typcache.h" +#include "windowapi.h" + /********************************************************************** * Definitions @@ -84,7 +86,8 @@ typedef enum PLpgSQL_promise_type PLPGSQL_PROMISE_TG_NARGS, PLPGSQL_PROMISE_TG_ARGV, PLPGSQL_PROMISE_TG_EVENT, - PLPGSQL_PROMISE_TG_TAG + PLPGSQL_PROMISE_TG_TAG, + PLPGSQL_PROMISE_WINDOWOBJECT } PLpgSQL_promise_type; /* @@ -122,6 +125,7 @@ typedef enum PLpgSQL_stmt_type PLPGSQL_STMT_DYNEXECUTE, PLPGSQL_STMT_DYNFORS, PLPGSQL_STMT_GETDIAG, + PLPGSQL_STMT_GETWINCXT, PLPGSQL_STMT_OPEN, PLPGSQL_STMT_FETCH, PLPGSQL_STMT_CLOSE, @@ -162,6 +166,15 @@ typedef enum PLpgSQL_getdiag_kind PLPGSQL_GETDIAG_SCHEMA_NAME } PLpgSQL_getdiag_kind; +/* + * GET WINDOW_CONTEXT information items + */ +typedef enum PLpgSQL_getwincxt_kind +{ + PLPGSQL_GETWINCXT_INPUT_VALUE, + PLPGSQL_GETWINCXT_IS_OUT_OF_INPUT +} PLpgSQL_getwincxt_kind; + /* * RAISE statement options */ @@ -612,6 +625,26 @@ typedef struct PLpgSQL_stmt_getdiag List *diag_items; /* List of PLpgSQL_diag_item */ } PLpgSQL_stmt_getdiag; +/* + * GET PG_WINDOW_CONTEXT item + */ +typedef struct PLpgSQL_wincxt_item +{ + PLpgSQL_getwincxt_kind kind; /* id for diagnostic value desired */ + int target; /* where to assign it */ +} PLpgSQL_wincxt_item; + +/* + * GET PG_WINDOW_CONTEXT statement + */ +typedef struct PLpgSQL_stmt_getwincxt +{ + PLpgSQL_stmt_type cmd_type; + int lineno; + unsigned int stmtid; + List *items; +} PLpgSQL_stmt_getwincxt; + /* * IF statement */ @@ -1049,6 +1082,9 @@ typedef struct PLpgSQL_execstate TriggerData *trigdata; /* if regular trigger, data about firing */ EventTriggerData *evtrigdata; /* if event trigger, data about firing */ + WindowObjectProxy winobjproxy; /* for window function we need proxy + * object between PL and WinFucArg funcions */ + Datum retval; bool retisnull; Oid rettype; /* type of current retval */ diff --git a/src/pl/plpgsql/src/sql/plpgsql_window.sql b/src/pl/plpgsql/src/sql/plpgsql_window.sql new file mode 100644 index 0000000000..f2f275bd1a --- /dev/null +++ b/src/pl/plpgsql/src/sql/plpgsql_window.sql @@ -0,0 +1,144 @@ +create or replace function pl_row_number() +returns bigint as $$ +declare pos int8; +begin + pos := get_current_position(windowobject); + pos := pos + 1; + perform set_mark_position(windowobject, pos); + return pos; +end +$$ +language plpgsql window; + +select pl_row_number() over (), v from (values(10),(20),(30)) v(v); + +create or replace function pl_round_value(numeric) +returns int as $$ +declare + num numeric; +begin + perform get_input_value_for_row(windowobject, 1); + get pg_window_context num = PG_INPUT_VALUE; + return round(num); +end +$$ language plpgsql window; + +select pl_round_value(v) over(order by v desc) from generate_series(0.1, 1.0, 0.1) g(v); + +select pl_round_value(v + 1) over(order by v desc) from generate_series(0.1, 1.0, 0.1) g(v); + +create table test_table(v numeric); +insert into test_table values(1),(3),(6),(6),(8),(7),(6),(5),(4); + +create or replace function pl_lag(numeric) +returns numeric as $$ +declare + v numeric; +begin + perform get_input_value_in_partition(windowobject, 1, -1, 'seek_current', false); + get pg_window_context v = PG_INPUT_VALUE; + return v; +end; +$$ language plpgsql window; + +select pl_lag(v) over (), lag(v) over () from test_table; + +drop table test_table; + +create table test_table(v integer); +insert into test_table values(1),(3),(6),(6),(8),(7),(6),(5),(4); + +select pl_lag(v) over (), lag(v) over () from test_table; + +create or replace function pl_moving_avg(numeric) +returns numeric as $$ +declare + s numeric default 0.0; + v numeric; + c numeric default 0.0; +begin + -- look before + perform get_input_value_in_partition(windowobject, 1, -1, 'seek_current', false); + get pg_window_context v = PG_INPUT_VALUE; + if v is not null then + s := s + v; + c := c + 1.0; + end if; + + -- look after + perform get_input_value_in_partition(windowobject, 1, 0, 'seek_current', false); + get pg_window_context v = PG_INPUT_VALUE; + if v is not null then + s := s + v; + c := c + 1.0; + end if; + + perform get_input_value_in_partition(windowobject, 1, 1, 'seek_current', false); + get pg_window_context v = PG_INPUT_VALUE; + if v is not null then + s := s + v; + c := c + 1.0; + end if; + + return trim_scale(s / c); +end +$$ language plpgsql window; + +select pl_moving_avg(v) over (), v from test_table; + +create or replace function pl_lag_polymorphic(anyelement) +returns anyelement as $$ +declare + v $0%type; +begin + perform get_input_value_in_partition(windowobject, 1, -1, 'seek_current', false); + get pg_window_context v = PG_INPUT_VALUE; + return v; +end; +$$ language plpgsql window; + +select pl_lag_polymorphic(v) over (), lag(v) over () from test_table; + +create or replace function pl_pcontext_test(numeric) +returns numeric as $$ +declare + n numeric; + v numeric; +begin + n := get_partition_context_value(windowobject, null::numeric); + + perform get_input_value_for_row(windowobject, 1); + get pg_window_context v = PG_INPUT_VALUE; + + perform set_partition_context_value(windowobject, v); + return n; +end +$$ +language plpgsql window; + +select v, pl_pcontext_test(v) over () from generate_series(0.1, 1.0, 0.1) g(v); + +create table test_missing_values(id int, v integer); +insert into test_missing_values values(1,10),(2,11),(3,12),(4,null),(5,null),(6,15),(7,16); + +create or replace function pl_pcontext_test(numeric) +returns numeric as $$ +declare + n numeric; + v numeric; +begin + perform get_input_value_for_row(windowobject, 1); + get pg_window_context v = PG_INPUT_VALUE; + + if v is null then + v := get_partition_context_value(windowobject, null::numeric); + else + perform set_partition_context_value(windowobject, v); + end if; + + return v; +end +$$ +language plpgsql window; + +select id, v, pl_pcontext_test(v) over (order by id) from test_missing_values;