On Fri, 2 Jul 2021 at 10:24, David Rowley <[email protected]> wrote:
>
> I ran this again with a few different worker counts after tuning a few
> memory settings so there was no spilling to disk and so everything was
> in RAM. Mostly so I could get consistent results.
>
> Here's the results. Average over 3 runs on each:
>
> Workers Master Patched Percent
> 8 11094.1 11084.9 100.08%
> 16 8711.4 8562.6 101.74%
> 32 6961.4 6726.3 103.50%
> 64 6137.4 5854.8 104.83%
> 128 6090.3 5747.4 105.96%
>
Thanks for testing again. Those are nice-looking results, and they are much
more in line with what I was seeing.
> So the gains are much less at lower worker counts. I think this is
> because most of the gains are in the serial part of the plan and with
> higher worker counts that part of the plan is relatively much bigger.
>
> So likely performance isn't too critical here, but it is something to
> keep in mind.
>
Yes, agreed. I suspect there's not much more that can be shaved off
this particular piece of code now, though. Here's an update with the
last set of changes discussed.
Regards,
Dean
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
new file mode 100644
index eb78f0b..a8c6bbf
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -515,6 +515,9 @@ static void set_var_from_var(const Numer
static char *get_str_from_var(const NumericVar *var);
static char *get_str_from_var_sci(const NumericVar *var, int rscale);
+static void numericvar_serialize(StringInfo buf, const NumericVar *var);
+static void numericvar_deserialize(StringInfo buf, NumericVar *var);
+
static Numeric duplicate_numeric(Numeric num);
static Numeric make_result(const NumericVar *var);
static Numeric make_result_opt_error(const NumericVar *var, bool *error);
@@ -4943,38 +4946,25 @@ numeric_avg_serialize(PG_FUNCTION_ARGS)
{
NumericAggState *state;
StringInfoData buf;
- Datum temp;
- bytea *sumX;
bytea *result;
NumericVar tmp_var;
+ init_var(&tmp_var);
+
/* Ensure we disallow calling when not in aggregate context */
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
state = (NumericAggState *) PG_GETARG_POINTER(0);
- /*
- * This is a little wasteful since make_result converts the NumericVar
- * into a Numeric and numeric_send converts it back again. Is it worth
- * splitting the tasks in numeric_send into separate functions to stop
- * this? Doing so would also remove the fmgr call overhead.
- */
- init_var(&tmp_var);
- accum_sum_final(&state->sumX, &tmp_var);
-
- temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&tmp_var)));
- sumX = DatumGetByteaPP(temp);
- free_var(&tmp_var);
-
pq_begintypsend(&buf);
/* N */
pq_sendint64(&buf, state->N);
/* sumX */
- pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX));
+ accum_sum_final(&state->sumX, &tmp_var);
+ numericvar_serialize(&buf, &tmp_var);
/* maxScale */
pq_sendint32(&buf, state->maxScale);
@@ -4993,6 +4983,8 @@ numeric_avg_serialize(PG_FUNCTION_ARGS)
result = pq_endtypsend(&buf);
+ free_var(&tmp_var);
+
PG_RETURN_BYTEA_P(result);
}
@@ -5006,9 +4998,10 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS
{
bytea *sstate;
NumericAggState *result;
- Datum temp;
- NumericVar tmp_var;
StringInfoData buf;
+ NumericVar tmp_var;
+
+ init_var(&tmp_var);
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
@@ -5029,11 +5022,7 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS
result->N = pq_getmsgint64(&buf);
/* sumX */
- temp = DirectFunctionCall3(numeric_recv,
- PointerGetDatum(&buf),
- ObjectIdGetDatum(InvalidOid),
- Int32GetDatum(-1));
- init_var_from_num(DatumGetNumeric(temp), &tmp_var);
+ numericvar_deserialize(&buf, &tmp_var);
accum_sum_add(&(result->sumX), &tmp_var);
/* maxScale */
@@ -5054,6 +5043,8 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS
pq_getmsgend(&buf);
pfree(buf.data);
+ free_var(&tmp_var);
+
PG_RETURN_POINTER(result);
}
@@ -5067,11 +5058,10 @@ numeric_serialize(PG_FUNCTION_ARGS)
{
NumericAggState *state;
StringInfoData buf;
- Datum temp;
- bytea *sumX;
- NumericVar tmp_var;
- bytea *sumX2;
bytea *result;
+ NumericVar tmp_var;
+
+ init_var(&tmp_var);
/* Ensure we disallow calling when not in aggregate context */
if (!AggCheckCallContext(fcinfo, NULL))
@@ -5079,36 +5069,18 @@ numeric_serialize(PG_FUNCTION_ARGS)
state = (NumericAggState *) PG_GETARG_POINTER(0);
- /*
- * This is a little wasteful since make_result converts the NumericVar
- * into a Numeric and numeric_send converts it back again. Is it worth
- * splitting the tasks in numeric_send into separate functions to stop
- * this? Doing so would also remove the fmgr call overhead.
- */
- init_var(&tmp_var);
-
- accum_sum_final(&state->sumX, &tmp_var);
- temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&tmp_var)));
- sumX = DatumGetByteaPP(temp);
-
- accum_sum_final(&state->sumX2, &tmp_var);
- temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&tmp_var)));
- sumX2 = DatumGetByteaPP(temp);
-
- free_var(&tmp_var);
-
pq_begintypsend(&buf);
/* N */
pq_sendint64(&buf, state->N);
/* sumX */
- pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX));
+ accum_sum_final(&state->sumX, &tmp_var);
+ numericvar_serialize(&buf, &tmp_var);
/* sumX2 */
- pq_sendbytes(&buf, VARDATA_ANY(sumX2), VARSIZE_ANY_EXHDR(sumX2));
+ accum_sum_final(&state->sumX2, &tmp_var);
+ numericvar_serialize(&buf, &tmp_var);
/* maxScale */
pq_sendint32(&buf, state->maxScale);
@@ -5127,6 +5099,8 @@ numeric_serialize(PG_FUNCTION_ARGS)
result = pq_endtypsend(&buf);
+ free_var(&tmp_var);
+
PG_RETURN_BYTEA_P(result);
}
@@ -5140,10 +5114,10 @@ numeric_deserialize(PG_FUNCTION_ARGS)
{
bytea *sstate;
NumericAggState *result;
- Datum temp;
- NumericVar sumX_var;
- NumericVar sumX2_var;
StringInfoData buf;
+ NumericVar tmp_var;
+
+ init_var(&tmp_var);
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
@@ -5164,20 +5138,12 @@ numeric_deserialize(PG_FUNCTION_ARGS)
result->N = pq_getmsgint64(&buf);
/* sumX */
- temp = DirectFunctionCall3(numeric_recv,
- PointerGetDatum(&buf),
- ObjectIdGetDatum(InvalidOid),
- Int32GetDatum(-1));
- init_var_from_num(DatumGetNumeric(temp), &sumX_var);
- accum_sum_add(&(result->sumX), &sumX_var);
+ numericvar_deserialize(&buf, &tmp_var);
+ accum_sum_add(&(result->sumX), &tmp_var);
/* sumX2 */
- temp = DirectFunctionCall3(numeric_recv,
- PointerGetDatum(&buf),
- ObjectIdGetDatum(InvalidOid),
- Int32GetDatum(-1));
- init_var_from_num(DatumGetNumeric(temp), &sumX2_var);
- accum_sum_add(&(result->sumX2), &sumX2_var);
+ numericvar_deserialize(&buf, &tmp_var);
+ accum_sum_add(&(result->sumX2), &tmp_var);
/* maxScale */
result->maxScale = pq_getmsgint(&buf, 4);
@@ -5197,6 +5163,8 @@ numeric_deserialize(PG_FUNCTION_ARGS)
pq_getmsgend(&buf);
pfree(buf.data);
+ free_var(&tmp_var);
+
PG_RETURN_POINTER(result);
}
@@ -5459,9 +5427,10 @@ numeric_poly_serialize(PG_FUNCTION_ARGS)
{
PolyNumAggState *state;
StringInfoData buf;
- bytea *sumX;
- bytea *sumX2;
bytea *result;
+ NumericVar tmp_var;
+
+ init_var(&tmp_var);
/* Ensure we disallow calling when not in aggregate context */
if (!AggCheckCallContext(fcinfo, NULL))
@@ -5477,32 +5446,6 @@ numeric_poly_serialize(PG_FUNCTION_ARGS)
* day we might like to send these over to another server for further
* processing and we want a standard format to work with.
*/
- {
- Datum temp;
- NumericVar num;
-
- init_var(&num);
-
-#ifdef HAVE_INT128
- int128_to_numericvar(state->sumX, &num);
-#else
- accum_sum_final(&state->sumX, &num);
-#endif
- temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&num)));
- sumX = DatumGetByteaPP(temp);
-
-#ifdef HAVE_INT128
- int128_to_numericvar(state->sumX2, &num);
-#else
- accum_sum_final(&state->sumX2, &num);
-#endif
- temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&num)));
- sumX2 = DatumGetByteaPP(temp);
-
- free_var(&num);
- }
pq_begintypsend(&buf);
@@ -5510,13 +5453,25 @@ numeric_poly_serialize(PG_FUNCTION_ARGS)
pq_sendint64(&buf, state->N);
/* sumX */
- pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX));
+#ifdef HAVE_INT128
+ int128_to_numericvar(state->sumX, &tmp_var);
+#else
+ accum_sum_final(&state->sumX, &tmp_var);
+#endif
+ numericvar_serialize(&buf, &tmp_var);
/* sumX2 */
- pq_sendbytes(&buf, VARDATA_ANY(sumX2), VARSIZE_ANY_EXHDR(sumX2));
+#ifdef HAVE_INT128
+ int128_to_numericvar(state->sumX2, &tmp_var);
+#else
+ accum_sum_final(&state->sumX2, &tmp_var);
+#endif
+ numericvar_serialize(&buf, &tmp_var);
result = pq_endtypsend(&buf);
+ free_var(&tmp_var);
+
PG_RETURN_BYTEA_P(result);
}
@@ -5530,11 +5485,10 @@ numeric_poly_deserialize(PG_FUNCTION_ARG
{
bytea *sstate;
PolyNumAggState *result;
- Datum sumX;
- NumericVar sumX_var;
- Datum sumX2;
- NumericVar sumX2_var;
StringInfoData buf;
+ NumericVar tmp_var;
+
+ init_var(&tmp_var);
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
@@ -5555,34 +5509,26 @@ numeric_poly_deserialize(PG_FUNCTION_ARG
result->N = pq_getmsgint64(&buf);
/* sumX */
- sumX = DirectFunctionCall3(numeric_recv,
- PointerGetDatum(&buf),
- ObjectIdGetDatum(InvalidOid),
- Int32GetDatum(-1));
-
- /* sumX2 */
- sumX2 = DirectFunctionCall3(numeric_recv,
- PointerGetDatum(&buf),
- ObjectIdGetDatum(InvalidOid),
- Int32GetDatum(-1));
-
- init_var_from_num(DatumGetNumeric(sumX), &sumX_var);
+ numericvar_deserialize(&buf, &tmp_var);
#ifdef HAVE_INT128
- numericvar_to_int128(&sumX_var, &result->sumX);
+ numericvar_to_int128(&tmp_var, &result->sumX);
#else
- accum_sum_add(&result->sumX, &sumX_var);
+ accum_sum_add(&result->sumX, &tmp_var);
#endif
- init_var_from_num(DatumGetNumeric(sumX2), &sumX2_var);
+ /* sumX2 */
+ numericvar_deserialize(&buf, &tmp_var);
#ifdef HAVE_INT128
- numericvar_to_int128(&sumX2_var, &result->sumX2);
+ numericvar_to_int128(&tmp_var, &result->sumX2);
#else
- accum_sum_add(&result->sumX2, &sumX2_var);
+ accum_sum_add(&result->sumX2, &tmp_var);
#endif
pq_getmsgend(&buf);
pfree(buf.data);
+ free_var(&tmp_var);
+
PG_RETURN_POINTER(result);
}
@@ -5681,8 +5627,10 @@ int8_avg_serialize(PG_FUNCTION_ARGS)
{
PolyNumAggState *state;
StringInfoData buf;
- bytea *sumX;
bytea *result;
+ NumericVar tmp_var;
+
+ init_var(&tmp_var);
/* Ensure we disallow calling when not in aggregate context */
if (!AggCheckCallContext(fcinfo, NULL))
@@ -5698,23 +5646,6 @@ int8_avg_serialize(PG_FUNCTION_ARGS)
* like to send these over to another server for further processing and we
* want a standard format to work with.
*/
- {
- Datum temp;
- NumericVar num;
-
- init_var(&num);
-
-#ifdef HAVE_INT128
- int128_to_numericvar(state->sumX, &num);
-#else
- accum_sum_final(&state->sumX, &num);
-#endif
- temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&num)));
- sumX = DatumGetByteaPP(temp);
-
- free_var(&num);
- }
pq_begintypsend(&buf);
@@ -5722,10 +5653,17 @@ int8_avg_serialize(PG_FUNCTION_ARGS)
pq_sendint64(&buf, state->N);
/* sumX */
- pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX));
+#ifdef HAVE_INT128
+ int128_to_numericvar(state->sumX, &tmp_var);
+#else
+ accum_sum_final(&state->sumX, &tmp_var);
+#endif
+ numericvar_serialize(&buf, &tmp_var);
result = pq_endtypsend(&buf);
+ free_var(&tmp_var);
+
PG_RETURN_BYTEA_P(result);
}
@@ -5739,8 +5677,9 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
bytea *sstate;
PolyNumAggState *result;
StringInfoData buf;
- Datum temp;
- NumericVar num;
+ NumericVar tmp_var;
+
+ init_var(&tmp_var);
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
@@ -5761,20 +5700,18 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
result->N = pq_getmsgint64(&buf);
/* sumX */
- temp = DirectFunctionCall3(numeric_recv,
- PointerGetDatum(&buf),
- ObjectIdGetDatum(InvalidOid),
- Int32GetDatum(-1));
- init_var_from_num(DatumGetNumeric(temp), &num);
+ numericvar_deserialize(&buf, &tmp_var);
#ifdef HAVE_INT128
- numericvar_to_int128(&num, &result->sumX);
+ numericvar_to_int128(&tmp_var, &result->sumX);
#else
- accum_sum_add(&result->sumX, &num);
+ accum_sum_add(&result->sumX, &tmp_var);
#endif
pq_getmsgend(&buf);
pfree(buf.data);
+ free_var(&tmp_var);
+
PG_RETURN_POINTER(result);
}
@@ -7286,6 +7223,48 @@ get_str_from_var_sci(const NumericVar *v
}
+/*
+ * numericvar_serialize - serialize NumericVar to binary format
+ *
+ * Appends "var" to "buf" using the following layout, which is read back by
+ * numericvar_deserialize():
+ *
+ *		int32	ndigits
+ *		int32	weight
+ *		int32	sign
+ *		int32	dscale
+ *		16-bit	digits[ndigits]
+ *
+ * At variable level, no checks are performed on the weight or dscale, allowing
+ * us to pass around intermediate values with higher precision than supported
+ * by the numeric type. Note: this is incompatible with numeric_send/recv(),
+ * which use 16-bit integers for these fields.
+ *
+ * This lets the aggregate serialization functions write a NumericVar
+ * directly, instead of converting it to a Numeric with make_result() and
+ * then re-encoding it with numeric_send().  The caller is responsible for
+ * starting the message (the callers in this patch use pq_begintypsend()).
+ */
+static void
+numericvar_serialize(StringInfo buf, const NumericVar *var)
+{
+ int i;
+
+ /* fixed-size header: four int32 fields */
+ pq_sendint32(buf, var->ndigits);
+ pq_sendint32(buf, var->weight);
+ pq_sendint32(buf, var->sign);
+ pq_sendint32(buf, var->dscale);
+ /* followed by the digits themselves, 16 bits apiece */
+ for (i = 0; i < var->ndigits; i++)
+ pq_sendint16(buf, var->digits[i]);
+}
+
+/*
+ * numericvar_deserialize - deserialize binary format to NumericVar
+ *
+ * Reads a value written by numericvar_serialize() from the current position
+ * of "buf" into "var".  Fields are consumed in the same order they were
+ * written: ndigits, weight, sign, dscale (int32 each), then ndigits 16-bit
+ * digits.
+ *
+ * "var" should already be initialized.  The callers added in this patch call
+ * init_var() first, reuse one variable across several calls, and release it
+ * with free_var() at the end.
+ *
+ * No sanity checks are applied to the values read.  This relies on the buffer
+ * having been produced by numericvar_serialize(); the callers are aggregate
+ * deserialization functions that first verify AggCheckCallContext().
+ */
+static void
+numericvar_deserialize(StringInfo buf, NumericVar *var)
+{
+ int len,
+ i;
+
+ len = pq_getmsgint(buf, sizeof(int32));
+
+ /*
+ * NOTE(review): presumably alloc_var() also releases any digit buffer
+ * "var" already held, which is what makes reusing "var" across calls
+ * safe -- confirm against alloc_var().
+ */
+ alloc_var(var, len); /* sets var->ndigits */
+
+ var->weight = pq_getmsgint(buf, sizeof(int32));
+ var->sign = pq_getmsgint(buf, sizeof(int32));
+ var->dscale = pq_getmsgint(buf, sizeof(int32));
+ for (i = 0; i < len; i++)
+ var->digits[i] = pq_getmsgint(buf, sizeof(int16));
+}
+
+
/*
* duplicate_numeric() - copy a packed-format Numeric
*
diff --git a/src/test/regress/expected/numeric.out b/src/test/regress/expected/numeric.out
new file mode 100644
index 30a5642..4ad4851
--- a/src/test/regress/expected/numeric.out
+++ b/src/test/regress/expected/numeric.out
@@ -2967,6 +2967,56 @@ SELECT SUM((-9999)::numeric) FROM genera
(1 row)
--
+-- Tests for VARIANCE()
+--
+CREATE TABLE num_variance (a numeric);
+INSERT INTO num_variance VALUES (0);
+INSERT INTO num_variance VALUES (3e-500);
+INSERT INTO num_variance VALUES (-3e-500);
+INSERT INTO num_variance VALUES (4e-500 - 1e-16383);
+INSERT INTO num_variance VALUES (-4e-500 + 1e-16383);
+-- variance is just under 12.5e-1000 and so should round down to 12e-1000
+SELECT trim_scale(variance(a) * 1e1000) FROM num_variance;
+ trim_scale
+------------
+ 12
+(1 row)
+
+-- check that parallel execution produces the same result
+BEGIN;
+ALTER TABLE num_variance SET (parallel_workers = 4);
+SET LOCAL parallel_setup_cost = 0;
+SET LOCAL max_parallel_workers_per_gather = 4;
+SELECT trim_scale(variance(a) * 1e1000) FROM num_variance;
+ trim_scale
+------------
+ 12
+(1 row)
+
+ROLLBACK;
+-- case where sum of squares would overflow but variance does not
+DELETE FROM num_variance;
+INSERT INTO num_variance SELECT 9e131071 + x FROM generate_series(1, 5) x;
+SELECT variance(a) FROM num_variance;
+ variance
+--------------------
+ 2.5000000000000000
+(1 row)
+
+-- check that parallel execution produces the same result
+BEGIN;
+ALTER TABLE num_variance SET (parallel_workers = 4);
+SET LOCAL parallel_setup_cost = 0;
+SET LOCAL max_parallel_workers_per_gather = 4;
+SELECT variance(a) FROM num_variance;
+ variance
+--------------------
+ 2.5000000000000000
+(1 row)
+
+ROLLBACK;
+DROP TABLE num_variance;
+--
-- Tests for GCD()
--
SELECT a, b, gcd(a, b), gcd(a, -b), gcd(-b, a), gcd(-b, -a)
diff --git a/src/test/regress/sql/numeric.sql b/src/test/regress/sql/numeric.sql
new file mode 100644
index db812c8..3784c52
--- a/src/test/regress/sql/numeric.sql
+++ b/src/test/regress/sql/numeric.sql
@@ -1278,6 +1278,42 @@ SELECT SUM(9999::numeric) FROM generate_
SELECT SUM((-9999)::numeric) FROM generate_series(1, 100000);
--
+-- Tests for VARIANCE()
+--
+CREATE TABLE num_variance (a numeric);
+INSERT INTO num_variance VALUES (0);
+INSERT INTO num_variance VALUES (3e-500);
+INSERT INTO num_variance VALUES (-3e-500);
+INSERT INTO num_variance VALUES (4e-500 - 1e-16383);
+INSERT INTO num_variance VALUES (-4e-500 + 1e-16383);
+
+-- variance is just under 12.5e-1000 and so should round down to 12e-1000
+SELECT trim_scale(variance(a) * 1e1000) FROM num_variance;
+
+-- check that parallel execution produces the same result
+BEGIN;
+ALTER TABLE num_variance SET (parallel_workers = 4);
+SET LOCAL parallel_setup_cost = 0;
+SET LOCAL max_parallel_workers_per_gather = 4;
+SELECT trim_scale(variance(a) * 1e1000) FROM num_variance;
+ROLLBACK;
+
+-- case where sum of squares would overflow but variance does not
+DELETE FROM num_variance;
+INSERT INTO num_variance SELECT 9e131071 + x FROM generate_series(1, 5) x;
+SELECT variance(a) FROM num_variance;
+
+-- check that parallel execution produces the same result
+BEGIN;
+ALTER TABLE num_variance SET (parallel_workers = 4);
+SET LOCAL parallel_setup_cost = 0;
+SET LOCAL max_parallel_workers_per_gather = 4;
+SELECT variance(a) FROM num_variance;
+ROLLBACK;
+
+DROP TABLE num_variance;
+
+--
-- Tests for GCD()
--
SELECT a, b, gcd(a, b), gcd(a, -b), gcd(-b, a), gcd(-b, -a)