Hi,

I spent some time profiling a simple query with a SUM() aggregate. We've made some big improvements in this area in recent years, but it seems there's still some room for improvement. A lot of CPU time is spent in the numeric add_var() and friends. Which isn't that surprising, I guess.

I came up with the attached patch that keeps the sum in a specialized accumulator, instead of a NumericVar. The specialized accumulator has a few tricks, compared to the status quo:

1. Uses 32-bit integers to represent each base-10000 "digit". Instead of propagating carry after each new value, it's done only every 9999 values (or at the end).

2. Accumulates positive and negative values separately. The positive and negative sums are added together only at the end. This avoids the overhead in add_var(), for figuring out which value is larger and determining the result sign at each step.

3. Only allocates a new buffer when it needs to be enlarged. add_abs() allocates a new one on every call.


These optimizations give a nice speedup for SUM(), and other aggregates like AVG() and STDDEV() that use the same agg state. For example, using the same test query that Hadi Moshayedi used in previous work on numeric aggregates (https://www.postgresql.org/message-id/CAK%3D1%3DWrmCkWc_xQXs_bpUyswCPr7O9zkLmm8Oa7_nT2vybvBEQ%40mail.gmail.com):

CREATE TABLE avg_test AS SELECT (random() * 999)::decimal(5,2) as d FROM
generate_series(1, 10000000) s;

SELECT avg(d) FROM avg_test;

On my laptop, with max_parallel_workers_per_gather=0, this runs in about 1.5 s without the patch, and 1.2 s with the patch.

- Heikki
From 1f9556d13ca05dae4092e2c4a8a0d7b444039726 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Mon, 25 Jul 2016 13:17:04 +0300
Subject: [PATCH 1/1] Speed up SUM calculation in numeric aggregates.

This introduces a numeric sum accumulator, which performs better than
repeatedly calling add_var(). The performance comes from using wider
digits and delaying carry propagation, and using separate sums for
positive and negative values, and avoiding a round of palloc/pfree on
every value. This speeds up SUM(), as well as other standard aggregates
like AVG() and STDDEV() that also calculate a sum internally.
---
 src/backend/utils/adt/numeric.c | 596 +++++++++++++++++++++++++++++++++-------
 1 file changed, 492 insertions(+), 104 deletions(-)

diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index 2fbdfe0..746a4c6 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -65,7 +65,8 @@
  * and are no longer supported in any sense; no mechanism exists for the client
  * to discover the base, so every client supporting binary mode expects the
  * base-10000 format.  If you plan to change this, also note the numeric
- * abbreviation code, which assumes NBASE=10000.
+ * abbreviation code, and the numeric sum accum code, which both assume
+ * NBASE=10000.
  * ----------
  */
 
@@ -302,6 +303,51 @@ typedef struct
 	hyperLogLogState abbr_card; /* cardinality estimator */
 } NumericSortSupport;
 
+
+/* ----------
+ * Fast sum accumulator.
+ *
+ * NumericSumAccum is used to implement SUM(), and other standard aggregates
+ * that track the sum of input values. It uses 32-bit integers to store the
+ * digits, instead of the normal 16-bit integers (with NBASE=10000). This
+ * way, we can safely accumulate up to 9999 values without propagating carry,
+ * before risking overflow of any of the digits. Delaying carry propagation
+ * avoids a lot of overhead. 'num_uncarried' tracks how many values have been
+ * accumulated without propagating carry.
+ *
+ * Positive and negative values are accumulated separately, in 'pos_digits'
+ * and 'neg_digits'. This is simpler and faster than deciding whether to add
+ * or subtract from the current value, for each new value (see sub_var()
+ * for the logic we avoid by doing this). Both buffers are of same size,
+ * and have the same weight and scale. In accum_sum_final(), the positive
+ * and negative sums are added together to produce the final result.
+ *
+ * When a new value has a larger ndigits or weight than the accumulator
+ * currently does, the ndigits and weight of the accumulator are enlarged
+ * to accommodate the new value. We normally have one zero digit reserved
+ * for carry propagation, and that is indicated by the 'have_carry_space'
+ * flag. When accum_sum_carry() uses up the reserved digit, it clears the
+ * 'have_carry_space' flag. The next call to accum_sum_add() will enlarge
+ * the buffer, to make room for the extra digit, and set the flag again.
+ *
+ * To initialize a new accumulator, simply reset all fields to zeros.
+ *
+ * The accumulator does not handle NaNs.
+ *
+ * ----------
+ */
+typedef struct NumericSumAccum
+{
+	int			ndigits;		/* number of digits in each buffer */
+	int			weight;			/* weight of the first digit in the buffers */
+	int			dscale;			/* largest dscale among accumulated values */
+	int			num_uncarried;	/* values added since last carry propagation */
+	bool		have_carry_space;	/* is a leading digit reserved for carry? */
+	int32	   *pos_digits;		/* digits of the sum of positive values */
+	int32	   *neg_digits;		/* digits of the sum of negative values */
+} NumericSumAccum;
+
+
 /*
  * We define our own macros for packing and unpacking abbreviated-key
  * representations for numeric values in order to avoid depending on
@@ -490,6 +536,14 @@ static void strip_var(NumericVar *var);
 static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
 			   NumericVar *count_var, NumericVar *result_var);
 
+static void accum_sum_add(NumericSumAccum *accum, NumericVar *var1);
+static void accum_sum_rescale(NumericSumAccum *accum, NumericVar *val);
+static void accum_sum_carry(NumericSumAccum *accum);
+static void accum_sum_reset(NumericSumAccum *accum);
+static void accum_sum_final(NumericSumAccum *accum, NumericVar *result);
+static void accum_sum_copy(NumericSumAccum *dst, NumericSumAccum *src);
+static void accum_sum_combine(NumericSumAccum *accum, NumericSumAccum *accum2);
+
 
 /* ----------------------------------------------------------------------
  *
@@ -3144,8 +3198,8 @@ typedef struct NumericAggState
 	bool		calcSumX2;		/* if true, calculate sumX2 */
 	MemoryContext agg_context;	/* context we're calculating in */
 	int64		N;				/* count of processed numbers */
-	NumericVar	sumX;			/* sum of processed numbers */
-	NumericVar	sumX2;			/* sum of squares of processed numbers */
+	NumericSumAccum sumX;		/* sum of processed numbers */
+	NumericSumAccum sumX2;		/* sum of squares of processed numbers */
 	int			maxScale;		/* maximum scale seen so far */
 	int64		maxScaleCount;	/* number of values seen with maximum scale */
 	int64		NaNcount;		/* count of NaN values (not included in N!) */
@@ -3234,22 +3288,13 @@ do_numeric_accum(NumericAggState *state, Numeric newval)
 	/* The rest of this needs to work in the aggregate context */
 	old_context = MemoryContextSwitchTo(state->agg_context);
 
-	if (state->N++ > 0)
-	{
-		/* Accumulate sums */
-		add_var(&X, &(state->sumX), &(state->sumX));
+	state->N++;
 
-		if (state->calcSumX2)
-			add_var(&X2, &(state->sumX2), &(state->sumX2));
-	}
-	else
-	{
-		/* First input, so initialize sums */
-		set_var_from_var(&X, &(state->sumX));
+	/* Accumulate sums */
+	accum_sum_add(&(state->sumX), &X);
 
-		if (state->calcSumX2)
-			set_var_from_var(&X2, &(state->sumX2));
-	}
+	if (state->calcSumX2)
+		accum_sum_add(&(state->sumX2), &X2);
 
 	MemoryContextSwitchTo(old_context);
 }
@@ -3328,16 +3373,25 @@ do_numeric_discard(NumericAggState *state, Numeric newval)
 
 	if (state->N-- > 1)
 	{
-		/* De-accumulate sums */
-		sub_var(&(state->sumX), &X, &(state->sumX));
+		/* Negate X, to subtract it from the sum */
+		X.sign = X.sign == NUMERIC_POS ? NUMERIC_NEG : NUMERIC_POS;
+		accum_sum_add(&(state->sumX), &X);
 
 		if (state->calcSumX2)
-			sub_var(&(state->sumX2), &X2, &(state->sumX2));
+		{
+			/* Negate X^2. X^2 is always positive */
+			X2.sign = NUMERIC_NEG;
+			accum_sum_add(&(state->sumX2), &X2);
+		}
 	}
 	else
 	{
-		/* Sums will be reset by next call to do_numeric_accum */
+		/* Zero the sums */
 		Assert(state->N == 0);
+
+		accum_sum_reset(&state->sumX);
+		if (state->calcSumX2)
+			accum_sum_reset(&state->sumX2);
 	}
 
 	MemoryContextSwitchTo(old_context);
@@ -3396,11 +3450,8 @@ numeric_combine(PG_FUNCTION_ARGS)
 		state1->maxScale = state2->maxScale;
 		state1->maxScaleCount = state2->maxScaleCount;
 
-		init_var(&state1->sumX);
-		set_var_from_var(&state2->sumX, &state1->sumX);
-
-		init_var(&state1->sumX2);
-		set_var_from_var(&state2->sumX2, &state1->sumX2);
+		accum_sum_copy(&state1->sumX, &state2->sumX);
+		accum_sum_copy(&state1->sumX2, &state2->sumX2);
 
 		MemoryContextSwitchTo(old_context);
 
@@ -3428,8 +3479,8 @@ numeric_combine(PG_FUNCTION_ARGS)
 		old_context = MemoryContextSwitchTo(agg_context);
 
 		/* Accumulate sums */
-		add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
-		add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
+		accum_sum_combine(&state1->sumX, &state2->sumX);
+		accum_sum_combine(&state1->sumX2, &state2->sumX2);
 
 		MemoryContextSwitchTo(old_context);
 	}
@@ -3487,8 +3538,7 @@ numeric_avg_combine(PG_FUNCTION_ARGS)
 		state1->maxScale = state2->maxScale;
 		state1->maxScaleCount = state2->maxScaleCount;
 
-		init_var(&state1->sumX);
-		set_var_from_var(&state2->sumX, &state1->sumX);
+		accum_sum_copy(&state1->sumX, &state2->sumX);
 
 		MemoryContextSwitchTo(old_context);
 
@@ -3516,7 +3566,7 @@ numeric_avg_combine(PG_FUNCTION_ARGS)
 		old_context = MemoryContextSwitchTo(agg_context);
 
 		/* Accumulate sums */
-		add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+		accum_sum_combine(&state1->sumX, &state2->sumX);
 
 		MemoryContextSwitchTo(old_context);
 	}
@@ -3536,6 +3586,7 @@ numeric_avg_serialize(PG_FUNCTION_ARGS)
 	Datum		temp;
 	bytea	   *sumX;
 	bytea	   *result;
+	NumericVar	tmp_var;
 
 	/* Ensure we disallow calling when not in aggregate context */
 	if (!AggCheckCallContext(fcinfo, NULL))
@@ -3549,9 +3600,13 @@ numeric_avg_serialize(PG_FUNCTION_ARGS)
 	 * splitting the tasks in numeric_send into separate functions to stop
 	 * this? Doing so would also remove the fmgr call overhead.
 	 */
+	init_var(&tmp_var);
+	accum_sum_final(&state->sumX, &tmp_var);
+
 	temp = DirectFunctionCall1(numeric_send,
-							   NumericGetDatum(make_result(&state->sumX)));
+							   NumericGetDatum(make_result(&tmp_var)));
 	sumX = DatumGetByteaP(temp);
+	free_var(&tmp_var);
 
 	pq_begintypsend(&buf);
 
@@ -3586,6 +3641,7 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS)
 	bytea	   *sstate;
 	NumericAggState *result;
 	Datum		temp;
+	NumericVar	tmp_var;
 	StringInfoData buf;
 
 	if (!AggCheckCallContext(fcinfo, NULL))
@@ -3610,7 +3666,8 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS)
 							   PointerGetDatum(&buf),
 							   InvalidOid,
 							   -1);
-	set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+	init_var_from_num(DatumGetNumeric(temp), &tmp_var);
+	accum_sum_add(&(result->sumX), &tmp_var);
 
 	/* maxScale */
 	result->maxScale = pq_getmsgint(&buf, 4);
@@ -3639,6 +3696,7 @@ numeric_serialize(PG_FUNCTION_ARGS)
 	StringInfoData buf;
 	Datum		temp;
 	bytea	   *sumX;
+	NumericVar	tmp_var;
 	bytea	   *sumX2;
 	bytea	   *result;
 
@@ -3654,14 +3712,20 @@ numeric_serialize(PG_FUNCTION_ARGS)
 	 * splitting the tasks in numeric_send into separate functions to stop
 	 * this? Doing so would also remove the fmgr call overhead.
 	 */
+	init_var(&tmp_var);
+
+	accum_sum_final(&state->sumX, &tmp_var);
 	temp = DirectFunctionCall1(numeric_send,
-							   NumericGetDatum(make_result(&state->sumX)));
+							   NumericGetDatum(make_result(&tmp_var)));
 	sumX = DatumGetByteaP(temp);
 
+	accum_sum_final(&state->sumX2, &tmp_var);
 	temp = DirectFunctionCall1(numeric_send,
-							   NumericGetDatum(make_result(&state->sumX2)));
+							   NumericGetDatum(make_result(&tmp_var)));
 	sumX2 = DatumGetByteaP(temp);
 
+	free_var(&tmp_var);
+
 	pq_begintypsend(&buf);
 
 	/* N */
@@ -3698,6 +3762,8 @@ numeric_deserialize(PG_FUNCTION_ARGS)
 	bytea	   *sstate;
 	NumericAggState *result;
 	Datum		temp;
+	NumericVar	sumX_var;
+	NumericVar	sumX2_var;
 	StringInfoData buf;
 
 	if (!AggCheckCallContext(fcinfo, NULL))
@@ -3722,14 +3788,16 @@ numeric_deserialize(PG_FUNCTION_ARGS)
 							   PointerGetDatum(&buf),
 							   InvalidOid,
 							   -1);
-	set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+	init_var_from_num(DatumGetNumeric(temp), &sumX_var);
+	accum_sum_add(&(result->sumX), &sumX_var);
 
 	/* sumX2 */
 	temp = DirectFunctionCall3(numeric_recv,
 							   PointerGetDatum(&buf),
 							   InvalidOid,
 							   -1);
-	set_var_from_num(DatumGetNumeric(temp), &result->sumX2);
+	init_var_from_num(DatumGetNumeric(temp), &sumX2_var);
+	accum_sum_add(&(result->sumX2), &sumX2_var);
 
 	/* maxScale */
 	result->maxScale = pq_getmsgint(&buf, 4);
@@ -3978,11 +4046,8 @@ numeric_poly_combine(PG_FUNCTION_ARGS)
 		state1->sumX = state2->sumX;
 		state1->sumX2 = state2->sumX2;
 #else
-		init_var(&(state1->sumX));
-		set_var_from_var(&(state2->sumX), &(state1->sumX));
-
-		init_var(&state1->sumX2);
-		set_var_from_var(&(state2->sumX2), &(state1->sumX2));
+		accum_sum_copy(&state1->sumX, &state2->sumX);
+		accum_sum_copy(&state1->sumX2, &state2->sumX2);
 #endif
 
 		MemoryContextSwitchTo(old_context);
@@ -4002,8 +4067,8 @@ numeric_poly_combine(PG_FUNCTION_ARGS)
 		old_context = MemoryContextSwitchTo(agg_context);
 
 		/* Accumulate sums */
-		add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
-		add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
+		accum_sum_combine(&state1->sumX, &state2->sumX);
+		accum_sum_combine(&state1->sumX2, &state2->sumX2);
 
 		MemoryContextSwitchTo(old_context);
 #endif
@@ -4042,30 +4107,29 @@ numeric_poly_serialize(PG_FUNCTION_ARGS)
 	 */
 	{
 		Datum		temp;
-
-#ifdef HAVE_INT128
 		NumericVar	num;
 
 		init_var(&num);
+
+#ifdef HAVE_INT128
 		int128_to_numericvar(state->sumX, &num);
+#else
+		accum_sum_final(&state->sumX, &num);
+#endif
 		temp = DirectFunctionCall1(numeric_send,
 								   NumericGetDatum(make_result(&num)));
 		sumX = DatumGetByteaP(temp);
 
+#ifdef HAVE_INT128
 		int128_to_numericvar(state->sumX2, &num);
+#else
+		accum_sum_final(&state->sumX2, &num);
+#endif
 		temp = DirectFunctionCall1(numeric_send,
 								   NumericGetDatum(make_result(&num)));
 		sumX2 = DatumGetByteaP(temp);
-		free_var(&num);
-#else
-		temp = DirectFunctionCall1(numeric_send,
-								 NumericGetDatum(make_result(&state->sumX)));
-		sumX = DatumGetByteaP(temp);
 
-		temp = DirectFunctionCall1(numeric_send,
-								NumericGetDatum(make_result(&state->sumX2)));
-		sumX2 = DatumGetByteaP(temp);
-#endif
+		free_var(&num);
 	}
 
 	pq_begintypsend(&buf);
@@ -4095,7 +4159,9 @@ numeric_poly_deserialize(PG_FUNCTION_ARGS)
 	bytea	   *sstate;
 	PolyNumAggState *result;
 	Datum		sumX;
+	NumericVar	sumX_var;
 	Datum		sumX2;
+	NumericVar	sumX2_var;
 	StringInfoData buf;
 
 	if (!AggCheckCallContext(fcinfo, NULL))
@@ -4127,22 +4193,18 @@ numeric_poly_deserialize(PG_FUNCTION_ARGS)
 								InvalidOid,
 								-1);
 
+	init_var_from_num(DatumGetNumeric(sumX), &sumX_var);
 #ifdef HAVE_INT128
-	{
-		NumericVar	num;
-
-		init_var(&num);
-		set_var_from_num(DatumGetNumeric(sumX), &num);
-		numericvar_to_int128(&num, &result->sumX);
-
-		set_var_from_num(DatumGetNumeric(sumX2), &num);
-		numericvar_to_int128(&num, &result->sumX2);
+	numericvar_to_int128(&sumX_var, &result->sumX);
+#else
+	accum_sum_add(&result->sumX, &sumX_var);
+#endif
 
-		free_var(&num);
-	}
+	init_var_from_num(DatumGetNumeric(sumX2), &sumX2_var);
+#ifdef HAVE_INT128
+	numericvar_to_int128(&sumX2_var, &result->sumX2);
 #else
-	set_var_from_num(DatumGetNumeric(sumX), &result->sumX);
-	set_var_from_num(DatumGetNumeric(sumX2), &result->sumX2);
+	accum_sum_add(&result->sumX2, &sumX2_var);
 #endif
 
 	pq_getmsgend(&buf);
@@ -4213,8 +4275,7 @@ int8_avg_combine(PG_FUNCTION_ARGS)
 #ifdef HAVE_INT128
 		state1->sumX = state2->sumX;
 #else
-		init_var(&state1->sumX);
-		set_var_from_var(&state2->sumX, &state1->sumX);
+		accum_sum_copy(&state1->sumX, &state2->sumX);
 #endif
 		MemoryContextSwitchTo(old_context);
 
@@ -4232,7 +4293,7 @@ int8_avg_combine(PG_FUNCTION_ARGS)
 		old_context = MemoryContextSwitchTo(agg_context);
 
 		/* Accumulate sums */
-		add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+		accum_sum_combine(&state1->sumX, &state2->sumX);
 
 		MemoryContextSwitchTo(old_context);
 #endif
@@ -4270,20 +4331,20 @@ int8_avg_serialize(PG_FUNCTION_ARGS)
 	 */
 	{
 		Datum		temp;
-#ifdef HAVE_INT128
 		NumericVar	num;
 
 		init_var(&num);
+
+#ifdef HAVE_INT128
 		int128_to_numericvar(state->sumX, &num);
-		temp = DirectFunctionCall1(numeric_send,
-								   NumericGetDatum(make_result(&num)));
-		free_var(&num);
-		sumX = DatumGetByteaP(temp);
 #else
+		accum_sum_final(&state->sumX, &num);
+#endif
 		temp = DirectFunctionCall1(numeric_send,
-								 NumericGetDatum(make_result(&state->sumX)));
+								   NumericGetDatum(make_result(&num)));
 		sumX = DatumGetByteaP(temp);
-#endif
+
+		free_var(&num);
 	}
 
 	pq_begintypsend(&buf);
@@ -4310,6 +4371,7 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
 	PolyNumAggState *result;
 	StringInfoData buf;
 	Datum		temp;
+	NumericVar	num;
 
 	if (!AggCheckCallContext(fcinfo, NULL))
 		elog(ERROR, "aggregate function called in non-aggregate context");
@@ -4333,18 +4395,11 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
 							   PointerGetDatum(&buf),
 							   InvalidOid,
 							   -1);
-
+	init_var_from_num(DatumGetNumeric(temp), &num);
 #ifdef HAVE_INT128
-	{
-		NumericVar	num;
-
-		init_var(&num);
-		set_var_from_num(DatumGetNumeric(temp), &num);
-		numericvar_to_int128(&num, &result->sumX);
-		free_var(&num);
-	}
+	numericvar_to_int128(&num, &result->sumX);
 #else
-	set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+	accum_sum_add(&result->sumX, &num);
 #endif
 
 	pq_getmsgend(&buf);
@@ -4538,6 +4593,7 @@ numeric_avg(PG_FUNCTION_ARGS)
 	NumericAggState *state;
 	Datum		N_datum;
 	Datum		sumX_datum;
+	NumericVar	sumX_var;
 
 	state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
 
@@ -4549,7 +4605,11 @@ numeric_avg(PG_FUNCTION_ARGS)
 		PG_RETURN_NUMERIC(make_result(&const_nan));
 
 	N_datum = DirectFunctionCall1(int8_numeric, Int64GetDatum(state->N));
-	sumX_datum = NumericGetDatum(make_result(&state->sumX));
+
+	init_var(&sumX_var);
+	accum_sum_final(&state->sumX, &sumX_var);
+	sumX_datum = NumericGetDatum(make_result(&sumX_var));
+	free_var(&sumX_var);
 
 	PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumX_datum, N_datum));
 }
@@ -4558,6 +4618,8 @@ Datum
 numeric_sum(PG_FUNCTION_ARGS)
 {
 	NumericAggState *state;
+	NumericVar	sumX_var;
+	Numeric		result;
 
 	state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
 
@@ -4568,7 +4630,12 @@ numeric_sum(PG_FUNCTION_ARGS)
 	if (state->NaNcount > 0)	/* there was at least one NaN input */
 		PG_RETURN_NUMERIC(make_result(&const_nan));
 
-	PG_RETURN_NUMERIC(make_result(&(state->sumX)));
+	init_var(&sumX_var);
+	accum_sum_final(&state->sumX, &sumX_var);
+	result = make_result(&sumX_var);
+	free_var(&sumX_var);
+
+	PG_RETURN_NUMERIC(result);
 }
 
 /*
@@ -4612,8 +4679,8 @@ numeric_stddev_internal(NumericAggState *state,
 	init_var(&vsumX2);
 
 	int64_to_numericvar(state->N, &vN);
-	set_var_from_var(&(state->sumX), &vsumX);
-	set_var_from_var(&(state->sumX2), &vsumX2);
+	accum_sum_final(&(state->sumX), &vsumX);
+	accum_sum_final(&(state->sumX2), &vsumX2);
 
 	/*
 	 * Sample stddev and variance are undefined when N <= 1; population stddev
@@ -4743,26 +4810,38 @@ numeric_poly_stddev_internal(Int128AggState *state,
 	NumericAggState numstate;
 	Numeric		res;
 
-	init_var(&numstate.sumX);
-	init_var(&numstate.sumX2);
-	numstate.NaNcount = 0;
-	numstate.agg_context = NULL;
+	/* Initialize an empty agg state */
+	memset(&numstate, 0, sizeof(NumericAggState));
 
 	if (state)
 	{
+		NumericVar	tmp_var;
+
 		numstate.N = state->N;
-		int128_to_numericvar(state->sumX, &numstate.sumX);
-		int128_to_numericvar(state->sumX2, &numstate.sumX2);
-	}
-	else
-	{
-		numstate.N = 0;
+
+		init_var(&tmp_var);
+
+		int128_to_numericvar(state->sumX, &tmp_var);
+		accum_sum_add(&numstate.sumX, &tmp_var);
+
+		int128_to_numericvar(state->sumX2, &tmp_var);
+		accum_sum_add(&numstate.sumX2, &tmp_var);
+
+		free_var(&tmp_var);
 	}
 
 	res = numeric_stddev_internal(&numstate, variance, sample, is_null);
 
-	free_var(&numstate.sumX);
-	free_var(&numstate.sumX2);
+	if (numstate.sumX.ndigits > 0)
+	{
+		pfree(numstate.sumX.pos_digits);
+		pfree(numstate.sumX.neg_digits);
+	}
+	if (numstate.sumX2.ndigits > 0)
+	{
+		pfree(numstate.sumX2.pos_digits);
+		pfree(numstate.sumX2.neg_digits);
+	}
 
 	return res;
 }
@@ -8699,3 +8778,312 @@ strip_var(NumericVar *var)
 	var->digits = digits;
 	var->ndigits = ndigits;
 }
+
+
+/* ----------------------------------------------------------------------
+ *
+ * Fast sum accumulator functions
+ *
+ * ----------------------------------------------------------------------
+ */
+
+/*
+ * Set the accumulated sum back to zero, so that the accumulator can be
+ * reused. The digit buffers stay allocated and keep their current size.
+ */
+static void
+accum_sum_reset(NumericSumAccum *accum)
+{
+	int32	   *pos = accum->pos_digits;
+	int32	   *neg = accum->neg_digits;
+	int			remaining;
+
+	for (remaining = accum->ndigits; remaining > 0; remaining--)
+		*pos++ = *neg++ = 0;
+
+	accum->dscale = 0;
+}
+
+/*
+ * Accumulate a new value.
+ *
+ * 'val' must not be NaN; any sign other than NUMERIC_POS counts as negative.
+ */
+static void
+accum_sum_add(NumericSumAccum *accum, NumericVar *val)
+{
+	int32	   *accum_digits;
+	int			i,
+				val_i;
+	int			val_ndigits;
+	NumericDigit *val_digits;
+
+	/*
+	 * If we have accumulated too many values since the last carry
+	 * propagation, do it now, to avoid overflowing.  The counter is bumped
+	 * only after the check, so that the value added below is counted as
+	 * uncarried and a later accum_sum_carry() will not skip it.
+	 */
+	if (accum->num_uncarried == NBASE - 1)
+		accum_sum_carry(accum);
+	accum->num_uncarried++;
+
+	/* Make sure the accumulator is wide enough to hold the new value. */
+	accum_sum_rescale(accum, val);
+
+	/* Positive and negative values are summed in separate buffers. */
+	if (val->sign == NUMERIC_POS)
+		accum_digits = accum->pos_digits;
+	else
+		accum_digits = accum->neg_digits;
+
+	/* copy these values into local vars for speed in loop */
+	val_ndigits = val->ndigits;
+	val_digits = val->digits;
+
+	/* Add each digit of 'val' to the accumulator digit of equal weight. */
+	i = accum->weight - val->weight;
+	for (val_i = 0; val_i < val_ndigits; val_i++)
+		accum_digits[i++] += (int32) val_digits[val_i];
+}
+
+/*
+ * Propagate carries through one digit buffer, from the last digit towards
+ * the first.  Returns the value stored in the first digit, or zero if the
+ * buffer is empty.
+ */
+static int32
+accum_sum_carry_digits(int32 *dig, int ndigits)
+{
+	int32		carry = 0;
+	int32		newdig = 0;
+	int			i;
+
+	for (i = ndigits - 1; i >= 0; i--)
+	{
+		/* Split the running total into the digit to keep and the carry. */
+		newdig = dig[i] + carry;
+		carry = 0;
+		if (newdig >= NBASE)
+		{
+			carry = newdig / NBASE;
+			newdig -= carry * NBASE;
+		}
+		dig[i] = newdig;
+	}
+
+	return newdig;
+}
+
+/*
+ * Propagate carries.
+ *
+ * Brings every digit of both sums back below NBASE, records whether the
+ * reserved leading digit was consumed in the process, and restarts the
+ * count of uncarried values.  Does nothing if no value has been counted
+ * as added since the previous propagation.
+ */
+static void
+accum_sum_carry(NumericSumAccum *accum)
+{
+	/*
+	 * If no new values have been added since last carry propagation, nothing
+	 * to do.
+	 */
+	if (accum->num_uncarried == 0)
+		return;
+
+	/*
+	 * The first digit of each buffer is kept at zero until now, so that the
+	 * carry out of the following digit has somewhere to go.  The buffers are
+	 * never enlarged here; that way accum_sum_final() can be called without
+	 * switching to the memory context that holds the accumulator.
+	 */
+	Assert(accum->pos_digits[0] == 0 && accum->neg_digits[0] == 0);
+
+	/*
+	 * Normalize the positive sum.  A non-zero first digit afterwards means
+	 * that the digit reserved for carry has been used up; accum_sum_rescale()
+	 * will then make room for a new one.
+	 */
+	if (accum_sum_carry_digits(accum->pos_digits, accum->ndigits) > 0)
+		accum->have_carry_space = false;
+
+	/* And the same for the negative sum */
+	if (accum_sum_carry_digits(accum->neg_digits, accum->ndigits) > 0)
+		accum->have_carry_space = false;
+
+	accum->num_uncarried = 0;
+}
+
+/*
+ * Re-scale accumulator to accommodate new value.
+ *
+ * If 'val' has more digits, on either side, than the digit buffers can
+ * hold, or the carry digit is used up, enlarge (reallocate) the buffers.
+ */
+static void
+accum_sum_rescale(NumericSumAccum *accum, NumericVar *val)
+{
+	int			old_weight = accum->weight;
+	int			old_ndigits = accum->ndigits;
+	int			accum_ndigits;
+	int			accum_weight;
+	int			accum_rscale;
+	int			val_rscale;
+
+	accum_weight = old_weight;
+	accum_ndigits = old_ndigits;
+
+	/*
+	 * Does the new value have a larger weight? If so, enlarge the buffers,
+	 * and shift the existing value to the new weight, by adding leading
+	 * zeros.
+	 *
+	 * We enforce that the accumulator always has a weight one larger than
+	 * needed for the inputs, so that we have space for the final
+	 * carry-propagation phase, if necessary.
+	 */
+	if (val->weight >= accum_weight)
+	{
+		accum_weight = val->weight + 1;
+		accum_ndigits = accum_ndigits + (accum_weight - old_weight);
+	}
+
+	/*
+	 * Even though the new value is small, we might've used up the space
+	 * reserved for the carry digit in the last call to accum_sum_carry(). If
+	 * so, enlarge to make room for another one.
+	 */
+	else if (!accum->have_carry_space)
+	{
+		accum_weight++;
+		accum_ndigits++;
+	}
+
+	/* Is the new value wider on the right side (more fractional digits)? */
+	accum_rscale = accum_ndigits - accum_weight - 1;
+	val_rscale = val->ndigits - val->weight - 1;
+	if (val_rscale > accum_rscale)
+		accum_ndigits = accum_ndigits + (val_rscale - accum_rscale);
+
+	if (accum_ndigits != old_ndigits ||
+		accum_weight != old_weight)
+	{
+		int32	   *new_pos_digits;
+		int32	   *new_neg_digits;
+		int			weightdiff;
+
+		weightdiff = accum_weight - old_weight;
+
+		new_pos_digits = palloc0(accum_ndigits * sizeof(int32));
+		new_neg_digits = palloc0(accum_ndigits * sizeof(int32));
+
+		if (accum->pos_digits)
+		{
+			memcpy(&new_pos_digits[weightdiff], accum->pos_digits, old_ndigits * sizeof(int32));
+			pfree(accum->pos_digits);
+
+			memcpy(&new_neg_digits[weightdiff], accum->neg_digits, old_ndigits * sizeof(int32));
+			pfree(accum->neg_digits);
+		}
+
+		accum->pos_digits = new_pos_digits;
+		accum->neg_digits = new_neg_digits;
+
+		accum->weight = accum_weight;
+		accum->ndigits = accum_ndigits;
+
+		Assert(accum->pos_digits[0] == 0 && accum->neg_digits[0] == 0);
+		accum->have_carry_space = true;
+	}
+
+	if (val->dscale > accum->dscale)
+		accum->dscale = val->dscale;
+}
+
+/*
+ * Return the current value of the accumulator. This performs the final
+ * carry propagation, and adds together the positive and negative sums.
+ *
+ * Unlike all the other routines, the caller is not required to switch to
+ * the memory context that holds the accumulator.
+ */
+static void
+accum_sum_final(NumericSumAccum *accum, NumericVar *result)
+{
+	int			i;
+	NumericVar	pos_var;
+	NumericVar	neg_var;
+
+	if (accum->ndigits == 0)
+	{
+		set_var_from_var(&const_zero, result);
+		return;
+	}
+
+	/* Perform final carry */
+	accum_sum_carry(accum);
+
+	/* Create NumericVars representing the positive and negative sums */
+	init_var(&pos_var);
+	init_var(&neg_var);
+	pos_var.ndigits = neg_var.ndigits = accum->ndigits;
+	pos_var.weight = neg_var.weight = accum->weight;
+	pos_var.dscale = neg_var.dscale = accum->dscale;
+	pos_var.sign = NUMERIC_POS;
+	neg_var.sign = NUMERIC_NEG;
+	pos_var.buf = pos_var.digits = digitbuf_alloc(accum->ndigits);
+	neg_var.buf = neg_var.digits = digitbuf_alloc(accum->ndigits);
+
+	for (i = 0; i < accum->ndigits; i++)
+	{
+		Assert(accum->pos_digits[i] < NBASE && accum->neg_digits[i] < NBASE);
+		pos_var.digits[i] = (NumericDigit) accum->pos_digits[i];
+		neg_var.digits[i] = (NumericDigit) accum->neg_digits[i];
+	}
+
+	/* Add together the positive and negative sums, and strip zeroes */
+	add_var(&pos_var, &neg_var, result);
+	strip_var(result);
+
+	/* Don't leak the temporary copies into the caller's memory context */
+	free_var(&pos_var);
+	free_var(&neg_var);
+}
+
+/*
+ * Copy an accumulator's state, giving 'dst' digit buffers of its own.
+ * 'dst' is assumed to be uninitialized beforehand: every field is
+ * overwritten, and no attempt is made at freeing old values.
+ */
+static void
+accum_sum_copy(NumericSumAccum *dst, NumericSumAccum *src)
+{
+	dst->pos_digits = palloc(src->ndigits * sizeof(int32));
+	dst->neg_digits = palloc(src->ndigits * sizeof(int32));
+
+	memcpy(dst->pos_digits, src->pos_digits, src->ndigits * sizeof(int32));
+	memcpy(dst->neg_digits, src->neg_digits, src->ndigits * sizeof(int32));
+	dst->num_uncarried = src->num_uncarried;
+	dst->have_carry_space = src->have_carry_space;
+	dst->ndigits = src->ndigits;
+	dst->weight = src->weight;
+	dst->dscale = src->dscale;
+}
+
+/*
+ * Fold the total held in 'accum2' into 'accum'.  'accum2' keeps its value,
+ * although its pending carries get propagated as a side effect.
+ */
+static void
+accum_sum_combine(NumericSumAccum *accum, NumericSumAccum *accum2)
+{
+	NumericVar	accum2_total;
+
+	/* Collapse accum2 into a plain numeric, then add it like any input. */
+	init_var(&accum2_total);
+	accum_sum_final(accum2, &accum2_total);
+	accum_sum_add(accum, &accum2_total);
+	free_var(&accum2_total);
+}
-- 
2.8.1

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to