Hi,
I spent some time profiling a simple query with a SUM() aggregate. We've
made some big improvements in this area in recent years, but it seems
there's still some room for improvement. A lot of CPU time is spent in
the numeric add_var() and friends. Which isn't that surprising, I guess.
I came up with the attached patch that keeps the sum in a specialized
accumulator, instead of a NumericVar. The specialized accumulator has a
few tricks, compared to the status quo:
1. Uses 32-bit integers to represent each base-10000 "digit". Instead of
propagating carry after each new value, it's done only every 9999 values
(or at the end).
2. Accumulates positive and negative values separately. The positive
and negative sums are added together only at the end. This avoids the
overhead in add_var(), for figuring out which value is larger and
determining the result sign at each step.
3. Only allocate a new buffer when it needs to be enlarged. add_abs()
allocates a new one on every call.
These optimizations give a nice speedup for SUM(), and other aggregates
like AVG() and STDDEV() that use the same agg state. For example, using
the same test query that Hadi Moshayedi used on previous work on numeric
aggregates
(https://www.postgresql.org/message-id/CAK%3D1%3DWrmCkWc_xQXs_bpUyswCPr7O9zkLmm8Oa7_nT2vybvBEQ%40mail.gmail.com):
CREATE TABLE avg_test AS SELECT (random() * 999)::decimal(5,2) as d FROM
generate_series(1, 10000000) s;
SELECT avg(d) FROM avg_test;
On my laptop, with max_parallel_workers_per_gather=0, this runs in about
1.5 s without the patch, and 1.2 s with the patch.
- Heikki
From 1f9556d13ca05dae4092e2c4a8a0d7b444039726 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Mon, 25 Jul 2016 13:17:04 +0300
Subject: [PATCH 1/1] Speed up SUM calculation in numeric aggregates.
This introduces a numeric sum accumulator, which performs better than
repeatedly calling add_var(). The performance comes from using wider
digits and delaying carry propagation, and using separate sums for
positive and negative values, and avoiding a round of palloc/pfree on
every value. This speeds up SUM(), as well as other standard aggregates
like AVG() and STDDEV() that also calculate a sum internally.
---
src/backend/utils/adt/numeric.c | 596 +++++++++++++++++++++++++++++++++-------
1 file changed, 492 insertions(+), 104 deletions(-)
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index 2fbdfe0..746a4c6 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -65,7 +65,8 @@
* and are no longer supported in any sense; no mechanism exists for the client
* to discover the base, so every client supporting binary mode expects the
* base-10000 format. If you plan to change this, also note the numeric
- * abbreviation code, which assumes NBASE=10000.
+ * abbreviation code, and the numeric sum accum code, which both assume
+ * NBASE=10000.
* ----------
*/
@@ -302,6 +303,51 @@ typedef struct
hyperLogLogState abbr_card; /* cardinality estimator */
} NumericSortSupport;
+
+/* ----------
+ * Fast sum accumulator.
+ *
+ * NumericSumAccum is used to implement SUM(), and other standard aggregates
+ * that track the sum of input values. It uses 32-bit integers to store the
+ * digits, instead of the normal 16-bit integers (with NBASE=10000). This
+ * way, we can safely accumulate up to 9999 values without propagating carry,
+ * before risking overflow of any of the digits. Delaying carry propagation
+ * avoids a lot of overhead. 'num_uncarried' tracks how many values have been
+ * accumulated without propagating carry.
+ *
+ * Positive and negative values are accumulated separately, in 'pos_digits'
+ * and 'neg_digits'. This is simpler and faster than deciding whether to add
+ * or subtract from the current value, for each new value (see sub_var()
+ * for the logic we avoid by doing this). Both buffers are of same size,
+ * and have the same weight and scale. In accum_sum_final(), the positive
+ * and negative sums are added together to produce the final result.
+ *
+ * When a new value has a larger ndigits or weight than the accumulator
+ * currently does, the ndigits and weight of the accumulator are enlarged
+ * to accommodate the new value. We normally have one zero digit reserved
+ * for carry propagation, and that is indicated by the 'have_carry_space'
+ * flag. When accum_sum_carry() uses up the reserved digit, it clears the
+ * 'have_carry_space' flag. The next call to accum_sum_add() will enlarge
+ * the buffer, to make room for the extra digit, and set the flag again.
+ *
+ * To initialize a new accumulator, simply reset all fields to zeros.
+ *
+ * The accumulator does not handle NaNs.
+ *
+ * ----------
+ */
+typedef struct NumericSumAccum
+{
+ int ndigits; /* # of digits allocated in each of the two buffers */
+ int weight; /* weight of the first digit in the buffers */
+ int dscale; /* largest dscale of any value added so far */
+ int num_uncarried; /* # of values added since the last carry pass */
+ bool have_carry_space; /* is the leading digit still reserved for carry? */
+ int32 *pos_digits; /* digit-wise sum of the positive inputs */
+ int32 *neg_digits; /* digit-wise sum of the negative inputs */
+} NumericSumAccum;
+
+
/*
* We define our own macros for packing and unpacking abbreviated-key
* representations for numeric values in order to avoid depending on
@@ -490,6 +536,14 @@ static void strip_var(NumericVar *var);
static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
NumericVar *count_var, NumericVar *result_var);
+/* Fast sum accumulator functions; parameter names match the definitions */
+static void accum_sum_add(NumericSumAccum *accum, NumericVar *val);
+static void accum_sum_rescale(NumericSumAccum *accum, NumericVar *val);
+static void accum_sum_carry(NumericSumAccum *accum);
+static void accum_sum_reset(NumericSumAccum *accum);
+static void accum_sum_final(NumericSumAccum *accum, NumericVar *result);
+static void accum_sum_copy(NumericSumAccum *dst, NumericSumAccum *src);
+static void accum_sum_combine(NumericSumAccum *accum, NumericSumAccum *accum2);
+
/* ----------------------------------------------------------------------
*
@@ -3144,8 +3198,8 @@ typedef struct NumericAggState
bool calcSumX2; /* if true, calculate sumX2 */
MemoryContext agg_context; /* context we're calculating in */
int64 N; /* count of processed numbers */
- NumericVar sumX; /* sum of processed numbers */
- NumericVar sumX2; /* sum of squares of processed numbers */
+ NumericSumAccum sumX; /* sum of processed numbers */
+ NumericSumAccum sumX2; /* sum of squares of processed numbers */
int maxScale; /* maximum scale seen so far */
int64 maxScaleCount; /* number of values seen with maximum scale */
int64 NaNcount; /* count of NaN values (not included in N!) */
@@ -3234,22 +3288,13 @@ do_numeric_accum(NumericAggState *state, Numeric newval)
/* The rest of this needs to work in the aggregate context */
old_context = MemoryContextSwitchTo(state->agg_context);
- if (state->N++ > 0)
- {
- /* Accumulate sums */
- add_var(&X, &(state->sumX), &(state->sumX));
+ state->N++;
- if (state->calcSumX2)
- add_var(&X2, &(state->sumX2), &(state->sumX2));
- }
- else
- {
- /* First input, so initialize sums */
- set_var_from_var(&X, &(state->sumX));
+ /* Accumulate sums */
+ accum_sum_add(&(state->sumX), &X);
- if (state->calcSumX2)
- set_var_from_var(&X2, &(state->sumX2));
- }
+ if (state->calcSumX2)
+ accum_sum_add(&(state->sumX2), &X2);
MemoryContextSwitchTo(old_context);
}
@@ -3328,16 +3373,25 @@ do_numeric_discard(NumericAggState *state, Numeric newval)
if (state->N-- > 1)
{
- /* De-accumulate sums */
- sub_var(&(state->sumX), &X, &(state->sumX));
+ /* Negate X, to subtract it from the sum */
+ X.sign = X.sign == NUMERIC_POS ? NUMERIC_NEG : NUMERIC_POS;
+ accum_sum_add(&(state->sumX), &X);
if (state->calcSumX2)
- sub_var(&(state->sumX2), &X2, &(state->sumX2));
+ {
+ /* Negate X^2. X^2 is always positive */
+ X2.sign = NUMERIC_NEG;
+ accum_sum_add(&(state->sumX2), &X2);
+ }
}
else
{
- /* Sums will be reset by next call to do_numeric_accum */
+ /* Zero the sums */
Assert(state->N == 0);
+
+ accum_sum_reset(&state->sumX);
+ if (state->calcSumX2)
+ accum_sum_reset(&state->sumX2);
}
MemoryContextSwitchTo(old_context);
@@ -3396,11 +3450,8 @@ numeric_combine(PG_FUNCTION_ARGS)
state1->maxScale = state2->maxScale;
state1->maxScaleCount = state2->maxScaleCount;
- init_var(&state1->sumX);
- set_var_from_var(&state2->sumX, &state1->sumX);
-
- init_var(&state1->sumX2);
- set_var_from_var(&state2->sumX2, &state1->sumX2);
+ accum_sum_copy(&state1->sumX, &state2->sumX);
+ accum_sum_copy(&state1->sumX2, &state2->sumX2);
MemoryContextSwitchTo(old_context);
@@ -3428,8 +3479,8 @@ numeric_combine(PG_FUNCTION_ARGS)
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
- add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
- add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
+ accum_sum_combine(&state1->sumX, &state2->sumX);
+ accum_sum_combine(&state1->sumX2, &state2->sumX2);
MemoryContextSwitchTo(old_context);
}
@@ -3487,8 +3538,7 @@ numeric_avg_combine(PG_FUNCTION_ARGS)
state1->maxScale = state2->maxScale;
state1->maxScaleCount = state2->maxScaleCount;
- init_var(&state1->sumX);
- set_var_from_var(&state2->sumX, &state1->sumX);
+ accum_sum_copy(&state1->sumX, &state2->sumX);
MemoryContextSwitchTo(old_context);
@@ -3516,7 +3566,7 @@ numeric_avg_combine(PG_FUNCTION_ARGS)
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
- add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+ accum_sum_combine(&state1->sumX, &state2->sumX);
MemoryContextSwitchTo(old_context);
}
@@ -3536,6 +3586,7 @@ numeric_avg_serialize(PG_FUNCTION_ARGS)
Datum temp;
bytea *sumX;
bytea *result;
+ NumericVar tmp_var;
/* Ensure we disallow calling when not in aggregate context */
if (!AggCheckCallContext(fcinfo, NULL))
@@ -3549,9 +3600,13 @@ numeric_avg_serialize(PG_FUNCTION_ARGS)
* splitting the tasks in numeric_send into separate functions to stop
* this? Doing so would also remove the fmgr call overhead.
*/
+ init_var(&tmp_var);
+ accum_sum_final(&state->sumX, &tmp_var);
+
temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&state->sumX)));
+ NumericGetDatum(make_result(&tmp_var)));
sumX = DatumGetByteaP(temp);
+ free_var(&tmp_var);
pq_begintypsend(&buf);
@@ -3586,6 +3641,7 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS)
bytea *sstate;
NumericAggState *result;
Datum temp;
+ NumericVar tmp_var;
StringInfoData buf;
if (!AggCheckCallContext(fcinfo, NULL))
@@ -3610,7 +3666,8 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS)
PointerGetDatum(&buf),
InvalidOid,
-1);
- set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+ init_var_from_num(DatumGetNumeric(temp), &tmp_var);
+ accum_sum_add(&(result->sumX), &tmp_var);
/* maxScale */
result->maxScale = pq_getmsgint(&buf, 4);
@@ -3639,6 +3696,7 @@ numeric_serialize(PG_FUNCTION_ARGS)
StringInfoData buf;
Datum temp;
bytea *sumX;
+ NumericVar tmp_var;
bytea *sumX2;
bytea *result;
@@ -3654,14 +3712,20 @@ numeric_serialize(PG_FUNCTION_ARGS)
* splitting the tasks in numeric_send into separate functions to stop
* this? Doing so would also remove the fmgr call overhead.
*/
+ init_var(&tmp_var);
+
+ accum_sum_final(&state->sumX, &tmp_var);
temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&state->sumX)));
+ NumericGetDatum(make_result(&tmp_var)));
sumX = DatumGetByteaP(temp);
+ accum_sum_final(&state->sumX2, &tmp_var);
temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&state->sumX2)));
+ NumericGetDatum(make_result(&tmp_var)));
sumX2 = DatumGetByteaP(temp);
+ free_var(&tmp_var);
+
pq_begintypsend(&buf);
/* N */
@@ -3698,6 +3762,8 @@ numeric_deserialize(PG_FUNCTION_ARGS)
bytea *sstate;
NumericAggState *result;
Datum temp;
+ NumericVar sumX_var;
+ NumericVar sumX2_var;
StringInfoData buf;
if (!AggCheckCallContext(fcinfo, NULL))
@@ -3722,14 +3788,16 @@ numeric_deserialize(PG_FUNCTION_ARGS)
PointerGetDatum(&buf),
InvalidOid,
-1);
- set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+ init_var_from_num(DatumGetNumeric(temp), &sumX_var);
+ accum_sum_add(&(result->sumX), &sumX_var);
/* sumX2 */
temp = DirectFunctionCall3(numeric_recv,
PointerGetDatum(&buf),
InvalidOid,
-1);
- set_var_from_num(DatumGetNumeric(temp), &result->sumX2);
+ init_var_from_num(DatumGetNumeric(temp), &sumX2_var);
+ accum_sum_add(&(result->sumX2), &sumX2_var);
/* maxScale */
result->maxScale = pq_getmsgint(&buf, 4);
@@ -3978,11 +4046,8 @@ numeric_poly_combine(PG_FUNCTION_ARGS)
state1->sumX = state2->sumX;
state1->sumX2 = state2->sumX2;
#else
- init_var(&(state1->sumX));
- set_var_from_var(&(state2->sumX), &(state1->sumX));
-
- init_var(&state1->sumX2);
- set_var_from_var(&(state2->sumX2), &(state1->sumX2));
+ /* accum_sum_copy() takes (dst, src): clone state2's sums into state1 */
+ accum_sum_copy(&state1->sumX, &state2->sumX);
+ accum_sum_copy(&state1->sumX2, &state2->sumX2);
#endif
MemoryContextSwitchTo(old_context);
@@ -4002,8 +4067,8 @@ numeric_poly_combine(PG_FUNCTION_ARGS)
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
- add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
- add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
+ accum_sum_combine(&state1->sumX, &state2->sumX);
+ accum_sum_combine(&state1->sumX2, &state2->sumX2);
MemoryContextSwitchTo(old_context);
#endif
@@ -4042,30 +4107,29 @@ numeric_poly_serialize(PG_FUNCTION_ARGS)
*/
{
Datum temp;
-
-#ifdef HAVE_INT128
NumericVar num;
init_var(&num);
+
+#ifdef HAVE_INT128
int128_to_numericvar(state->sumX, &num);
+#else
+ accum_sum_final(&state->sumX, &num);
+#endif
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&num)));
sumX = DatumGetByteaP(temp);
+#ifdef HAVE_INT128
int128_to_numericvar(state->sumX2, &num);
+#else
+ accum_sum_final(&state->sumX2, &num);
+#endif
temp = DirectFunctionCall1(numeric_send,
NumericGetDatum(make_result(&num)));
sumX2 = DatumGetByteaP(temp);
- free_var(&num);
-#else
- temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&state->sumX)));
- sumX = DatumGetByteaP(temp);
- temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&state->sumX2)));
- sumX2 = DatumGetByteaP(temp);
-#endif
+ free_var(&num);
}
pq_begintypsend(&buf);
@@ -4095,7 +4159,9 @@ numeric_poly_deserialize(PG_FUNCTION_ARGS)
bytea *sstate;
PolyNumAggState *result;
Datum sumX;
+ NumericVar sumX_var;
Datum sumX2;
+ NumericVar sumX2_var;
StringInfoData buf;
if (!AggCheckCallContext(fcinfo, NULL))
@@ -4127,22 +4193,18 @@ numeric_poly_deserialize(PG_FUNCTION_ARGS)
InvalidOid,
-1);
+ init_var_from_num(DatumGetNumeric(sumX), &sumX_var);
#ifdef HAVE_INT128
- {
- NumericVar num;
-
- init_var(&num);
- set_var_from_num(DatumGetNumeric(sumX), &num);
- numericvar_to_int128(&num, &result->sumX);
-
- set_var_from_num(DatumGetNumeric(sumX2), &num);
- numericvar_to_int128(&num, &result->sumX2);
+ numericvar_to_int128(&sumX_var, &result->sumX);
+#else
+ accum_sum_add(&result->sumX, &sumX_var);
+#endif
- free_var(&num);
- }
+ /* sumX2_var starts out uninitialized, so set it up the same way as sumX_var */
+ init_var_from_num(DatumGetNumeric(sumX2), &sumX2_var);
+#ifdef HAVE_INT128
+ numericvar_to_int128(&sumX2_var, &result->sumX2);
#else
- set_var_from_num(DatumGetNumeric(sumX), &result->sumX);
- set_var_from_num(DatumGetNumeric(sumX2), &result->sumX2);
+ accum_sum_add(&result->sumX2, &sumX2_var);
#endif
pq_getmsgend(&buf);
@@ -4213,8 +4275,7 @@ int8_avg_combine(PG_FUNCTION_ARGS)
#ifdef HAVE_INT128
state1->sumX = state2->sumX;
#else
- init_var(&state1->sumX);
- set_var_from_var(&state2->sumX, &state1->sumX);
+ accum_sum_copy(&state1->sumX, &state2->sumX);
#endif
MemoryContextSwitchTo(old_context);
@@ -4232,7 +4293,7 @@ int8_avg_combine(PG_FUNCTION_ARGS)
old_context = MemoryContextSwitchTo(agg_context);
/* Accumulate sums */
- add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+ accum_sum_combine(&state1->sumX, &state2->sumX);
MemoryContextSwitchTo(old_context);
#endif
@@ -4270,20 +4331,20 @@ int8_avg_serialize(PG_FUNCTION_ARGS)
*/
{
Datum temp;
-#ifdef HAVE_INT128
NumericVar num;
init_var(&num);
+
+#ifdef HAVE_INT128
int128_to_numericvar(state->sumX, &num);
- temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&num)));
- free_var(&num);
- sumX = DatumGetByteaP(temp);
#else
+ accum_sum_final(&state->sumX, &num);
+#endif
temp = DirectFunctionCall1(numeric_send,
- NumericGetDatum(make_result(&state->sumX)));
+ NumericGetDatum(make_result(&num)));
sumX = DatumGetByteaP(temp);
-#endif
+
+ free_var(&num);
}
pq_begintypsend(&buf);
@@ -4310,6 +4371,7 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
PolyNumAggState *result;
StringInfoData buf;
Datum temp;
+ NumericVar num;
if (!AggCheckCallContext(fcinfo, NULL))
elog(ERROR, "aggregate function called in non-aggregate context");
@@ -4333,18 +4395,11 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
PointerGetDatum(&buf),
InvalidOid,
-1);
-
+ init_var_from_num(DatumGetNumeric(temp), &num);
#ifdef HAVE_INT128
- {
- NumericVar num;
-
- init_var(&num);
- set_var_from_num(DatumGetNumeric(temp), &num);
- numericvar_to_int128(&num, &result->sumX);
- free_var(&num);
- }
+ numericvar_to_int128(&num, &result->sumX);
#else
- set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+ accum_sum_add(&result->sumX, &num);
#endif
pq_getmsgend(&buf);
@@ -4538,6 +4593,7 @@ numeric_avg(PG_FUNCTION_ARGS)
NumericAggState *state;
Datum N_datum;
Datum sumX_datum;
+ NumericVar sumX_var;
state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
@@ -4549,7 +4605,11 @@ numeric_avg(PG_FUNCTION_ARGS)
PG_RETURN_NUMERIC(make_result(&const_nan));
N_datum = DirectFunctionCall1(int8_numeric, Int64GetDatum(state->N));
- sumX_datum = NumericGetDatum(make_result(&state->sumX));
+
+ init_var(&sumX_var);
+ accum_sum_final(&state->sumX, &sumX_var);
+ sumX_datum = NumericGetDatum(make_result(&sumX_var));
+ free_var(&sumX_var);
PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumX_datum, N_datum));
}
@@ -4558,6 +4618,8 @@ Datum
numeric_sum(PG_FUNCTION_ARGS)
{
NumericAggState *state;
+ NumericVar sumX_var;
+ Numeric result;
state = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
@@ -4568,7 +4630,12 @@ numeric_sum(PG_FUNCTION_ARGS)
if (state->NaNcount > 0) /* there was at least one NaN input */
PG_RETURN_NUMERIC(make_result(&const_nan));
- PG_RETURN_NUMERIC(make_result(&(state->sumX)));
+ init_var(&sumX_var);
+ accum_sum_final(&state->sumX, &sumX_var);
+ result = make_result(&sumX_var);
+ free_var(&sumX_var);
+
+ PG_RETURN_NUMERIC(result);
}
/*
@@ -4612,8 +4679,8 @@ numeric_stddev_internal(NumericAggState *state,
init_var(&vsumX2);
int64_to_numericvar(state->N, &vN);
- set_var_from_var(&(state->sumX), &vsumX);
- set_var_from_var(&(state->sumX2), &vsumX2);
+ accum_sum_final(&(state->sumX), &vsumX);
+ accum_sum_final(&(state->sumX2), &vsumX2);
/*
* Sample stddev and variance are undefined when N <= 1; population stddev
@@ -4743,26 +4810,38 @@ numeric_poly_stddev_internal(Int128AggState *state,
NumericAggState numstate;
Numeric res;
- init_var(&numstate.sumX);
- init_var(&numstate.sumX2);
- numstate.NaNcount = 0;
- numstate.agg_context = NULL;
+ /* Initialize an empty agg state */
+ memset(&numstate, 0, sizeof(NumericAggState));
if (state)
{
+ NumericVar tmp_var;
+
numstate.N = state->N;
- int128_to_numericvar(state->sumX, &numstate.sumX);
- int128_to_numericvar(state->sumX2, &numstate.sumX2);
- }
- else
- {
- numstate.N = 0;
+
+ init_var(&tmp_var);
+
+ int128_to_numericvar(state->sumX, &tmp_var);
+ accum_sum_add(&numstate.sumX, &tmp_var);
+
+ int128_to_numericvar(state->sumX2, &tmp_var);
+ accum_sum_add(&numstate.sumX2, &tmp_var);
+
+ free_var(&tmp_var);
}
res = numeric_stddev_internal(&numstate, variance, sample, is_null);
- free_var(&numstate.sumX);
- free_var(&numstate.sumX2);
+ if (numstate.sumX.ndigits > 0)
+ {
+ pfree(numstate.sumX.pos_digits);
+ pfree(numstate.sumX.neg_digits);
+ }
+ if (numstate.sumX2.ndigits > 0)
+ {
+ pfree(numstate.sumX2.pos_digits);
+ pfree(numstate.sumX2.neg_digits);
+ }
return res;
}
@@ -8699,3 +8778,312 @@ strip_var(NumericVar *var)
var->digits = digits;
var->ndigits = ndigits;
}
+
+
+/* ----------------------------------------------------------------------
+ *
+ * Fast sum accumulator functions
+ *
+ * ----------------------------------------------------------------------
+ */
+
+/*
+ * Set the accumulated sums back to zero.
+ *
+ * Only the digits and the display scale are cleared.  The digit buffers
+ * stay allocated for reuse, and ndigits, weight, num_uncarried and
+ * have_carry_space are left as they were.
+ */
+static void
+accum_sum_reset(NumericSumAccum *accum)
+{
+	int32	   *pos = accum->pos_digits;
+	int32	   *neg = accum->neg_digits;
+	int			remaining;
+
+	accum->dscale = 0;
+
+	/* Both buffers hold ndigits entries, so clear them in lockstep */
+	for (remaining = accum->ndigits; remaining > 0; remaining--)
+	{
+		*pos++ = 0;
+		*neg++ = 0;
+	}
+}
+
+/*
+ * Accumulate a new value.
+ *
+ * 'val' is added digit by digit, without carry propagation, into the
+ * positive sum if its sign is NUMERIC_POS and into the negative sum
+ * otherwise.  The accumulator's buffers are enlarged first if 'val' does
+ * not fit.  NaNs are not handled; the caller must filter them out.
+ *
+ * Must be called in the memory context that holds the accumulator, since
+ * the digit buffers may be reallocated.
+ */
+static void
+accum_sum_add(NumericSumAccum *accum, NumericVar *val)
+{
+	int32	   *accum_digits;
+	int			i,
+				val_i;
+	int			val_ndigits;
+	NumericDigit *val_digits;
+
+	/*
+	 * If we have accumulated too many values since the last carry
+	 * propagation, do it now, to avoid overflowing.  The check comes before
+	 * the new value is counted: accum_sum_carry() zeroes num_uncarried, and
+	 * the value added below must still be counted as uncarried, or a later
+	 * accum_sum_final() would skip the carry pass it needs.
+	 */
+	if (accum->num_uncarried == NBASE - 1)
+		accum_sum_carry(accum);
+
+	/*
+	 * Adjust the weight or scale of the old value, so that it can accommodate
+	 * the new value.
+	 */
+	accum_sum_rescale(accum, val);
+
+	/* Pick the buffer that matches the sign of the new value */
+	if (val->sign == NUMERIC_POS)
+		accum_digits = accum->pos_digits;
+	else
+		accum_digits = accum->neg_digits;
+
+	/* copy these values into local vars for speed in loop */
+	val_ndigits = val->ndigits;
+	val_digits = val->digits;
+
+	/* Align val's first digit with the accumulator digit of equal weight */
+	i = accum->weight - val->weight;
+	for (val_i = 0; val_i < val_ndigits; val_i++)
+	{
+		accum_digits[i] += (int32) val_digits[val_i];
+		i++;
+	}
+
+	/* Count this value only now that it is actually in the buffers */
+	accum->num_uncarried++;
+}
+
+/*
+ * Propagate carries through a single digit buffer, working from the least
+ * significant digit towards the most significant one, so that every digit
+ * ends up below NBASE.
+ *
+ * Returns true if the most significant digit is non-zero afterwards, that
+ * is, if the digit reserved for carry has been used up.
+ */
+static bool
+accum_sum_carry_digits(int32 *dig, int ndigits)
+{
+	int32		carry = 0;
+	int32		newdig = 0;
+	int			i;
+
+	for (i = ndigits - 1; i >= 0; i--)
+	{
+		newdig = dig[i] + carry;
+		carry = 0;
+		if (newdig >= NBASE)
+		{
+			carry = newdig / NBASE;
+			newdig -= carry * NBASE;
+		}
+		dig[i] = newdig;
+	}
+
+	return newdig > 0;
+}
+
+/*
+ * Propagate carries in both the positive and the negative sum.
+ *
+ * Clears 'have_carry_space' if either sum now occupies its leading digit,
+ * and resets 'num_uncarried' to zero.
+ */
+static void
+accum_sum_carry(NumericSumAccum *accum)
+{
+	/* Nothing was added since the last carry propagation: nothing to do */
+	if (accum->num_uncarried == 0)
+		return;
+
+	/*
+	 * The accumulator's weight is always kept one larger than the pending
+	 * values need, so the leading digit must still be free to take the
+	 * final carry.  The buffers cannot be enlarged here, because callers of
+	 * accum_sum_final() are not required to be in the right memory context.
+	 */
+	Assert(accum->pos_digits[0] == 0 && accum->neg_digits[0] == 0);
+
+	if (accum_sum_carry_digits(accum->pos_digits, accum->ndigits))
+		accum->have_carry_space = false;
+
+	if (accum_sum_carry_digits(accum->neg_digits, accum->ndigits))
+		accum->have_carry_space = false;
+
+	accum->num_uncarried = 0;
+}
+
+/*
+ * Re-scale accumulator to accommodate new value.
+ *
+ * If the new value has more digits, on either side, than the current digit
+ * buffers in the accumulator can hold, enlarge the buffers.
+ *
+ * accum: accumulator to adjust; its buffers, weight, ndigits, dscale and
+ *        have_carry_space may all be updated.
+ * val:   the value about to be added; it is only read here, not added.
+ *
+ * New buffers are allocated in the current memory context.
+ */
+static void
+accum_sum_rescale(NumericSumAccum *accum, NumericVar *val)
+{
+ int old_weight = accum->weight;
+ int old_ndigits = accum->ndigits;
+ int accum_ndigits;
+ int accum_weight;
+ int accum_rscale;
+ int val_rscale;
+
+ /* Start from the current geometry and grow it as needed below */
+ accum_weight = old_weight;
+ accum_ndigits = old_ndigits;
+
+ /*
+ * Does the new value have a larger weight? If so, enlarge the buffers,
+ * and shift the existing value to the new weight, by adding leading
+ * zeros.
+ *
+ * We enforce that the accumulator always has a weight one larger than
+ * needed for the inputs, so that we have space for the final
+ * carry-propagation phase, if necessary.
+ */
+ if (val->weight >= accum_weight)
+ {
+ accum_weight = val->weight + 1;
+ accum_ndigits = accum_ndigits + (accum_weight - old_weight);
+ }
+
+ /*
+ * Even though the new value is small, we might've used up the space
+ * reserved for the carry digit in the last call to accum_sum_carry(). If
+ * so, enlarge to make room for another one.
+ */
+ else if (!accum->have_carry_space)
+ {
+ accum_weight++;
+ accum_ndigits++;
+ }
+
+ /*
+ * Is the new value wider on the right side?  'rscale' here is
+ * ndigits - weight - 1, which is presumably the number of digits to the
+ * right of the decimal point -- TODO confirm.
+ */
+ accum_rscale = accum_ndigits - accum_weight - 1;
+ val_rscale = val->ndigits - val->weight - 1;
+ if (val_rscale > accum_rscale)
+ accum_ndigits = accum_ndigits + (val_rscale - accum_rscale);
+
+ /* Reallocate only if the geometry actually changed */
+ if (accum_ndigits != old_ndigits ||
+ accum_weight != old_weight)
+ {
+ int32 *new_pos_digits;
+ int32 *new_neg_digits;
+ int weightdiff;
+
+ /* Number of leading digits being added in front of the old ones */
+ weightdiff = accum_weight - old_weight;
+
+ /* palloc0: the added leading and trailing digits must start as zero */
+ new_pos_digits = palloc0(accum_ndigits * sizeof(int32));
+ new_neg_digits = palloc0(accum_ndigits * sizeof(int32));
+
+ /* A brand-new accumulator has no old buffers to copy or free */
+ if (accum->pos_digits)
+ {
+ memcpy(&new_pos_digits[weightdiff], accum->pos_digits, old_ndigits * sizeof(int32));
+ pfree(accum->pos_digits);
+
+ memcpy(&new_neg_digits[weightdiff], accum->neg_digits, old_ndigits * sizeof(int32));
+ pfree(accum->neg_digits);
+ }
+
+ accum->pos_digits = new_pos_digits;
+ accum->neg_digits = new_neg_digits;
+
+ accum->weight = accum_weight;
+ accum->ndigits = accum_ndigits;
+
+ /* Both branches above that change the weight add a leading digit */
+ Assert(accum->pos_digits[0] == 0 && accum->neg_digits[0] == 0);
+ accum->have_carry_space = true;
+ }
+
+ /* Track the largest display scale seen among the inputs */
+ if (val->dscale > accum->dscale)
+ accum->dscale = val->dscale;
+}
+
+/*
+ * Return the current value of the accumulator in 'result'.  This performs
+ * the final carry propagation, and adds together the positive and negative
+ * sums.
+ *
+ * The accumulator itself is modified only by the carry propagation; its
+ * value is unchanged, and more values can be added afterwards.  An
+ * accumulator that has never received a value yields zero.
+ *
+ * Unlike all the other routines, the caller is not required to switch to
+ * the memory context that holds the accumulator.  Temporary buffers are
+ * allocated in the caller's current context and released before returning.
+ */
+static void
+accum_sum_final(NumericSumAccum *accum, NumericVar *result)
+{
+	int			i;
+	NumericVar	pos_var;
+	NumericVar	neg_var;
+
+	if (accum->ndigits == 0)
+	{
+		set_var_from_var(&const_zero, result);
+		return;
+	}
+
+	/* Bring every digit below NBASE, so that it fits in a NumericDigit */
+	accum_sum_carry(accum);
+
+	/* Build NumericVars holding the positive and the negative sum */
+	init_var(&pos_var);
+	init_var(&neg_var);
+
+	pos_var.ndigits = neg_var.ndigits = accum->ndigits;
+	pos_var.weight = neg_var.weight = accum->weight;
+	pos_var.dscale = neg_var.dscale = accum->dscale;
+	pos_var.sign = NUMERIC_POS;
+	neg_var.sign = NUMERIC_NEG;
+
+	pos_var.buf = pos_var.digits = digitbuf_alloc(accum->ndigits);
+	neg_var.buf = neg_var.digits = digitbuf_alloc(accum->ndigits);
+
+	for (i = 0; i < accum->ndigits; i++)
+	{
+		Assert(accum->pos_digits[i] < NBASE);
+		pos_var.digits[i] = (NumericDigit) accum->pos_digits[i];
+
+		Assert(accum->neg_digits[i] < NBASE);
+		neg_var.digits[i] = (NumericDigit) accum->neg_digits[i];
+	}
+
+	/* Add together the positive and negative sums */
+	add_var(&pos_var, &neg_var, result);
+
+	/* Remove leading/trailing zeroes */
+	strip_var(result);
+
+	/* The temporaries live in the caller's context; don't leak them */
+	free_var(&pos_var);
+	free_var(&neg_var);
+}
+
+/*
+ * Copy an accumulator's state.
+ *
+ * 'dst' is assumed to be uninitialized beforehand. No attempt is
+ * made at freeing old values.  Every field of 'dst' is overwritten,
+ * including 'have_carry_space', so that the flag always agrees with the
+ * copied digits.  The new digit buffers are allocated in the current
+ * memory context.
+ */
+static void
+accum_sum_copy(NumericSumAccum *dst, NumericSumAccum *src)
+{
+	dst->ndigits = src->ndigits;
+	dst->weight = src->weight;
+	dst->dscale = src->dscale;
+	dst->num_uncarried = src->num_uncarried;
+	dst->have_carry_space = src->have_carry_space;
+
+	if (src->ndigits > 0)
+	{
+		size_t		nbytes = src->ndigits * sizeof(int32);
+
+		dst->pos_digits = palloc(nbytes);
+		dst->neg_digits = palloc(nbytes);
+
+		memcpy(dst->pos_digits, src->pos_digits, nbytes);
+		memcpy(dst->neg_digits, src->neg_digits, nbytes);
+	}
+	else
+	{
+		/*
+		 * An empty accumulator has no buffers.  Leave 'dst' without any as
+		 * well, rather than handing NULL pointers to memcpy().
+		 */
+		dst->pos_digits = NULL;
+		dst->neg_digits = NULL;
+	}
+}
+
+/*
+ * Fold the current value of 'accum2' into 'accum'.
+ *
+ * 'accum2' is first collapsed into a single NumericVar, which propagates
+ * any carries pending in it.  That value is then accumulated into 'accum'
+ * like any other input.
+ */
+static void
+accum_sum_combine(NumericSumAccum *accum, NumericSumAccum *accum2)
+{
+	NumericVar	accum2_sum;
+
+	init_var(&accum2_sum);
+	accum_sum_final(accum2, &accum2_sum);
+
+	accum_sum_add(accum, &accum2_sum);
+	free_var(&accum2_sum);
+}
--
2.8.1
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers