Hi hackers,
I would like to propose an old patch for Postgres 11, developed off-site
by Oliver Ford
(I have his permission to publish it and to continue its
development),
that allows distinct aggregates, like select sum(distinct nums), within a
window function.
I have rebased it onto the current git master branch and have made the
changes necessary for it to work with Postgres 13devel.
It's a WIP, because it doesn't have tests yet (I will add them later).
Also, it currently works for int, float, and numeric types,
but the distinct check can probably be rewritten
to store the distinct elements in a hash table, which should give a
performance improvement.
If you find the implementation of the patch acceptable from a committer's
perspective,
I will respond to all of your design and review notes and will try to move
ahead with it.
I will also add this patch to the March commitfest.
As a usage example, suppose you have time series data like the following.
With current Postgres, using DISTINCT in a window aggregate gives an error:
postgres=# CREATE TABLE t_demo AS
postgres-# SELECT ordinality, day, date_part('week', day) AS week
postgres-# FROM generate_series('2020-01-02', '2020-01-15', '1
day'::interval)
postgres-# WITH ORDINALITY AS day;
SELECT 14
postgres=# SELECT * FROM t_demo;
ordinality | day | week
------------+------------------------+------
1 | 2020-01-02 00:00:00+02 | 1
2 | 2020-01-03 00:00:00+02 | 1
3 | 2020-01-04 00:00:00+02 | 1
4 | 2020-01-05 00:00:00+02 | 1
5 | 2020-01-06 00:00:00+02 | 2
6 | 2020-01-07 00:00:00+02 | 2
7 | 2020-01-08 00:00:00+02 | 2
8 | 2020-01-09 00:00:00+02 | 2
9 | 2020-01-10 00:00:00+02 | 2
10 | 2020-01-11 00:00:00+02 | 2
11 | 2020-01-12 00:00:00+02 | 2
12 | 2020-01-13 00:00:00+02 | 3
13 | 2020-01-14 00:00:00+02 | 3
14 | 2020-01-15 00:00:00+02 | 3
(14 rows)
postgres=# SELECT *,
postgres-# array_agg(DISTINCT week) OVER (ORDER BY day ROWS
postgres(# BETWEEN 2 PRECEDING AND 2
FOLLOWING)
postgres-# FROM t_demo;
ERROR: DISTINCT is not implemented for window functions
LINE 2: array_agg(DISTINCT week) OVER (ORDER BY day ROWS
^
So you will need to write something like this:
postgres=# SELECT *, (SELECT array_agg(DISTINCT unnest) FROM unnest(x)) AS
b
postgres-# FROM
postgres-# (
postgres(# SELECT *,
postgres(# array_agg(week) OVER (ORDER BY day ROWS
postgres(# BETWEEN 2 PRECEDING AND 2 FOLLOWING) AS x
postgres(# FROM t_demo
postgres(# ) AS a;
ordinality | day | week | x | b
------------+------------------------+------+-------------+-------
1 | 2020-01-02 00:00:00+02 | 1 | {1,1,1} | {1}
2 | 2020-01-03 00:00:00+02 | 1 | {1,1,1,1} | {1}
3 | 2020-01-04 00:00:00+02 | 1 | {1,1,1,1,2} | {1,2}
4 | 2020-01-05 00:00:00+02 | 1 | {1,1,1,2,2} | {1,2}
5 | 2020-01-06 00:00:00+02 | 2 | {1,1,2,2,2} | {1,2}
6 | 2020-01-07 00:00:00+02 | 2 | {1,2,2,2,2} | {1,2}
7 | 2020-01-08 00:00:00+02 | 2 | {2,2,2,2,2} | {2}
8 | 2020-01-09 00:00:00+02 | 2 | {2,2,2,2,2} | {2}
9 | 2020-01-10 00:00:00+02 | 2 | {2,2,2,2,2} | {2}
10 | 2020-01-11 00:00:00+02 | 2 | {2,2,2,2,3} | {2,3}
11 | 2020-01-12 00:00:00+02 | 2 | {2,2,2,3,3} | {2,3}
12 | 2020-01-13 00:00:00+02 | 3 | {2,2,3,3,3} | {2,3}
13 | 2020-01-14 00:00:00+02 | 3 | {2,3,3,3} | {2,3}
14 | 2020-01-15 00:00:00+02 | 3 | {3,3,3} | {3}
(14 rows)
With the attached patch, you will get the desired results:
postgres=# SELECT *,
postgres-# array_agg(DISTINCT week) OVER (ORDER BY day ROWS
postgres(# BETWEEN 2 PRECEDING AND 2
FOLLOWING)
postgres-# FROM t_demo;
ordinality | day | week | array_agg
------------+------------------------+------+-----------
1 | 2020-01-02 00:00:00+02 | 1 | {1}
2 | 2020-01-03 00:00:00+02 | 1 | {1}
3 | 2020-01-04 00:00:00+02 | 1 | {1,2}
4 | 2020-01-05 00:00:00+02 | 1 | {1,2}
5 | 2020-01-06 00:00:00+02 | 2 | {1,2}
6 | 2020-01-07 00:00:00+02 | 2 | {1,2}
7 | 2020-01-08 00:00:00+02 | 2 | {2}
8 | 2020-01-09 00:00:00+02 | 2 | {2}
9 | 2020-01-10 00:00:00+02 | 2 | {2}
10 | 2020-01-11 00:00:00+02 | 2 | {2,3}
11 | 2020-01-12 00:00:00+02 | 2 | {2,3}
12 | 2020-01-13 00:00:00+02 | 3 | {2,3}
13 | 2020-01-14 00:00:00+02 | 3 | {2,3}
14 | 2020-01-15 00:00:00+02 | 3 | {3}
(14 rows)
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 4cc7da268d..66ab18bab6 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -34,9 +34,14 @@
#include "postgres.h"
#include "access/htup_details.h"
+#include "access/nbtree.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_aggregate.h"
+#include "catalog/pg_am.h"
#include "catalog/pg_proc.h"
+#include "catalog/pg_type.h"
+#include "commands/defrem.h"
+#include "common/int.h"
#include "executor/executor.h"
#include "executor/nodeWindowAgg.h"
#include "miscadmin.h"
@@ -156,6 +161,12 @@ typedef struct WindowStatePerAggData
/* Data local to eval_windowaggregates() */
bool restart; /* need to restart this agg in this cycle? */
+
+ FmgrInfo distinct_cmpfn;
+ int16 distinct_typlen;
+ void *distinctArr; /* Array of DISTINCT values */
+ int64 distinctSize; /* Current size of the working distinctArr */
+ int64 distinctCount; /* Current number of values in the working distinctArr */
} WindowStatePerAggData;
static void initialize_windowaggregate(WindowAggState *winstate,
@@ -209,6 +220,24 @@ initialize_windowaggregate(WindowAggState *winstate,
{
MemoryContext oldContext;
+ if (perfuncstate->wfunc->windistinct)
+ {
+ if (peraggstate->distinctSize > 0)
+ {
+ pfree(peraggstate->distinctArr);
+ peraggstate->distinctCount = 0;
+ peraggstate->distinctSize = 0;
+ }
+
+ /* If the type is a fixed length, allocate an initial array of size 16 */
+ Assert(peraggstate->distinct_typlen > 0 || peraggstate->distinct_typlen == -1);
+ if (peraggstate->distinct_typlen > 0)
+ {
+ peraggstate->distinctArr = palloc0(sizeof(int64) * 16);
+ peraggstate->distinctSize = 16;
+ }
+ }
+
/*
* If we're using a private aggcontext, we may reset it here. But if the
* context is shared, we don't know which other aggregates may still need
@@ -278,6 +307,68 @@ advance_windowaggregate(WindowAggState *winstate,
i++;
}
+ if (wfuncstate->wfunc->windistinct && !fcinfo->args[1].isnull)
+ {
+ MemoryContext tupleContext = MemoryContextSwitchTo(oldContext);
+
+ if (peraggstate->distinct_typlen > 0)
+ {
+ for (i = 0; i < peraggstate->distinctCount; i++)
+ {
+ if (DatumGetInt32(FunctionCall2(&peraggstate->distinct_cmpfn,
+ fcinfo->args[1].value,
+ *((int64 *)peraggstate->distinctArr + i))) == 0)
+ return;
+ }
+
+ if (peraggstate->distinctCount == peraggstate->distinctSize)
+ {
+ peraggstate->distinctSize *= 2;
+ peraggstate->distinctArr = (int64 *) repalloc(peraggstate->distinctArr,
+ sizeof(int64) * peraggstate->distinctSize);
+ }
+ *((int64 *)peraggstate->distinctArr + peraggstate->distinctCount) = fcinfo->args[1].value;
+ }
+ else if (peraggstate->distinct_typlen == -1)
+ {
+ uint64 len, tmp_len = 0;
+ int8 *arr = (int8 *) peraggstate->distinctArr;
+ int8 *arg = (int8 *) DatumGetPointer(fcinfo->args[1].value);
+ int64 varlen = VARSIZE_ANY(fcinfo->args[1].value);
+
+ for (i = 0; i < peraggstate->distinctCount; i++)
+ {
+ uint64 curr_len;
+ if (DatumGetInt32(FunctionCall2(&peraggstate->distinct_cmpfn,
+ fcinfo->args[1].value,
+ PointerGetDatum(arr)) == 0))
+ return;
+ curr_len = VARSIZE_ANY(arr);
+ tmp_len += curr_len;
+ arr += curr_len;
+ }
+
+ if (peraggstate->distinctCount == 0)
+ {
+ len = sh_pow2(varlen);
+ peraggstate->distinctArr = (int8 *) palloc0(len);
+ }
+ else
+ {
+ len = sh_pow2(tmp_len + varlen);
+ if (len > peraggstate->distinctSize)
+ peraggstate->distinctArr = (int8 *) repalloc(peraggstate->distinctArr,
+ len);
+ }
+ arr = (int8 *) peraggstate->distinctArr + tmp_len;
+ peraggstate->distinctSize = len;
+ for (i = 0; i < varlen; i++) *arr++ = *arg++;
+ }
+
+ peraggstate->distinctCount++;
+ MemoryContextSwitchTo(tupleContext);
+ }
+
if (peraggstate->transfn.fn_strict)
{
/*
@@ -428,6 +519,9 @@ advance_windowaggregate_base(WindowAggState *winstate,
ExprContext *econtext = winstate->tmpcontext;
ExprState *filter = wfuncstate->aggfilter;
+ if (wfuncstate->wfunc->windistinct)
+ return false;
+
oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
/* Skip anything FILTERed out */
@@ -2610,8 +2704,6 @@ ExecReScanWindowAgg(WindowAggState *node)
/*
* initialize_peragg
- *
- * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
*/
static WindowStatePerAggData *
initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
@@ -2651,6 +2743,27 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
wfunc->winfnoid);
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
+ if (wfunc->windistinct)
+ {
+ Oid opclass,
+ opfamily,
+ cmp_func_oid;
+ HeapTuple typeTuple;
+ Form_pg_type typeform;
+
+ opclass = GetDefaultOpClass(wfunc->winaggargtype, BTREE_AM_OID);
+ opfamily = get_opclass_family(opclass);
+ cmp_func_oid = get_opfamily_proc(opfamily, wfunc->winaggargtype, wfunc->winaggargtype, BTORDER_PROC);
+ fmgr_info(cmp_func_oid, &peraggstate->distinct_cmpfn);
+
+ typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(wfunc->winaggargtype));
+ if (!HeapTupleIsValid(typeTuple))
+ elog(ERROR, "cache lookup failed for type %u", wfunc->winaggargtype);
+ typeform = (Form_pg_type) GETSTRUCT(typeTuple);
+ peraggstate->distinct_typlen = typeform->typlen;
+ ReleaseSysCache(typeTuple);
+ }
+
/*
* Figure out whether we want to use the moving-aggregate implementation,
* and collect the right set of fields from the pg_attribute entry.
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 54ad62bb7f..fb8058af24 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -1493,6 +1493,8 @@ _copyWindowFunc(const WindowFunc *from)
COPY_SCALAR_FIELD(winref);
COPY_SCALAR_FIELD(winstar);
COPY_SCALAR_FIELD(winagg);
+ COPY_SCALAR_FIELD(windistinct);
+ COPY_SCALAR_FIELD(winaggargtype);
COPY_LOCATION_FIELD(location);
return newnode;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 5b1ba143b1..24a03097a0 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -264,6 +264,8 @@ _equalWindowFunc(const WindowFunc *a, const WindowFunc *b)
COMPARE_SCALAR_FIELD(winref);
COMPARE_SCALAR_FIELD(winstar);
COMPARE_SCALAR_FIELD(winagg);
+ COMPARE_SCALAR_FIELD(windistinct);
+ COMPARE_SCALAR_FIELD(winaggargtype);
COMPARE_LOCATION_FIELD(location);
return true;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index d76fae44b8..c40dbd0cb4 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -1162,6 +1162,8 @@ _outWindowFunc(StringInfo str, const WindowFunc *node)
WRITE_UINT_FIELD(winref);
WRITE_BOOL_FIELD(winstar);
WRITE_BOOL_FIELD(winagg);
+ WRITE_BOOL_FIELD(windistinct);
+ WRITE_OID_FIELD(winaggargtype);
WRITE_LOCATION_FIELD(location);
}
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 551ce6c41c..aaa327ce87 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -653,6 +653,8 @@ _readWindowFunc(void)
READ_UINT_FIELD(winref);
READ_BOOL_FIELD(winstar);
READ_BOOL_FIELD(winagg);
+ READ_BOOL_FIELD(windistinct);
+ READ_OID_FIELD(winaggargtype);
READ_LOCATION_FIELD(location);
READ_DONE();
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 2d3ec22407..5cb87c7073 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -2451,6 +2451,8 @@ eval_const_expressions_mutator(Node *node,
newexpr->winref = expr->winref;
newexpr->winstar = expr->winstar;
newexpr->winagg = expr->winagg;
+ newexpr->windistinct = expr->windistinct;
+ newexpr->winaggargtype = expr->winaggargtype;
newexpr->location = expr->location;
return (Node *) newexpr;
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 9c3b6ad916..c663403bfb 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -825,13 +825,21 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
wfunc->location = location;
/*
- * agg_star is allowed for aggregate functions but distinct isn't
+ * In a window function, DISTINCT is allowed only for aggregate
+ * functions and is handled in nodeWindowAgg.c separately from
+ * when it occurs outside of a window function.
*/
if (agg_distinct)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("DISTINCT is not implemented for window functions"),
- parser_errposition(pstate, location)));
+ {
+ if (!wfunc->winagg)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("DISTINCT is only allowed in aggregate window functions"),
+ parser_errposition(pstate, location)));
+ wfunc->windistinct = true;
+ agg_distinct = false;
+ wfunc->winaggargtype = actual_arg_types[0];
+ }
/*
* Reject attempt to call a parameterless aggregate without (*)
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 116e00bce4..bf4a86b0a9 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -9464,7 +9464,11 @@ get_windowfunc_expr(WindowFunc *wfunc, deparse_context *context)
if (wfunc->winstar)
appendStringInfoChar(buf, '*');
else
+ {
+ if (wfunc->windistinct)
+ appendStringInfoString(buf, "DISTINCT ");
get_rule_expr((Node *) wfunc->args, context, true);
+ }
if (wfunc->aggfilter != NULL)
{
diff --git a/src/include/common/int.h b/src/include/common/int.h
index a2972218e7..6fb2885977 100644
--- a/src/include/common/int.h
+++ b/src/include/common/int.h
@@ -434,4 +434,24 @@ pg_mul_u64_overflow(uint64 a, uint64 b, uint64 *result)
#endif
}
+
+/* calculate ceil(log base 2) of num; returns 0 when num is 0 or 1 */
+static inline uint64
+sh_log2(uint64 num)
+{
+ int i; /* exponent currently being tried */
+ uint64 limit; /* 2^i, doubled on every iteration */
+
+ for (i = 0, limit = 1; limit < num; i++, limit <<= 1)
+ ; /* NOTE: for num > 2^63, limit wraps to 0 and this loop never exits */
+ return i;
+}
+
+/* calculate first power of 2 >= num; returns 1 when num is 0 or 1 */
+static inline uint64
+sh_pow2(uint64 num)
+{
+ return ((uint64) 1) << sh_log2(num);
+}
+
#endif /* COMMON_INT_H */
diff --git a/src/include/lib/simplehash.h b/src/include/lib/simplehash.h
index 5a6783f653..669d97cac2 100644
--- a/src/include/lib/simplehash.h
+++ b/src/include/lib/simplehash.h
@@ -57,6 +57,8 @@
* backwards, unless they're empty or already at their optimal position.
*/
+#include "common/int.h"
+
/* helpers */
#define SH_MAKE_PREFIX(a) CppConcat(a,_)
#define SH_MAKE_NAME(name) SH_MAKE_NAME_(SH_MAKE_PREFIX(SH_PREFIX),name)
@@ -215,27 +217,6 @@ SH_SCOPE void SH_STAT(SH_TYPE * tb);
#ifndef SIMPLEHASH_H
#define SIMPLEHASH_H
-/* FIXME: can we move these to a central location? */
-
-/* calculate ceil(log base 2) of num */
-static inline uint64
-sh_log2(uint64 num)
-{
- int i;
- uint64 limit;
-
- for (i = 0, limit = 1; limit < num; i++, limit <<= 1)
- ;
- return i;
-}
-
-/* calculate first power of 2 >= num */
-static inline uint64
-sh_pow2(uint64 num)
-{
- return ((uint64) 1) << sh_log2(num);
-}
-
#ifdef FRONTEND
#define sh_error(...) pg_log_error(__VA_ARGS__)
#define sh_log(...) pg_log_info(__VA_ARGS__)
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index d73be2ad46..710eea443f 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -379,6 +379,8 @@ typedef struct WindowFunc
Index winref; /* index of associated WindowClause */
bool winstar; /* true if argument list was really '*' */
bool winagg; /* is function a simple aggregate? */
+ bool windistinct; /* is function a DISTINCT aggregate? */
+ Oid winaggargtype; /* arg type of function, used for DISTINCT */
int location; /* token location, or -1 if unknown */
} WindowFunc;