Hi
I wrote a proof of concept for supporting window functions written in PL/pgSQL.
The window function API functions named WinGetFuncArg* are polymorphic, so it
is not easy to wrap them for use from the SQL level. I wrote an enhancement of
the GET statement - GET PG_WINDOW_CONTEXT - that allows safe and fast access
to the results of these functions.
A custom variant of row_number can look like this:
-- Custom row_number(): the position reported by the window object, plus one.
create or replace function pl_row_number()
returns bigint as $$
declare
  rownum int8 := get_current_position(windowobject) + 1;
begin
  perform set_mark_position(windowobject, rownum);
  return rownum;
end
$$
language plpgsql window;
A custom variant of the lag function can look like this:
-- Custom lag(): fetch the first argument one row before the current row.
create or replace function pl_lag(numeric)
returns numeric as $$
declare
  prev_value numeric;
begin
  -- the fetched value is kept in the window context ...
  perform get_input_value_in_partition(
    windowobject, 1, -1, 'seek_current', false);
  -- ... and is read from there by a separate statement
  get pg_window_context prev_value = PG_INPUT_VALUE;
  return prev_value;
end;
$$ language plpgsql window;
Custom window functions can be used for filling in missing data in a time
series:
create table test_missing_values(id int, v integer);
insert into test_missing_values (id, v)
values (1,10),(2,11),(3,12),(4,null),(5,null),(6,15),(7,16);

-- Returns the input value of the current row. When that value is null, the
-- value most recently stored in the partition context is returned instead.
create or replace function pl_pcontext_test(numeric)
returns numeric as $$
declare
  v numeric;
begin
  perform get_input_value_for_row(windowobject, 1);
  get pg_window_context v = PG_INPUT_VALUE;
  if v is null then
    -- null::numeric tells the function which type is kept in the context
    v := get_partition_context_value(windowobject, null::numeric);
  else
    -- remember the last non-null value for the following rows
    perform set_partition_context_value(windowobject, v);
  end if;
  return v;
end
$$
language plpgsql window;
select id, v, pl_pcontext_test(v) over (order by id) from
test_missing_values;
id | v | pl_pcontext_test
----+----+------------------
1 | 10 | 10
2 | 11 | 11
3 | 12 | 12
4 | | 12
5 | | 12
6 | 15 | 15
7 | 16 | 16
(7 rows)
I am thinking about another variant of the WinGetFuncArg* wrappers where a
polymorphic argument is used in a similar way to get_partition_context_value.
This patch is only a prototype, but it works, and I think that support for
custom window functions in PL languages is possible and probably useful.
Comments, notes, ideas, objections?
Regards
Pavel
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ba5a23ac25..fdc364c05e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1418,6 +1418,18 @@ LANGUAGE internal
STRICT IMMUTABLE PARALLEL SAFE
AS 'unicode_is_normalized';
+--
+-- Partition context accessors for window functions written in PL languages.
+-- The trailing int4 argument is declared with DEFAULT NULL so that callers
+-- may omit it.
+--
+CREATE OR REPLACE FUNCTION get_partition_context_value(
+    windowobjectproxy,
+    anyelement,
+    int4 DEFAULT NULL)
+RETURNS anyelement
+LANGUAGE internal
+AS 'windowobject_get_partition_context_value';
+
+CREATE OR REPLACE FUNCTION set_partition_context_value(
+    windowobjectproxy,
+    anyelement,
+    int4 DEFAULT NULL)
+RETURNS void
+LANGUAGE internal
+AS 'windowobject_set_partition_context_value';
+
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c
index 3d6b2f9093..ebb2a16572 100644
--- a/src/backend/utils/adt/pseudotypes.c
+++ b/src/backend/utils/adt/pseudotypes.c
@@ -334,6 +334,17 @@ pg_node_tree_send(PG_FUNCTION_ARGS)
PSEUDOTYPE_DUMMY_IO_FUNCS(pg_ddl_command);
PSEUDOTYPE_DUMMY_BINARY_IO_FUNCS(pg_ddl_command);
+/*
+ * windowobjectproxy
+ *
+ * A value of this type is a pointer to WindowObjectProxyData. It is the
+ * communication mechanism between the PL environment and the WinGetFuncArg*
+ * functions. For performance reasons, results are passed indirectly through
+ * this proxy object rather than through a function returning a polymorphic
+ * composite value.
+ */
+PSEUDOTYPE_DUMMY_IO_FUNCS(windowobjectproxy);
/*
* Dummy I/O functions for various other pseudotypes.
diff --git a/src/backend/utils/adt/windowfuncs.c b/src/backend/utils/adt/windowfuncs.c
index f0c8ae686d..f18495b228 100644
--- a/src/backend/utils/adt/windowfuncs.c
+++ b/src/backend/utils/adt/windowfuncs.c
@@ -14,6 +14,8 @@
#include "postgres.h"
#include "utils/builtins.h"
+#include "utils/datum.h"
+#include "utils/lsyscache.h"
#include "windowapi.h"
/*
@@ -35,6 +37,20 @@ typedef struct
int64 remainder; /* (total rows) % (bucket num) */
} ntile_context;
+#define PROXY_CONTEXT_MAGIC 19730715 /* stored in proxy_context.magic once the context is initialized */
+
+typedef struct
+{
+ int magic; /* 0 in fresh (zeroed) memory, else PROXY_CONTEXT_MAGIC */
+ Oid typid; /* base type of the stored value */
+ int16 typlen; /* typlen of typid */
+ bool typbyval; /* typbyval of typid */
+ int allocsize; /* size requested when the context was initialized; 0 = not initialized yet */
+ bool isnull; /* true when no value is stored */
+ Datum value; /* the by-value datum itself, or a pointer to data[] */
+ char data[FLEXIBLE_ARRAY_MEMBER]; /* inline storage for by-reference values */
+} proxy_context;
+
static bool rank_up(WindowObject winobj);
static Datum leadlag_common(FunctionCallInfo fcinfo,
bool forward, bool withoffset, bool withdefault);
@@ -472,3 +488,485 @@ window_nth_value(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(result);
}
+
+/*
+ * High-level access functions. These functions are wrappers around the
+ * window function API for PL languages; they operate on a WindowObjectProxy.
+ */
+/*
+ * SQL-callable wrapper around WinGetCurrentPosition().
+ *
+ * The call leaves no value behind for GET PG_WINDOW_CONTEXT, so the result
+ * slot of the proxy is marked invalid.
+ */
+Datum
+windowobject_get_current_position(PG_FUNCTION_ARGS)
+{
+	WindowObjectProxy proxy = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0));
+	WindowObject window = proxy->winobj;
+
+	proxy->isvalid = false;
+	Assert(WindowObjectIsValid(window));
+
+	PG_RETURN_INT64(WinGetCurrentPosition(window));
+}
+
+/*
+ * SQL-callable wrapper around WinSetMarkPosition().
+ *
+ * Argument 1 is the new mark position.  The result slot of the proxy is
+ * marked invalid because nothing is fetched here.
+ */
+Datum
+windowobject_set_mark_position(PG_FUNCTION_ARGS)
+{
+	WindowObjectProxy proxy = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0));
+	WindowObject window = proxy->winobj;
+	int64		markpos = PG_GETARG_INT64(1);
+
+	proxy->isvalid = false;
+	Assert(WindowObjectIsValid(window));
+
+	WinSetMarkPosition(window, markpos);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SQL-callable wrapper around WinGetPartitionRowCount().
+ *
+ * The result slot of the proxy is marked invalid because nothing is fetched
+ * here.
+ */
+Datum
+windowobject_get_partition_rowcount(PG_FUNCTION_ARGS)
+{
+	WindowObjectProxy proxy = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0));
+	WindowObject window = proxy->winobj;
+
+	proxy->isvalid = false;
+	Assert(WindowObjectIsValid(window));
+
+	PG_RETURN_INT64(WinGetPartitionRowCount(window));
+}
+
+/*
+ * SQL-callable wrapper around WinRowsArePeers().
+ *
+ * Arguments 1 and 2 are the two row positions to compare.  The result slot
+ * of the proxy is marked invalid because nothing is fetched here.
+ */
+Datum
+windowobject_rows_are_peers(PG_FUNCTION_ARGS)
+{
+	WindowObjectProxy proxy = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0));
+	WindowObject window = proxy->winobj;
+	int64		first = PG_GETARG_INT64(1);
+	int64		second = PG_GETARG_INT64(2);
+
+	proxy->isvalid = false;
+	Assert(WindowObjectIsValid(window));
+
+	PG_RETURN_BOOL(WinRowsArePeers(window, first, second));
+}
+
+#define SEEK_CURRENT_STR "seek_current"
+#define SEEK_HEAD_STR "seek_head"
+#define SEEK_TAIL_STR "seek_tail"
+
+#define STRLEN(s) (sizeof(s) - 1)
+
+/*
+ * Translate the textual seek type used at the SQL level to the matching
+ * WINDOW_SEEK_* constant.  The comparison is exact (case-sensitive, whole
+ * string); any other input raises an error.
+ */
+static int
+get_seek_type(text *seektype)
+{
+	const char *name = VARDATA_ANY(seektype);
+	int			namelen = VARSIZE_ANY_EXHDR(seektype);
+
+	if (namelen == STRLEN(SEEK_CURRENT_STR) &&
+		strncmp(name, SEEK_CURRENT_STR, namelen) == 0)
+		return WINDOW_SEEK_CURRENT;
+
+	if (namelen == STRLEN(SEEK_HEAD_STR) &&
+		strncmp(name, SEEK_HEAD_STR, namelen) == 0)
+		return WINDOW_SEEK_HEAD;
+
+	if (namelen == STRLEN(SEEK_TAIL_STR) &&
+		strncmp(name, SEEK_TAIL_STR, namelen) == 0)
+		return WINDOW_SEEK_TAIL;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("seek type value must be \"seek_current\", \"seek_head\" or \"seek_tail\"")));
+
+	return 0;					/* not reached, keeps the compiler quiet */
+}
+
+/*
+ * Store a fetched window function argument in the proxy.
+ *
+ * The value is read by a later, separate statement (GET PG_WINDOW_CONTEXT),
+ * so a by-reference value is copied into wop->proxy_cxt to make sure it is
+ * still there at that time.  A copy kept from a previous call is released
+ * first.
+ *
+ * wop    - proxy that receives the value
+ * argno  - zero-based number of the window function argument the value was
+ *          taken from; its typlen/typbyval are looked up once and reused
+ *          until a different argument is requested
+ * value  - the fetched datum (ignored when isnull is true)
+ * isnull - true when the fetched value is NULL
+ *
+ * On return the proxy is marked valid and remembers argno.
+ */
+static void
+copy_datum_to_winobj_proxy(WindowObjectProxy wop,
+						   int argno,
+						   Datum value,
+						   bool isnull)
+{
+	if (argno != wop->last_argno)
+	{
+		Oid			argtypid = get_fn_expr_argtype(wop->fcinfo->flinfo, argno);
+
+		/* report an unknown type here instead of a failed cache lookup */
+		if (!OidIsValid(argtypid))
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("cannot detect type of window function argument %d",
+							argno + 1)));
+
+		argtypid = getBaseType(argtypid);
+		get_typlenbyval(argtypid, &wop->typlen, &wop->typbyval);
+		wop->last_argno = argno;
+	}
+
+	/* release the copy kept from the previous call, if any */
+	if (wop->freeval)
+	{
+		pfree(DatumGetPointer(wop->value));
+		wop->freeval = false;
+	}
+
+	if (!isnull)
+	{
+		MemoryContext oldcxt = MemoryContextSwitchTo(wop->proxy_cxt);
+
+		wop->value = datumCopy(value, wop->typbyval, wop->typlen);
+		wop->freeval = !wop->typbyval;	/* only by-reference copies are palloc'd */
+		wop->isnull = false;
+
+		MemoryContextSwitchTo(oldcxt);
+	}
+	else
+	{
+		/* do not keep a pointer to the memory released above */
+		wop->value = (Datum) 0;
+		wop->isnull = true;
+	}
+
+	wop->argno = argno;
+	wop->isvalid = true;
+}
+
+/*
+ * SQL-callable wrapper around WinGetFuncArgInPartition().
+ *
+ * Arguments: proxy, one-based argument number, relative position, seek type
+ * ("seek_current", "seek_head" or "seek_tail") and the set_mark flag.
+ *
+ * Nothing is returned directly.  The fetched value, its NULL flag and the
+ * "out of partition" flag are stored in the proxy, where they can be read
+ * with GET PG_WINDOW_CONTEXT.
+ */
+Datum
+windowobject_get_func_arg_partition(PG_FUNCTION_ARGS)
+{
+	WindowObjectProxy wop;
+	WindowObject winobj;
+	int			argno;
+	int			relpos;
+	int			seektype;
+	bool		set_mark;
+	Datum		value;
+	bool		isnull;
+
+	wop = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0));
+
+	/* the proxy holds no usable result until the fetch below succeeds */
+	wop->isvalid = false;
+	wop->argno = -1;
+
+	winobj = wop->winobj;
+	Assert(WindowObjectIsValid(winobj));
+
+	argno = PG_GETARG_INT32(1);
+	if (argno < 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("arg number less than one")));
+
+	/*
+	 * Reject argument numbers the window function does not have.
+	 * NOTE(review): this assumes the window API does not range-check argno
+	 * itself - confirm against nodeWindowAgg.c.
+	 */
+	if (argno > wop->fcinfo->nargs)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("arg number is greater than number of window function arguments")));
+
+	argno -= 1;					/* the window API counts arguments from zero */
+
+	relpos = PG_GETARG_INT32(2);
+	/* get_seek_type() copes with packed varlenas, no detoasted copy is needed */
+	seektype = get_seek_type(PG_GETARG_TEXT_PP(3));
+	set_mark = PG_GETARG_BOOL(4);
+
+	value = WinGetFuncArgInPartition(winobj,
+									 argno,
+									 relpos,
+									 seektype,
+									 set_mark,
+									 &isnull,
+									 &wop->isout);
+
+	copy_datum_to_winobj_proxy(wop, argno, value, isnull);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SQL-callable wrapper around WinGetFuncArgInFrame().
+ *
+ * Arguments: proxy, one-based argument number, relative position, seek type
+ * ("seek_current", "seek_head" or "seek_tail") and the set_mark flag.
+ *
+ * Nothing is returned directly.  The fetched value, its NULL flag and the
+ * "out of frame" flag are stored in the proxy, where they can be read with
+ * GET PG_WINDOW_CONTEXT.
+ */
+Datum
+windowobject_get_func_arg_frame(PG_FUNCTION_ARGS)
+{
+	WindowObjectProxy wop;
+	WindowObject winobj;
+	int			argno;
+	int			relpos;
+	int			seektype;
+	bool		set_mark;
+	Datum		value;
+	bool		isnull;
+
+	wop = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0));
+
+	/* the proxy holds no usable result until the fetch below succeeds */
+	wop->isvalid = false;
+	wop->argno = -1;
+
+	winobj = wop->winobj;
+	Assert(WindowObjectIsValid(winobj));
+
+	argno = PG_GETARG_INT32(1);
+	if (argno < 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("arg number less than one")));
+
+	/*
+	 * Reject argument numbers the window function does not have.
+	 * NOTE(review): this assumes the window API does not range-check argno
+	 * itself - confirm against nodeWindowAgg.c.
+	 */
+	if (argno > wop->fcinfo->nargs)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("arg number is greater than number of window function arguments")));
+
+	argno -= 1;					/* the window API counts arguments from zero */
+
+	relpos = PG_GETARG_INT32(2);
+	/* get_seek_type() copes with packed varlenas, no detoasted copy is needed */
+	seektype = get_seek_type(PG_GETARG_TEXT_PP(3));
+	set_mark = PG_GETARG_BOOL(4);
+
+	value = WinGetFuncArgInFrame(winobj,
+								 argno,
+								 relpos,
+								 seektype,
+								 set_mark,
+								 &isnull,
+								 &wop->isout);
+
+	copy_datum_to_winobj_proxy(wop, argno, value, isnull);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SQL-callable wrapper around WinGetFuncArgCurrent().
+ *
+ * Arguments: proxy and the one-based argument number.
+ *
+ * Nothing is returned directly.  The value of the argument for the current
+ * row and its NULL flag are stored in the proxy, where they can be read with
+ * GET PG_WINDOW_CONTEXT.  The "is out" flag is always false here.
+ */
+Datum
+windowobject_get_func_arg_current(PG_FUNCTION_ARGS)
+{
+	WindowObjectProxy wop;
+	WindowObject winobj;
+	int			argno;
+	Datum		value;
+	bool		isnull;
+
+	wop = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0));
+
+	/* the proxy holds no usable result until the fetch below succeeds */
+	wop->isvalid = false;
+	wop->argno = -1;
+
+	winobj = wop->winobj;
+	Assert(WindowObjectIsValid(winobj));
+
+	argno = PG_GETARG_INT32(1);
+	if (argno < 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("arg number less than one")));
+
+	/*
+	 * Reject argument numbers the window function does not have.
+	 * NOTE(review): this assumes the window API does not range-check argno
+	 * itself - confirm against nodeWindowAgg.c.
+	 */
+	if (argno > wop->fcinfo->nargs)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("arg number is greater than number of window function arguments")));
+
+	argno -= 1;					/* the window API counts arguments from zero */
+
+	value = WinGetFuncArgCurrent(winobj, argno, &isnull);
+	copy_datum_to_winobj_proxy(wop, argno, value, isnull);
+
+	wop->isout = false;
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Store a value in an already initialized partition context.
+ *
+ * By-value datums are kept as they are.  By-reference datums are copied
+ * into the inline data area of the context and the stored datum points
+ * there.  The caller is responsible for the data area being big enough.
+ */
+static void
+copy_datum_to_partition_context(proxy_context *pcontext,
+								Datum value,
+								bool isnull)
+{
+	Size		nbytes;
+
+	pcontext->isnull = isnull;
+
+	if (isnull)
+	{
+		pcontext->value = (Datum) 0;
+		return;
+	}
+
+	if (pcontext->typbyval)
+	{
+		pcontext->value = value;
+		return;
+	}
+
+	/* by reference: varlena carries its own length, otherwise use typlen */
+	if (pcontext->typlen == -1)
+		nbytes = VARSIZE_ANY(DatumGetPointer(value));
+	else
+		nbytes = pcontext->typlen;
+
+	memcpy(pcontext->data, DatumGetPointer(value), nbytes);
+	pcontext->value = PointerGetDatum(pcontext->data);
+}
+
+/*
+ * Compute the sizes needed for the partition context of a window function.
+ *
+ * Returns the number of bytes to request for the whole proxy_context, i.e.
+ * the fixed header plus the inline data area.
+ *
+ * *realsize receives the number of bytes the given value really needs.  It
+ * is counted the same way as the result (fixed header included), so it can
+ * be compared directly with the allocsize stored in an existing context.
+ * For a NULL varlena value it is 0.
+ *
+ * value, isnull - the value to be stored (inspected for varlena types only)
+ * typlen        - typlen of the value's base type
+ * minsize       - minimal size of the data area for varlena types;
+ *                 negative values are treated as 0
+ *
+ * An error is raised for values bigger than 1024 bytes and for types that
+ * are neither fixed-length nor varlena.
+ */
+static int
+estimate_partition_context_size(Datum value,
+								bool isnull,
+								int16 typlen,
+								int16 minsize,
+								int *realsize)
+{
+	const int	hdrsize = offsetof(proxy_context, data);
+
+	if (typlen != -1)
+	{
+		/* cstring (typlen -2) cannot be stored in the context */
+		if (typlen <= 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("type of value is not supported in partition context")));
+
+		if (typlen > 1024)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("size of value is greater than limit (1024 bytes)")));
+
+		/*
+		 * Always reserve typlen bytes of data area.  It is not known here
+		 * whether the type is passed by value, and a short by-reference
+		 * value needs the data area just like a long one.
+		 */
+		*realsize = hdrsize + typlen;
+
+		return *realsize;
+	}
+
+	if (minsize < 0)
+		minsize = 0;
+
+	if (isnull)
+	{
+		/* nothing to store yet, reserve only the requested minimum */
+		*realsize = 0;
+
+		return hdrsize + MAXALIGN(minsize);
+	}
+	else
+	{
+		struct varlena *vl = (struct varlena *) DatumGetPointer(value);
+		int			fullsize = VARSIZE_ANY(vl);
+		int			reserve;
+
+		if (VARSIZE_ANY_EXHDR(vl) > 1024)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("size of value is greater than limit (1024 bytes)")));
+
+		/* the whole varlena, its header included, is what gets stored */
+		*realsize = hdrsize + fullsize;
+
+		/* leave some room for bigger values stored later */
+		reserve = fullsize + fullsize / 3;
+
+		return hdrsize + MAXALIGN(reserve > minsize ? reserve : minsize);
+	}
+}
+
+#define VARLENA_MINSIZE 32
+
+/*
+ * Return the partition context of the calling window function, initializing
+ * it on first use.
+ *
+ * Function arguments (taken from fcinfo):
+ *   0 - the window object proxy, must not be NULL
+ *   1 - the value; its declared type decides what type the context holds
+ *   2 - minimal size of the data area for varlena values, NULL means
+ *       VARLENA_MINSIZE
+ *
+ * On first use the context is initialized for the type of argument 1 and
+ * argument 1 is stored in it, whatever write_mode is.  On later calls the
+ * type must be the same as before, the value must fit into the memory
+ * reserved on first use, and the value is stored only when write_mode is
+ * true.
+ */
+static proxy_context *
+get_partition_context(FunctionCallInfo fcinfo, bool write_mode)
+{
+	WindowObjectProxy wop;
+	WindowObject winobj;
+	Oid			typid;
+	int16		typlen;
+	bool		typbyval;
+	Datum		value = (Datum) 0;
+	bool		isnull = true;
+	int			allocsize;
+	int			minsize;
+	int			realsize;
+	proxy_context *pcontext;
+
+	if (PG_ARGISNULL(0))
+		ereport(ERROR,
+				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+				 errmsg("windowobject is NULL")));
+
+	wop = (WindowObjectProxy) DatumGetPointer(PG_GETARG_DATUM(0));
+	winobj = wop->winobj;
+	Assert(WindowObjectIsValid(winobj));
+
+	if (PG_ARGISNULL(2))
+		minsize = VARLENA_MINSIZE;
+	else
+	{
+		minsize = PG_GETARG_INT32(2);
+
+		/* the size estimator takes an int16, do not let it wrap around */
+		if (minsize < 0 || minsize > PG_INT16_MAX)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("minsize is out of range")));
+	}
+
+	if (!PG_ARGISNULL(1))
+	{
+		value = PG_GETARG_DATUM(1);
+		isnull = false;
+	}
+
+	typid = get_fn_expr_argtype(fcinfo->flinfo, 1);
+	if (!OidIsValid(typid))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("cannot detect type of context value")));
+
+	typid = getBaseType(typid);
+	get_typlenbyval(typid, &typlen, &typbyval);
+
+	/* cstring has no usable length, refuse it in production builds too */
+	if (typlen == -2)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("type of value is not supported in partition context")));
+
+	allocsize = estimate_partition_context_size(value,
+												isnull,
+												typlen,
+												minsize,
+												&realsize);
+
+	pcontext = (proxy_context *) WinGetPartitionLocalMemory(winobj, allocsize);
+
+	/* fresh pcontext has zeroed memory */
+	Assert(pcontext->magic == 0 || pcontext->magic == PROXY_CONTEXT_MAGIC);
+
+	if (pcontext->allocsize > 0)
+	{
+		/* already initialized: the new value has to fit and match the type */
+		if (realsize > pcontext->allocsize)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("the value cannot be saved to allocated buffer"),
+					 errhint("Try to increase the minsize argument.")));
+
+		if (pcontext->typid != typid)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("partition context was initialized for different type")));
+
+		if (write_mode)
+			copy_datum_to_partition_context(pcontext, value, isnull);
+	}
+	else
+	{
+		/* first use: remember the type and store the initial value */
+		pcontext->magic = PROXY_CONTEXT_MAGIC;
+		pcontext->typid = typid;
+		pcontext->typlen = typlen;
+		pcontext->typbyval = typbyval;
+		pcontext->allocsize = allocsize;
+
+		copy_datum_to_partition_context(pcontext, value, isnull);
+	}
+
+	return pcontext;
+}
+
+/*
+ * SQL-callable function: store argument 1 in the partition context.
+ * All the work is done by get_partition_context() in write mode.
+ */
+Datum
+windowobject_set_partition_context_value(PG_FUNCTION_ARGS)
+{
+	get_partition_context(fcinfo, true);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SQL-callable function: return the value kept in the partition context,
+ * or NULL when the context holds no value.
+ */
+Datum
+windowobject_get_partition_context_value(PG_FUNCTION_ARGS)
+{
+	proxy_context *ctx = get_partition_context(fcinfo, false);
+
+	if (!ctx->isnull)
+		PG_RETURN_DATUM(ctx->value);
+
+	PG_RETURN_NULL();
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 27989971db..0393bd66c4 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7130,6 +7130,12 @@
{ oid => '2305', descr => 'I/O',
proname => 'internal_out', prorettype => 'cstring', proargtypes => 'internal',
prosrc => 'internal_out' },
+{ oid => '9553', descr => 'I/O',
+ proname => 'windowobjectproxy_in', proisstrict => 'f', prorettype => 'windowobjectproxy',
+ proargtypes => 'cstring', prosrc => 'windowobjectproxy_in' },
+{ oid => '9554', descr => 'I/O',
+ proname => 'windowobjectproxy_out', prorettype => 'cstring', proargtypes => 'windowobjectproxy',
+ prosrc => 'windowobjectproxy_out' },
{ oid => '2312', descr => 'I/O',
proname => 'anyelement_in', prorettype => 'anyelement',
proargtypes => 'cstring', prosrc => 'anyelement_in' },
@@ -9742,7 +9748,36 @@
{ oid => '3114', descr => 'fetch the Nth row value',
proname => 'nth_value', prokind => 'w', prorettype => 'anyelement',
proargtypes => 'anyelement int4', prosrc => 'window_nth_value' },
-
+{ oid => '9555', descr => 'get current position from window object',
+ proname => 'get_current_position', prokind => 'f', prorettype => 'int8',
+ proargtypes => 'windowobjectproxy', prosrc => 'windowobject_get_current_position' },
+{ oid => '9556', descr => 'set mark position in window object',
+ proname => 'set_mark_position', prokind => 'f', prorettype => 'void',
+ proargtypes => 'windowobjectproxy int8', prosrc => 'windowobject_set_mark_position' },
+{ oid => '9560', descr => 'get partition row count',
+ proname => 'get_partition_rowcount', prokind => 'f', prorettype => 'int8',
+ proargtypes => 'windowobjectproxy', prosrc => 'windowobject_get_partition_rowcount' },
+{ oid => '9561', descr => 'returns true if two positions are peers',
+ proname => 'rows_are_peers', prokind => 'f', prorettype => 'bool',
+ proargtypes => 'windowobjectproxy int8 int8', prosrc => 'windowobject_rows_are_peers' },
+{ oid => '9562', descr => 'returns argument of window function against to partition',
+ proname => 'get_input_value_in_partition', prokind => 'f', prorettype => 'void',
+ proargtypes => 'windowobjectproxy int4 int4 text bool', prosrc => 'windowobject_get_func_arg_partition' },
+{ oid => '9563', descr => 'returns argument of window function against to frame',
+ proname => 'get_input_value_in_frame', prokind => 'f', prorettype => 'void',
+ proargtypes => 'windowobjectproxy int4 int4 text bool', prosrc => 'windowobject_get_func_arg_frame' },
+{ oid => '9564', descr => 'returns argument of window function against to current row',
+ proname => 'get_input_value_for_row', prokind => 'f', prorettype => 'void',
+ proargtypes => 'windowobjectproxy int4', prosrc => 'windowobject_get_func_arg_current' },
+{ oid => '9565', descr => 'returns a value stored in a partition context',
+ proname => 'get_partition_context_value', prokind => 'f', prorettype => 'anyelement',
+ proargtypes => 'windowobjectproxy anyelement int4',
+ prosrc => 'windowobject_get_partition_context_value', proisstrict => 'f' },
+{ oid => '9566', descr => 'store a value to partition context',
+ proname => 'set_partition_context_value', prokind => 'f', prorettype => 'void',
+ proargtypes => 'windowobjectproxy anyelement int4',
+ prosrc => 'windowobject_set_partition_context_value',
+ proisstrict => 'f' },
# functions for range types
{ oid => '3832', descr => 'I/O',
proname => 'anyrange_in', provolatile => 's', prorettype => 'anyrange',
diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat
index b2cec07416..78b166ecb1 100644
--- a/src/include/catalog/pg_type.dat
+++ b/src/include/catalog/pg_type.dat
@@ -560,6 +560,12 @@
typtype => 'p', typcategory => 'P', typinput => 'internal_in',
typoutput => 'internal_out', typreceive => '-', typsend => '-',
typalign => 'ALIGNOF_POINTER' },
+{ oid => '9552',
+ descr => 'pseudo-type representing a pointer to WindowObjectProxy structure',
+ typname => 'windowobjectproxy', typlen => '-1', typbyval => 't',
+ typtype => 'p', typcategory => 'P', typinput => 'windowobjectproxy_in',
+ typoutput => 'windowobjectproxy_out', typreceive => '-', typsend => '-',
+ typalign => 'd' },
{ oid => '2283', descr => 'pseudo-type representing a polymorphic base type',
typname => 'anyelement', typlen => '4', typbyval => 't', typtype => 'p',
typcategory => 'P', typinput => 'anyelement_in',
diff --git a/src/include/windowapi.h b/src/include/windowapi.h
index e8c9fc54d8..0ec6b82412 100644
--- a/src/include/windowapi.h
+++ b/src/include/windowapi.h
@@ -36,6 +36,33 @@
/* this struct is private in nodeWindowAgg.c */
typedef struct WindowObjectData *WindowObject;
+/*
+ * This type is used as proxy between PL variants of WinFuncArg
+ * functions and PL environment.
+ */
+typedef struct WindowObjectProxyData
+{
+ int32 vl_len; /* varlena header */
+ WindowObject winobj; /* window object of the running window function */
+
+ bool isout; /* "is out" flag reported by the last partition/frame fetch */
+ int argno; /* zero-based argument number of the stored value, -1 if none */
+ bool isvalid; /* true when value/isnull hold a fetched result */
+
+ Datum value; /* the fetched value */
+ bool isnull; /* true when the fetched value is NULL */
+ bool freeval; /* true when value is a copy that has to be pfree'd */
+
+ int last_argno; /* argument number typlen/typbyval were looked up for, -1 if none */
+ int16 typlen; /* cached typlen of argument last_argno */
+ bool typbyval; /* cached typbyval of argument last_argno */
+
+ MemoryContext proxy_cxt; /* memory context the copies of values are made in */
+ FunctionCallInfo fcinfo; /* call info of the window function, used to find argument types */
+} WindowObjectProxyData;
+
+typedef WindowObjectProxyData *WindowObjectProxy;
+
#define PG_WINDOW_OBJECT() ((WindowObject) fcinfo->context)
#define WindowObjectIsValid(winobj) \
diff --git a/src/pl/plpgsql/src/Makefile b/src/pl/plpgsql/src/Makefile
index 193df8a010..ba6edfd42e 100644
--- a/src/pl/plpgsql/src/Makefile
+++ b/src/pl/plpgsql/src/Makefile
@@ -34,7 +34,7 @@ REGRESS_OPTS = --dbname=$(PL_TESTDB)
REGRESS = plpgsql_call plpgsql_control plpgsql_copy plpgsql_domain \
plpgsql_record plpgsql_cache plpgsql_simple plpgsql_transaction \
- plpgsql_trap plpgsql_trigger plpgsql_varprops
+ plpgsql_trap plpgsql_trigger plpgsql_varprops plpgsql_window
# where to find gen_keywordlist.pl and subsidiary files
TOOLSDIR = $(top_srcdir)/src/tools
diff --git a/src/pl/plpgsql/src/expected/plpgsql_window.out b/src/pl/plpgsql/src/expected/plpgsql_window.out
new file mode 100644
index 0000000000..9ee8189eb5
--- /dev/null
+++ b/src/pl/plpgsql/src/expected/plpgsql_window.out
@@ -0,0 +1,237 @@
+create or replace function pl_row_number()
+returns bigint as $$
+declare pos int8;
+begin
+ pos := get_current_position(windowobject);
+ pos := pos + 1;
+ perform set_mark_position(windowobject, pos);
+ return pos;
+end
+$$
+language plpgsql window;
+select pl_row_number() over (), v from (values(10),(20),(30)) v(v);
+ pl_row_number | v
+---------------+----
+ 1 | 10
+ 2 | 20
+ 3 | 30
+(3 rows)
+
+create or replace function pl_round_value(numeric)
+returns int as $$
+declare
+ num numeric;
+begin
+ perform get_input_value_for_row(windowobject, 1);
+ get pg_window_context num = PG_INPUT_VALUE;
+ return round(num);
+end
+$$ language plpgsql window;
+select pl_round_value(v) over(order by v desc) from generate_series(0.1, 1.0, 0.1) g(v);
+ pl_round_value
+----------------
+ 1
+ 1
+ 1
+ 1
+ 1
+ 1
+ 0
+ 0
+ 0
+ 0
+(10 rows)
+
+select pl_round_value(v + 1) over(order by v desc) from generate_series(0.1, 1.0, 0.1) g(v);
+ pl_round_value
+----------------
+ 2
+ 2
+ 2
+ 2
+ 2
+ 2
+ 1
+ 1
+ 1
+ 1
+(10 rows)
+
+create table test_table(v numeric);
+insert into test_table values(1),(3),(6),(6),(8),(7),(6),(5),(4);
+create or replace function pl_lag(numeric)
+returns numeric as $$
+declare
+ v numeric;
+begin
+ perform get_input_value_in_partition(windowobject, 1, -1, 'seek_current', false);
+ get pg_window_context v = PG_INPUT_VALUE;
+ return v;
+end;
+$$ language plpgsql window;
+select pl_lag(v) over (), lag(v) over () from test_table;
+ pl_lag | lag
+--------+-----
+ |
+ 1 | 1
+ 3 | 3
+ 6 | 6
+ 6 | 6
+ 8 | 8
+ 7 | 7
+ 6 | 6
+ 5 | 5
+(9 rows)
+
+drop table test_table;
+create table test_table(v integer);
+insert into test_table values(1),(3),(6),(6),(8),(7),(6),(5),(4);
+select pl_lag(v) over (), lag(v) over () from test_table;
+ pl_lag | lag
+--------+-----
+ |
+ 1 | 1
+ 3 | 3
+ 6 | 6
+ 6 | 6
+ 8 | 8
+ 7 | 7
+ 6 | 6
+ 5 | 5
+(9 rows)
+
+create or replace function pl_moving_avg(numeric)
+returns numeric as $$
+declare
+ s numeric default 0.0;
+ v numeric;
+ c numeric default 0.0;
+begin
+ -- look before
+ perform get_input_value_in_partition(windowobject, 1, -1, 'seek_current', false);
+ get pg_window_context v = PG_INPUT_VALUE;
+ if v is not null then
+ s := s + v;
+ c := c + 1.0;
+ end if;
+
+ -- look after
+ perform get_input_value_in_partition(windowobject, 1, 0, 'seek_current', false);
+ get pg_window_context v = PG_INPUT_VALUE;
+ if v is not null then
+ s := s + v;
+ c := c + 1.0;
+ end if;
+
+ perform get_input_value_in_partition(windowobject, 1, 1, 'seek_current', false);
+ get pg_window_context v = PG_INPUT_VALUE;
+ if v is not null then
+ s := s + v;
+ c := c + 1.0;
+ end if;
+
+ return trim_scale(s / c);
+end
+$$ language plpgsql window;
+select pl_moving_avg(v) over (), v from test_table;
+ pl_moving_avg | v
+--------------------+---
+ 2 | 1
+ 3.3333333333333333 | 3
+ 5 | 6
+ 6.6666666666666667 | 6
+ 7 | 8
+ 7 | 7
+ 6 | 6
+ 5 | 5
+ 4.5 | 4
+(9 rows)
+
+create or replace function pl_lag_polymorphic(anyelement)
+returns anyelement as $$
+declare
+ v $0%type;
+begin
+ perform get_input_value_in_partition(windowobject, 1, -1, 'seek_current', false);
+ get pg_window_context v = PG_INPUT_VALUE;
+ return v;
+end;
+$$ language plpgsql window;
+select pl_lag_polymorphic(v) over (), lag(v) over () from test_table;
+ pl_lag_polymorphic | lag
+--------------------+-----
+ |
+ 1 | 1
+ 3 | 3
+ 6 | 6
+ 6 | 6
+ 8 | 8
+ 7 | 7
+ 6 | 6
+ 5 | 5
+(9 rows)
+
+create or replace function pl_pcontext_test(numeric)
+returns numeric as $$
+declare
+ n numeric;
+ v numeric;
+begin
+ n := get_partition_context_value(windowobject, null::numeric);
+
+ perform get_input_value_for_row(windowobject, 1);
+ get pg_window_context v = PG_INPUT_VALUE;
+
+ perform set_partition_context_value(windowobject, v);
+ return n;
+end
+$$
+language plpgsql window;
+select v, pl_pcontext_test(v) over () from generate_series(0.1, 1.0, 0.1) g(v);
+ v | pl_pcontext_test
+-----+------------------
+ 0.1 |
+ 0.2 | 0.1
+ 0.3 | 0.2
+ 0.4 | 0.3
+ 0.5 | 0.4
+ 0.6 | 0.5
+ 0.7 | 0.6
+ 0.8 | 0.7
+ 0.9 | 0.8
+ 1.0 | 0.9
+(10 rows)
+
+create table test_missing_values(id int, v integer);
+insert into test_missing_values values(1,10),(2,11),(3,12),(4,null),(5,null),(6,15),(7,16);
+create or replace function pl_pcontext_test(numeric)
+returns numeric as $$
+declare
+ n numeric;
+ v numeric;
+begin
+ perform get_input_value_for_row(windowobject, 1);
+ get pg_window_context v = PG_INPUT_VALUE;
+
+ if v is null then
+ v := get_partition_context_value(windowobject, null::numeric);
+ else
+ perform set_partition_context_value(windowobject, v);
+ end if;
+
+ return v;
+end
+$$
+language plpgsql window;
+select id, v, pl_pcontext_test(v) over (order by id) from test_missing_values;
+ id | v | pl_pcontext_test
+----+----+------------------
+ 1 | 10 | 10
+ 2 | 11 | 11
+ 3 | 12 | 12
+ 4 | | 12
+ 5 | | 12
+ 6 | 15 | 15
+ 7 | 16 | 16
+(7 rows)
+
diff --git a/src/pl/plpgsql/src/pl_comp.c b/src/pl/plpgsql/src/pl_comp.c
index e7f4a5f291..ee4bbb560e 100644
--- a/src/pl/plpgsql/src/pl_comp.c
+++ b/src/pl/plpgsql/src/pl_comp.c
@@ -582,6 +582,41 @@ do_compile(FunctionCallInfo fcinfo,
true);
}
+ if (function->fn_prokind == PROKIND_WINDOW)
+ {
+ PLpgSQL_type *dtype;
+ PLpgSQL_var *var;
+
+ /*
+ * Add the promise variable windowobject with windowobjectproxy type
+ *
+ * Pseudotypes are disallowed for custom variables. It is checked
+ * in plpgsql_build_variable, so instead call this function, build
+ * promise variable here.
+ */
+
+ dtype = plpgsql_build_datatype(WINDOWOBJECTPROXYOID,
+ -1,
+ function->fn_input_collation,
+ NULL);
+
+ /* this should be pseudotype */
+ Assert(dtype->ttype == PLPGSQL_TTYPE_PSEUDO);
+
+ var = palloc0(sizeof(PLpgSQL_var));
+
+ var->dtype = PLPGSQL_DTYPE_PROMISE;
+ var->promise = PLPGSQL_PROMISE_WINDOWOBJECT;
+
+ var->refname = pstrdup("windowobject");
+ var->datatype = dtype;
+
+ plpgsql_adddatum((PLpgSQL_datum *) var);
+ plpgsql_ns_additem(PLPGSQL_NSTYPE_VAR,
+ var->dno,
+ var->refname);
+ }
+
ReleaseSysCache(typeTup);
break;
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index d4a3d58daa..25858ed815 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -320,6 +320,8 @@ static int exec_stmt_rollback(PLpgSQL_execstate *estate,
PLpgSQL_stmt_rollback *stmt);
static int exec_stmt_set(PLpgSQL_execstate *estate,
PLpgSQL_stmt_set *stmt);
+static int exec_stmt_getwincxt(PLpgSQL_execstate *estate,
+ PLpgSQL_stmt_getwincxt *stmt);
static void plpgsql_estate_setup(PLpgSQL_execstate *estate,
PLpgSQL_function *func,
@@ -593,6 +595,42 @@ plpgsql_exec_function(PLpgSQL_function *func, FunctionCallInfo fcinfo,
*/
exec_set_found(&estate, false);
+	/*
+	 * Initialize the window object proxy that is handed out through the
+	 * "windowobject" promise variable.  It exists only for WINDOW functions.
+	 */
+	if (func->fn_prokind == PROKIND_WINDOW)
+	{
+		/* fcinfo is available in this function too */
+		WindowObjectProxy wop;
+		MemoryContext oldcontext;
+
+		oldcontext = MemoryContextSwitchTo(estate.datum_context);
+
+		/* zeroed, so that no field is ever read uninitialized */
+		wop = palloc0(sizeof(WindowObjectProxyData));
+
+		/* the varlena header carries the size of the struct, not of a pointer */
+		SET_VARSIZE(wop, sizeof(WindowObjectProxyData));
+
+		wop->winobj = PG_WINDOW_OBJECT();
+
+		Assert(WindowObjectIsValid(wop->winobj));
+
+		/* no value has been fetched yet */
+		wop->value = (Datum) 0;
+		wop->isnull = true;
+		wop->isout = false;
+		wop->freeval = false;
+		wop->argno = -1;
+		wop->isvalid = false;
+		wop->last_argno = -1;
+
+		/* copies of fetched values are made in the datum context */
+		wop->proxy_cxt = estate.datum_context;
+		wop->fcinfo = fcinfo;
+
+		MemoryContextSwitchTo(oldcontext);
+
+		estate.winobjproxy = wop;
+	}
+	else
+		estate.winobjproxy = NULL;
+
/*
* Let the instrumentation plugin peek at this function
*/
@@ -915,6 +953,11 @@ plpgsql_exec_trigger(PLpgSQL_function *func,
plpgsql_estate_setup(&estate, func, NULL, NULL, NULL);
estate.trigdata = trigdata;
+ /*
+ * Trigger function cannot be WINDOW function
+ */
+ estate.winobjproxy = NULL;
+
/*
* Setup error traceback support for ereport()
*/
@@ -1293,11 +1336,13 @@ copy_plpgsql_datums(PLpgSQL_execstate *estate,
PLpgSQL_datum *indatum = indatums[i];
PLpgSQL_datum *outdatum;
+
/* This must agree with plpgsql_finish_datums on what is copiable */
switch (indatum->dtype)
{
case PLPGSQL_DTYPE_VAR:
case PLPGSQL_DTYPE_PROMISE:
+
outdatum = (PLpgSQL_datum *) ws_next;
memcpy(outdatum, indatum, sizeof(PLpgSQL_var));
ws_next += MAXALIGN(sizeof(PLpgSQL_var));
@@ -1486,6 +1531,17 @@ plpgsql_fulfill_promise(PLpgSQL_execstate *estate,
assign_text_var(estate, var, GetCommandTagName(estate->evtrigdata->tag));
break;
+ case PLPGSQL_PROMISE_WINDOWOBJECT:
+ if (!estate->winobjproxy)
+ elog(ERROR, "windowobject promise is not in a window function");
+
+ assign_simple_var(estate,
+ var,
+ PointerGetDatum(estate->winobjproxy),
+ false,
+ false);
+ break;
+
default:
elog(ERROR, "unrecognized promise type: %d", var->promise);
}
@@ -1995,6 +2051,10 @@ exec_stmts(PLpgSQL_execstate *estate, List *stmts)
rc = exec_stmt_getdiag(estate, (PLpgSQL_stmt_getdiag *) stmt);
break;
+ case PLPGSQL_STMT_GETWINCXT:
+ rc = exec_stmt_getwincxt(estate, (PLpgSQL_stmt_getwincxt *) stmt);
+ break;
+
case PLPGSQL_STMT_IF:
rc = exec_stmt_if(estate, (PLpgSQL_stmt_if *) stmt);
break;
@@ -2486,6 +2546,72 @@ exec_stmt_getdiag(PLpgSQL_execstate *estate, PLpgSQL_stmt_getdiag *stmt)
return PLPGSQL_RC_OK;
}
+/* ----------
+ * exec_stmt_getwincxt		Execute GET PG_WINDOW_CONTEXT
+ *
+ * Copies the result kept in the window object proxy into the target
+ * variables of the statement.  PG_INPUT_VALUE is assigned with the type
+ * of the window function argument it was fetched from,
+ * PG_IS_OUT_OF_INPUT is assigned as a boolean.
+ *
+ * An error is raised when the function is not a window function or when
+ * the proxy holds no valid result.
+ * ----------
+ */
+static int
+exec_stmt_getwincxt(PLpgSQL_execstate *estate, PLpgSQL_stmt_getwincxt *stmt)
+{
+	WindowObjectProxy proxy = estate->winobjproxy;
+	ListCell   *cell;
+
+	if (proxy == NULL)
+		elog(ERROR, "GET WINDOWS_CONTEXT is not in windows function");
+
+	if (!proxy->isvalid)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("windows context has not valid data")));
+
+	foreach(cell, stmt->items)
+	{
+		PLpgSQL_wincxt_item *item = (PLpgSQL_wincxt_item *) lfirst(cell);
+
+		switch (item->kind)
+		{
+			case PLPGSQL_GETWINCXT_INPUT_VALUE:
+				if (proxy->argno < 0)
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("windows context has not requested data")));
+
+				exec_assign_value(estate,
+								  estate->datums[item->target],
+								  proxy->value,
+								  proxy->isnull,
+								  get_fn_expr_argtype(proxy->fcinfo->flinfo,
+													  proxy->argno),
+								  -1);
+				break;
+
+			case PLPGSQL_GETWINCXT_IS_OUT_OF_INPUT:
+				if (proxy->argno < 0)
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("windows context has not requested data")));
+
+				exec_assign_value(estate,
+								  estate->datums[item->target],
+								  BoolGetDatum(proxy->isout),
+								  false,
+								  BOOLOID,
+								  -1);
+				break;
+
+			default:
+				elog(ERROR, "unsupported PLpgSQL_getwincxt_kind");
+		}
+	}
+
+	return PLPGSQL_RC_OK;
+}
+
/* ----------
* exec_stmt_if Evaluate a bool expression and
* execute the true or false body
diff --git a/src/pl/plpgsql/src/pl_funcs.c b/src/pl/plpgsql/src/pl_funcs.c
index ee60ced583..7080b5c6da 100644
--- a/src/pl/plpgsql/src/pl_funcs.c
+++ b/src/pl/plpgsql/src/pl_funcs.c
@@ -274,6 +274,8 @@ plpgsql_stmt_typename(PLpgSQL_stmt *stmt)
case PLPGSQL_STMT_GETDIAG:
return ((PLpgSQL_stmt_getdiag *) stmt)->is_stacked ?
"GET STACKED DIAGNOSTICS" : "GET DIAGNOSTICS";
+ case PLPGSQL_STMT_GETWINCXT:
+ return "GET PG_WINDOW_CONTEXT";
case PLPGSQL_STMT_OPEN:
return "OPEN";
case PLPGSQL_STMT_FETCH:
@@ -363,6 +365,7 @@ static void free_execsql(PLpgSQL_stmt_execsql *stmt);
static void free_dynexecute(PLpgSQL_stmt_dynexecute *stmt);
static void free_dynfors(PLpgSQL_stmt_dynfors *stmt);
static void free_getdiag(PLpgSQL_stmt_getdiag *stmt);
+static void free_getwincxt(PLpgSQL_stmt_getwincxt *stmt);
static void free_open(PLpgSQL_stmt_open *stmt);
static void free_fetch(PLpgSQL_stmt_fetch *stmt);
static void free_close(PLpgSQL_stmt_close *stmt);
@@ -439,6 +442,9 @@ free_stmt(PLpgSQL_stmt *stmt)
case PLPGSQL_STMT_GETDIAG:
free_getdiag((PLpgSQL_stmt_getdiag *) stmt);
break;
+ case PLPGSQL_STMT_GETWINCXT:
+ free_getwincxt((PLpgSQL_stmt_getwincxt *) stmt);
+ break;
case PLPGSQL_STMT_OPEN:
free_open((PLpgSQL_stmt_open *) stmt);
break;
@@ -723,6 +729,11 @@ free_getdiag(PLpgSQL_stmt_getdiag *stmt)
{
}
+/*
+ * free_getwincxt		Intentionally empty: a GET PG_WINDOW_CONTEXT item
+ *						carries only an item kind and a target datum number.
+ */
+static void
+free_getwincxt(PLpgSQL_stmt_getwincxt *stmt)
+{
+}
+
static void
free_expr(PLpgSQL_expr *expr)
{
@@ -820,6 +831,7 @@ static void dump_execsql(PLpgSQL_stmt_execsql *stmt);
static void dump_dynexecute(PLpgSQL_stmt_dynexecute *stmt);
static void dump_dynfors(PLpgSQL_stmt_dynfors *stmt);
static void dump_getdiag(PLpgSQL_stmt_getdiag *stmt);
+static void dump_getwincxt(PLpgSQL_stmt_getwincxt *stmt);
static void dump_open(PLpgSQL_stmt_open *stmt);
static void dump_fetch(PLpgSQL_stmt_fetch *stmt);
static void dump_cursor_direction(PLpgSQL_stmt_fetch *stmt);
@@ -907,6 +919,9 @@ dump_stmt(PLpgSQL_stmt *stmt)
case PLPGSQL_STMT_GETDIAG:
dump_getdiag((PLpgSQL_stmt_getdiag *) stmt);
break;
+ case PLPGSQL_STMT_GETWINCXT:
+ dump_getwincxt((PLpgSQL_stmt_getwincxt *) stmt);
+ break;
case PLPGSQL_STMT_OPEN:
dump_open((PLpgSQL_stmt_open *) stmt);
break;
@@ -1614,6 +1629,41 @@ dump_getdiag(PLpgSQL_stmt_getdiag *stmt)
printf("\n");
}
+/*
+ * getwincxt_kindname		Return the keyword that names a
+ *							GET PG_WINDOW_CONTEXT item kind.
+ *
+ * Raises an error for a value that is not a known PLpgSQL_getwincxt_kind.
+ */
+static const char *
+getwincxt_kindname(PLpgSQL_getwincxt_kind kind)
+{
+	/* no default label, so the compiler can warn about unhandled members */
+	switch (kind)
+	{
+		case PLPGSQL_GETWINCXT_INPUT_VALUE:
+			return "PG_INPUT_VALUE";
+		case PLPGSQL_GETWINCXT_IS_OUT_OF_INPUT:
+			return "PG_IS_OUT_OF_INPUT";
+	}
+
+	elog(ERROR, "unknown PLpgSQL_getwincxt_kind value");
+	return NULL;				/* keep compiler quiet */
+}
+
+/*
+ * dump_getwincxt		Print a GET PG_WINDOW_CONTEXT statement as a
+ *						comma-separated list of "{var N} = ITEM" pairs.
+ */
+static void
+dump_getwincxt(PLpgSQL_stmt_getwincxt *stmt)
+{
+	ListCell   *cell;
+	bool		first = true;
+
+	dump_ind();
+	printf("GET PG_WINDOW_CONTEXT ");
+
+	foreach(cell, stmt->items)
+	{
+		PLpgSQL_wincxt_item *entry = (PLpgSQL_wincxt_item *) lfirst(cell);
+
+		/* separator goes before every entry except the first one */
+		if (first)
+			first = false;
+		else
+			printf(", ");
+
+		printf("{var %d} = %s",
+			   entry->target,
+			   getwincxt_kindname(entry->kind));
+	}
+
+	printf("\n");
+}
+
static void
dump_expr(PLpgSQL_expr *expr)
{
diff --git a/src/pl/plpgsql/src/pl_gram.y b/src/pl/plpgsql/src/pl_gram.y
index 5a7e1a4444..143b8a86ab 100644
--- a/src/pl/plpgsql/src/pl_gram.y
+++ b/src/pl/plpgsql/src/pl_gram.y
@@ -166,6 +166,7 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt);
PLpgSQL_diag_item *diagitem;
PLpgSQL_stmt_fetch *fetch;
PLpgSQL_case_when *casewhen;
+ PLpgSQL_wincxt_item *wincxtitem;
}
%type <declhdr> decl_sect
@@ -201,6 +202,9 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt);
%type <stmt> stmt_open stmt_fetch stmt_move stmt_close stmt_null
%type <stmt> stmt_commit stmt_rollback stmt_set
%type <stmt> stmt_case stmt_foreach_a
+%type <stmt> stmt_getwincxt
+%type <list> get_wincxt_items
+%type <wincxtitem> get_wincxt_item
%type <list> proc_exceptions
%type <exception_block> exception_sect
@@ -325,6 +329,9 @@ static void check_raise_parameters(PLpgSQL_stmt_raise *stmt);
%token <keyword> K_PG_EXCEPTION_CONTEXT
%token <keyword> K_PG_EXCEPTION_DETAIL
%token <keyword> K_PG_EXCEPTION_HINT
+%token <keyword> K_PG_INPUT_VALUE
+%token <keyword> K_PG_IS_OUT_OF_INPUT
+%token <keyword> K_PG_WINDOW_CONTEXT
%token <keyword> K_PRINT_STRICT_PARAMS
%token <keyword> K_PRIOR
%token <keyword> K_QUERY
@@ -886,6 +893,8 @@ proc_stmt : pl_block ';'
{ $$ = $1; }
| stmt_getdiag
{ $$ = $1; }
+ | stmt_getwincxt
+ { $$ = $1; }
| stmt_open
{ $$ = $1; }
| stmt_fetch
@@ -1153,6 +1162,56 @@ assign_var : T_DATUM
}
;
+/*
+ * GET PG_WINDOW_CONTEXT var = item [, ...] ;
+ *
+ * Accepted only while compiling a function declared as a window function.
+ */
+stmt_getwincxt	: K_GET K_PG_WINDOW_CONTEXT get_wincxt_items ';'
+					{
+						PLpgSQL_stmt_getwincxt *new;
+
+						/* Allow this statement only inside window function */
+						if (plpgsql_curr_compile->fn_prokind != PROKIND_WINDOW)
+							ereport(ERROR,
+									(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+									 errmsg("GET PG_WINDOW_CONTEXT statement can be used only in a window function"),
+									 parser_errposition(@1)));
+
+						new = palloc0(sizeof(PLpgSQL_stmt_getwincxt));
+						new->cmd_type = PLPGSQL_STMT_GETWINCXT;
+						new->lineno = plpgsql_location_to_lineno(@1);
+						new->stmtid = ++plpgsql_curr_compile->nstatements;
+						new->items = $3;
+
+						$$ = (PLpgSQL_stmt *) new;
+					}
+				;
+
+/* comma-separated list of items, collected in source order */
+get_wincxt_items	: get_wincxt_item
+					{
+						/* a single item starts a new list */
+						$$ = list_make1($1);
+					}
+				| get_wincxt_items ',' get_wincxt_item
+					{
+						/* every further item is appended at the end */
+						$$ = lappend($1, $3);
+					}
+				;
+
+/*
+ * One "variable = item" pair.  The target must be an assignable datum; its
+ * dno is stored so the executor can locate the variable at run time.
+ */
+get_wincxt_item	: T_DATUM '=' K_PG_INPUT_VALUE
+					{
+						PLpgSQL_wincxt_item *item;
+
+						check_assignable($1.datum, @1);
+
+						item = palloc(sizeof(PLpgSQL_wincxt_item));
+						item->target = $1.datum->dno;
+						item->kind = PLPGSQL_GETWINCXT_INPUT_VALUE;
+
+						/* hand the item to the list rule; it was lost before */
+						$$ = item;
+					}
+				| T_DATUM '=' K_PG_IS_OUT_OF_INPUT
+					{
+						PLpgSQL_wincxt_item *item;
+
+						check_assignable($1.datum, @1);
+
+						item = palloc(sizeof(PLpgSQL_wincxt_item));
+						item->target = $1.datum->dno;
+						item->kind = PLPGSQL_GETWINCXT_IS_OUT_OF_INPUT;
+
+						/* hand the item to the list rule; it was lost before */
+						$$ = item;
+					}
+				;
+
stmt_if : K_IF expr_until_then proc_sect stmt_elsifs stmt_else K_END K_IF ';'
{
PLpgSQL_stmt_if *new;
diff --git a/src/pl/plpgsql/src/pl_unreserved_kwlist.h b/src/pl/plpgsql/src/pl_unreserved_kwlist.h
index 99b3cf7d8a..67cb8b7203 100644
--- a/src/pl/plpgsql/src/pl_unreserved_kwlist.h
+++ b/src/pl/plpgsql/src/pl_unreserved_kwlist.h
@@ -84,6 +84,9 @@ PG_KEYWORD("pg_datatype_name", K_PG_DATATYPE_NAME)
PG_KEYWORD("pg_exception_context", K_PG_EXCEPTION_CONTEXT)
PG_KEYWORD("pg_exception_detail", K_PG_EXCEPTION_DETAIL)
PG_KEYWORD("pg_exception_hint", K_PG_EXCEPTION_HINT)
+PG_KEYWORD("pg_input_value", K_PG_INPUT_VALUE)
+PG_KEYWORD("pg_is_out_of_input", K_PG_IS_OUT_OF_INPUT)
+PG_KEYWORD("pg_window_context", K_PG_WINDOW_CONTEXT)
PG_KEYWORD("print_strict_params", K_PRINT_STRICT_PARAMS)
PG_KEYWORD("prior", K_PRIOR)
PG_KEYWORD("query", K_QUERY)
diff --git a/src/pl/plpgsql/src/plpgsql.h b/src/pl/plpgsql/src/plpgsql.h
index 0c3d30fb13..16c898c4ef 100644
--- a/src/pl/plpgsql/src/plpgsql.h
+++ b/src/pl/plpgsql/src/plpgsql.h
@@ -23,6 +23,8 @@
#include "utils/expandedrecord.h"
#include "utils/typcache.h"
+#include "windowapi.h"
+
/**********************************************************************
* Definitions
@@ -84,7 +86,8 @@ typedef enum PLpgSQL_promise_type
PLPGSQL_PROMISE_TG_NARGS,
PLPGSQL_PROMISE_TG_ARGV,
PLPGSQL_PROMISE_TG_EVENT,
- PLPGSQL_PROMISE_TG_TAG
+ PLPGSQL_PROMISE_TG_TAG,
+ PLPGSQL_PROMISE_WINDOWOBJECT
} PLpgSQL_promise_type;
/*
@@ -122,6 +125,7 @@ typedef enum PLpgSQL_stmt_type
PLPGSQL_STMT_DYNEXECUTE,
PLPGSQL_STMT_DYNFORS,
PLPGSQL_STMT_GETDIAG,
+ PLPGSQL_STMT_GETWINCXT,
PLPGSQL_STMT_OPEN,
PLPGSQL_STMT_FETCH,
PLPGSQL_STMT_CLOSE,
@@ -162,6 +166,15 @@ typedef enum PLpgSQL_getdiag_kind
PLPGSQL_GETDIAG_SCHEMA_NAME
} PLpgSQL_getdiag_kind;
+/*
+ * GET PG_WINDOW_CONTEXT information items
+ *
+ * Each member identifies one value that a GET PG_WINDOW_CONTEXT statement
+ * can assign to a PL/pgSQL variable.
+ */
+typedef enum PLpgSQL_getwincxt_kind
+{
+ PLPGSQL_GETWINCXT_INPUT_VALUE, /* PG_INPUT_VALUE */
+ PLPGSQL_GETWINCXT_IS_OUT_OF_INPUT /* PG_IS_OUT_OF_INPUT */
+} PLpgSQL_getwincxt_kind;
+
/*
* RAISE statement options
*/
@@ -612,6 +625,26 @@ typedef struct PLpgSQL_stmt_getdiag
List *diag_items; /* List of PLpgSQL_diag_item */
} PLpgSQL_stmt_getdiag;
+/*
+ * GET PG_WINDOW_CONTEXT item
+ *
+ * One "variable = item" pair of a GET PG_WINDOW_CONTEXT statement.
+ */
+typedef struct PLpgSQL_wincxt_item
+{
+ PLpgSQL_getwincxt_kind kind; /* id for window context value desired */
+ int target; /* dno of the variable to assign it to */
+} PLpgSQL_wincxt_item;
+
+/*
+ * GET PG_WINDOW_CONTEXT statement
+ *
+ * Assigns one or more window context values to PL/pgSQL variables.
+ */
+typedef struct PLpgSQL_stmt_getwincxt
+{
+ PLpgSQL_stmt_type cmd_type;
+ int lineno;
+ unsigned int stmtid;
+ List *items; /* List of PLpgSQL_wincxt_item */
+} PLpgSQL_stmt_getwincxt;
+
/*
* IF statement
*/
@@ -1049,6 +1082,9 @@ typedef struct PLpgSQL_execstate
TriggerData *trigdata; /* if regular trigger, data about firing */
EventTriggerData *evtrigdata; /* if event trigger, data about firing */
+ WindowObjectProxy winobjproxy; /* for window functions: proxy object
+ * between PL/pgSQL and the WinFuncArg* functions */
+
Datum retval;
bool retisnull;
Oid rettype; /* type of current retval */
diff --git a/src/pl/plpgsql/src/sql/plpgsql_window.sql b/src/pl/plpgsql/src/sql/plpgsql_window.sql
new file mode 100644
index 0000000000..f2f275bd1a
--- /dev/null
+++ b/src/pl/plpgsql/src/sql/plpgsql_window.sql
@@ -0,0 +1,144 @@
+-- custom variant of row_number() written in PL/pgSQL
+create or replace function pl_row_number()
+returns bigint as $$
+declare
+  rownum int8;
+begin
+  -- the row number is the current position plus one
+  rownum := get_current_position(windowobject);
+  rownum := rownum + 1;
+
+  perform set_mark_position(windowobject, rownum);
+
+  return rownum;
+end
+$$
+language plpgsql window;
+
+-- number three constant rows
+select pl_row_number() over (),
+       v
+  from (values (10), (20), (30)) v(v);
+
+-- rounds the input value read through the window context
+create or replace function pl_round_value(numeric)
+returns int as $$
+declare
+  input_val numeric;
+begin
+  -- request input value 1, then copy it from the window context
+  perform get_input_value_for_row(windowobject, 1);
+  get pg_window_context input_val = PG_INPUT_VALUE;
+
+  return round(input_val);
+end
+$$ language plpgsql window;
+
+-- plain column as the argument
+select pl_round_value(v) over (order by v desc)
+  from generate_series(0.1, 1.0, 0.1) g(v);
+
+-- expression as the argument
+select pl_round_value(v + 1) over (order by v desc)
+  from generate_series(0.1, 1.0, 0.1) g(v);
+
+-- fixture with a numeric column
+create table test_table (v numeric);
+insert into test_table (v)
+  values (1), (3), (6), (6), (8), (7), (6), (5), (4);
+
+-- custom variant of lag() written in PL/pgSQL
+create or replace function pl_lag(numeric)
+returns numeric as $$
+declare
+  prev_val numeric;
+begin
+  -- request input value 1 at offset -1 relative to the current row
+  perform get_input_value_in_partition(windowobject, 1, -1, 'seek_current', false);
+  get pg_window_context prev_val = PG_INPUT_VALUE;
+
+  return prev_val;
+end;
+$$ language plpgsql window;
+
+-- compare with the built-in lag() on the numeric column
+select pl_lag(v) over (),
+       lag(v) over ()
+  from test_table;
+
+drop table test_table;
+
+-- same data, but the column is now integer
+create table test_table (v integer);
+insert into test_table (v)
+  values (1), (3), (6), (6), (8), (7), (6), (5), (4);
+
+-- compare with the built-in lag() on the integer column
+select pl_lag(v) over (),
+       lag(v) over ()
+  from test_table;
+
+-- average of the non-null input values found at offsets -1, 0 and +1
+-- relative to the current row; NULL when none of them has a value
+create or replace function pl_moving_avg(numeric)
+returns numeric as $$
+declare
+  s numeric default 0.0;  -- running sum of the non-null values
+  v numeric;
+  c numeric default 0.0;  -- number of non-null values
+begin
+  -- look at the row before (-1), the current row (0) and the row after (+1)
+  for relpos in -1 .. 1 loop
+    perform get_input_value_in_partition(windowobject, 1, relpos, 'seek_current', false);
+    get pg_window_context v = PG_INPUT_VALUE;
+    if v is not null then
+      s := s + v;
+      c := c + 1.0;
+    end if;
+  end loop;
+
+  -- nullif() prevents a division by zero when all three values are null
+  return trim_scale(s / nullif(c, 0));
+end
+$$ language plpgsql window;
+
+-- moving average next to the source value
+select pl_moving_avg(v) over (),
+       v
+  from test_table;
+
+-- polymorphic variant of pl_lag: the local variable takes the result type
+create or replace function pl_lag_polymorphic(anyelement)
+returns anyelement as $$
+declare
+  prev_val $0%type;
+begin
+  -- request input value 1 at offset -1 relative to the current row
+  perform get_input_value_in_partition(windowobject, 1, -1, 'seek_current', false);
+  get pg_window_context prev_val = PG_INPUT_VALUE;
+
+  return prev_val;
+end;
+$$ language plpgsql window;
+
+-- compare the polymorphic variant with the built-in lag()
+select pl_lag_polymorphic(v) over (),
+       lag(v) over ()
+  from test_table;
+
+-- returns the value found in the partition context and then replaces it
+-- with the current input value
+create or replace function pl_pcontext_test(numeric)
+returns numeric as $$
+declare
+  stored_val numeric;
+  input_val numeric;
+begin
+  -- read the partition context before it is overwritten below
+  stored_val := get_partition_context_value(windowobject, null::numeric);
+
+  perform get_input_value_for_row(windowobject, 1);
+  get pg_window_context input_val = PG_INPUT_VALUE;
+
+  perform set_partition_context_value(windowobject, input_val);
+
+  return stored_val;
+end
+$$
+language plpgsql window;
+
+-- show each value next to what the partition context held
+select v,
+       pl_pcontext_test(v) over ()
+  from generate_series(0.1, 1.0, 0.1) g(v);
+
+-- fixture with gaps (null values) in column v
+create table test_missing_values (id int, v integer);
+insert into test_missing_values (id, v)
+  values (1, 10), (2, 11), (3, 12), (4, null), (5, null), (6, 15), (7, 16);
+
+-- fills gaps: a null input is replaced by the value stored in the
+-- partition context; a non-null input is stored there and returned
+create or replace function pl_pcontext_test(numeric)
+returns numeric as $$
+declare
+  v numeric;
+begin
+  perform get_input_value_for_row(windowobject, 1);
+  get pg_window_context v = PG_INPUT_VALUE;
+
+  if v is null then
+    -- missing value: take what the partition context holds
+    v := get_partition_context_value(windowobject, null::numeric);
+  else
+    -- keep the value in the partition context for later calls
+    perform set_partition_context_value(windowobject, v);
+  end if;
+
+  return v;
+end
+$$
+language plpgsql window;
+
+-- null values of v are replaced by the value stored earlier
+select id,
+       v,
+       pl_pcontext_test(v) over (order by id)
+  from test_missing_values;