Update of /cvsroot/monetdb/pathfinder/compiler/algebra/opt
In directory 23jxhf1.ch3.sourceforge.com:/tmp/cvs-serv3949/compiler/algebra/opt
Modified Files:
Tag: M5XQ
opt_complex.c
Log Message:
propagated changes of Friday Jun 12 2009 - Monday Jun 15 2009
from the development trunk to the M5XQ branch
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
2009/06/12 - tsheyar: compiler/algebra/opt/opt_complex.c,1.80
-- Replaced aggregate operators count, min, max, avg, sum, prod, seqty1,
and all in the algebra by a single aggregate operator ``aggr''
that can handle multiple aggregates. The aggregate entries
are of kind count, min, max, avg, sum, prod, seqty1, all, and dist.
-- Added new aggregate kind ``dist'' that makes it possible to represent
group-by columns that functionally depend on the partitioning criterion
in the result of the grouping aggregate.
-- Added rewrite that merges aggregates.
-- Added rewrite that removes superfluous aggregates.
-- Added rewrite that pushes a rank operator through an aggregate.
-- Extended the XML import to cope with the old
as well as the new representation of aggregates.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
U opt_complex.c
Index: opt_complex.c
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/compiler/algebra/opt/opt_complex.c,v
retrieving revision 1.74.2.4
retrieving revision 1.74.2.5
diff -u -d -r1.74.2.4 -r1.74.2.5
--- opt_complex.c 16 May 2009 08:23:31 -0000 1.74.2.4
+++ opt_complex.c 15 Jun 2009 12:44:45 -0000 1.74.2.5
@@ -75,6 +75,305 @@
}
/**
+ * Find an equi-join -- aggregate pattern and turn it into
+ * an aggregate operator with multiple aggregates.
+ *
+ * We look for the following pattern, where we ensure that
+ * all columns functionally depend on the join columns, the
+ * aggregates partition their input by the join columns, and
+ * at least operator agg_l or operator agg_r is present.
+ *
+ * |X|
+ * _____/ \_______
+ * / \
+ * (pi_l1)? (pi_r1)?
+ * | |
+ * (agg_l)? (agg_r)?
+ * | |
+ * (pi_l2)? (pi_r2)?
+ * | |
+ * \______ _______/
+ * \ /
+ * op
+ */
+PFla_op_t *
+aggregate_pattern (PFla_op_t *p)
+{
+ PFalg_col_t lcol,
+ rcol;
+ PFla_op_t *lp,
+ *rp,
+ *lproj1 = NULL,
+ *laggr = NULL,
+ *lproj2 = NULL,
+ *rproj1 = NULL,
+ *raggr = NULL,
+ *rproj2 = NULL,
+ *res = NULL;
+ unsigned int i,
+ j,
+ acount = 0,
+ pcount = 0;
+ PFalg_aggr_t *aggr;
+ PFalg_proj_t *proj;
+
+ /* the pattern is rooted in an equi-join; anything else is no match */
+ if (p->kind != la_eqjoin) return NULL;
+
+ /* every result column must functionally depend on the left join column */
+ for (i = 0; i < p->schema.count; i++)
+ if (!PFprop_fd (p->prop,
+ p->sem.eqjoin.col1,
+ p->schema.items[i].name))
+ return NULL;
+
+ /* bail out unless the left join column passes the PFprop_key check
+ or the PFprop_set check succeeds for the join result */
+ if (!PFprop_key (p->prop, p->sem.eqjoin.col1) &&
+ !PFprop_set (p->prop))
+ return NULL;
+
+ /* checks for the left join side */
+
+ lp = L(p);
+ lcol = p->sem.eqjoin.col1;
+ /* upper projection (pi_l1): remember it and look up the join column below it */
+ if (lp->kind == la_project) {
+ lproj1 = lp;
+ lp = L(lp);
+ lcol = find_old_name (lproj1, lcol);
+ }
+
+ /* aggregate (agg_l): only accepted if it partitions by the join column */
+ if (lp->kind == la_aggr &&
+ lp->sem.aggr.part == lcol) {
+ laggr = lp;
+ lp = L(lp);
+ }
+
+ /* lower projection (pi_l2): remember it and look up the join column below it */
+ if (lp->kind == la_project) {
+ lproj2 = lp;
+ lp = L(lp);
+ lcol = find_old_name (lproj2, lcol);
+ }
+
+ /* checks for the right join side */
+
+ rp = R(p);
+ rcol = p->sem.eqjoin.col2;
+ /* upper projection (pi_r1): remember it and look up the join column below it */
+ if (rp->kind == la_project) {
+ rproj1 = rp;
+ rp = L(rp);
+ rcol = find_old_name (rproj1, rcol);
+ }
+
+ /* aggregate (agg_r): only accepted if it partitions by the join column */
+ if (rp->kind == la_aggr &&
+ rp->sem.aggr.part == rcol) {
+ raggr = rp;
+ rp = L(rp);
+ }
+
+ /* lower projection (pi_r2): remember it and look up the join column below it */
+ if (rp->kind == la_project) {
+ rproj2 = rp;
+ rp = L(rp);
+ rcol = find_old_name (rproj2, rcol);
+ }
+
+ /* both sides must end in the same operator and column; one aggregate is required */
+ if (lp != rp ||
+ lcol != rcol ||
+ (!laggr && !raggr))
+ return NULL;
+
+ /* We have to create an aggregate for every result column except
+ the two join (partition) columns -- hence count - 2 entries. */
+ aggr = PFmalloc ((p->schema.count - 2) * sizeof (PFalg_aggr_t));
+
+ /* In case the left side has an aggregate we have to copy its
+ aggregates to the result list. */
+ if (laggr) {
+ /* Merge the upper projection with the aggregate. For every
+ result column (except for the partition column) an aggregate
+ is generated (possibly leading to duplicate aggregates). */
+ if (lproj1)
+ for (i = 0; i < lproj1->sem.proj.count; i++) {
+ PFalg_col_t new = lproj1->sem.proj.items[i].new,
+ old = lproj1->sem.proj.items[i].old;
+ if (new != p->sem.eqjoin.col1) {
+ /* find the aggregate that produces the projected column */
+ for (j = 0; j < laggr->sem.aggr.count; j++)
+ if (old == laggr->sem.aggr.aggr[j].res)
+ break;
+ /* index not found -- bail out */
+ if (j == laggr->sem.aggr.count)
+ return NULL;
+
+ aggr[acount++] = PFalg_aggr (laggr->sem.aggr.aggr[j].kind,
+ new,
+ laggr->sem.aggr.aggr[j].col);
+ }
+ }
+ else
+ /* no upper projection: copy the aggregate list unchanged */
+ for (j = 0; j < laggr->sem.aggr.count; j++)
+ aggr[acount++] = laggr->sem.aggr.aggr[j];
+
+ /* Merge the lower projection with the aggregate by renaming
+ the input columns of the aggregate. */
+ if (lproj2)
+ for (j = 0; j < acount; j++) {
+ /* aggregates without an input column need no renaming */
+ if (!aggr[j].col)
+ continue;
+ for (i = 0; i < lproj2->sem.proj.count; i++)
+ if (aggr[j].col == lproj2->sem.proj.items[i].new) {
+ aggr[j].col = lproj2->sem.proj.items[i].old;
+ break;
+ }
+ assert (i < lproj2->sem.proj.count);
+ }
+ }
+ /* In case we have no aggregate we need to create ``dist'' aggregates
+ (alg_aggr_dist) for all columns (except for the join column). */
+ else {
+ /* we don't know how to handle two adjacent projections here */
+ if (lproj2)
+ return NULL;
+
+ /* Turn renaming projections into aggregates with differently
+ named input and output columns. */
+ if (lproj1)
+ for (i = 0; i < lproj1->sem.proj.count; i++) {
+ PFalg_col_t new = lproj1->sem.proj.items[i].new,
+ old = lproj1->sem.proj.items[i].old;
+ if (new != p->sem.eqjoin.col1)
+ aggr[acount++] = PFalg_aggr (alg_aggr_dist, new, old);
+ }
+ else
+ for (i = 0; i < lp->schema.count; i++) {
+ PFalg_col_t col = lp->schema.items[i].name;
+ if (col != p->sem.eqjoin.col1)
+ aggr[acount++] = PFalg_aggr (alg_aggr_dist, col, col);
+ }
+ }
+
+ /* In case the right side has an aggregate we have to copy its
+ aggregates to the result list. */
+ if (raggr) {
+ /* Store the current offset to avoid renaming the aggregates
+ of the left side. */
+ unsigned int offset = acount;
+
+ /* Merge the upper projection with the aggregate. For every
+ result column (except for the partition column) an aggregate
+ is generated (possibly leading to duplicate aggregates). */
+ if (rproj1)
+ for (i = 0; i < rproj1->sem.proj.count; i++) {
+ PFalg_col_t new = rproj1->sem.proj.items[i].new,
+ old = rproj1->sem.proj.items[i].old;
+ if (new != p->sem.eqjoin.col2) {
+ /* find the aggregate that produces the projected column */
+ for (j = 0; j < raggr->sem.aggr.count; j++)
+ if (old == raggr->sem.aggr.aggr[j].res)
+ break;
+ /* index not found -- bail out */
+ if (j == raggr->sem.aggr.count)
+ return NULL;
+
+ aggr[acount++] = PFalg_aggr (raggr->sem.aggr.aggr[j].kind,
+ new,
+ raggr->sem.aggr.aggr[j].col);
+ }
+ }
+ else
+ /* no upper projection: copy the aggregate list unchanged */
+ for (j = 0; j < raggr->sem.aggr.count; j++)
+ aggr[acount++] = raggr->sem.aggr.aggr[j];
+
+ /* Merge the lower projection with the aggregate by renaming
+ the input columns of the aggregate. */
+ if (rproj2)
+ for (j = offset; j < acount; j++) {
+ /* aggregates without an input column need no renaming */
+ if (!aggr[j].col)
+ continue;
+ for (i = 0; i < rproj2->sem.proj.count; i++)
+ if (aggr[j].col == rproj2->sem.proj.items[i].new) {
+ aggr[j].col = rproj2->sem.proj.items[i].old;
+ break;
+ }
+ assert (i < rproj2->sem.proj.count);
+ }
+ }
+ /* In case we have no aggregate we need to create ``dist'' aggregates
+ (alg_aggr_dist) for all columns (except for the join column). */
+ else {
+ /* we don't know how to handle two adjacent projections here */
+ if (rproj2)
+ return NULL;
+
+ /* Turn renaming projections into aggregates with differently
+ named input and output columns. */
+ if (rproj1)
+ for (i = 0; i < rproj1->sem.proj.count; i++) {
+ PFalg_col_t new = rproj1->sem.proj.items[i].new,
+ old = rproj1->sem.proj.items[i].old;
+ if (new != p->sem.eqjoin.col2)
+ aggr[acount++] = PFalg_aggr (alg_aggr_dist, new, old);
+ }
+ else
+ for (i = 0; i < rp->schema.count; i++) {
+ PFalg_col_t col = rp->schema.items[i].name;
+ if (col != p->sem.eqjoin.col2)
+ aggr[acount++] = PFalg_aggr (alg_aggr_dist, col, col);
+ }
+ }
+
+ /* create a projection list to correctly reflect the output schema */
+ proj = PFmalloc (p->schema.count * sizeof (PFalg_proj_t));
+
+ /* fill the projection list and remove duplicate aggregates */
+ i = 0;
+ while (i < acount) {
+ /* check if we saw the same aggregate (same kind and input) already */
+ for (j = 0; j < i; j++)
+ if (aggr[i].kind == aggr[j].kind &&
+ aggr[i].col == aggr[j].col) {
+ break;
+ }
+ /* We have found a duplicate: overwrite it with the last aggregate
+ in the list (examined next, as i is not advanced). The projection
+ ensures that the output references the aggregate found earlier. */
+ if (i != j) {
+ proj[pcount++] = PFalg_proj (aggr[i].res, aggr[j].res);
+ acount--;
+ aggr[i] = aggr[acount];
+ }
+ /* We found a new aggregate: keep it and add its output to the
+ projection list under its own name. */
+ else {
+ proj[pcount++] = PFalg_proj (aggr[i].res, aggr[i].res);
+ i++;
+ }
+ }
+
+ /* add the join (aka partition) columns to the projection list */
+ proj[pcount++] = PFalg_proj (p->sem.eqjoin.col1, lcol);
+ proj[pcount++] = PFalg_proj (p->sem.eqjoin.col2, lcol);
+
+ assert (pcount == p->schema.count);
+
+ /* build the new aggregate on lp and wrap it in the projection */
+ res = PFla_project_ (aggr (lp, lcol, acount, aggr), pcount, proj);
+
+ return res;
+}
+
+/**
* Replace a thetajoin with a one-row literal table by a selection.
*/
static PFla_op_t *
@@ -479,10 +778,12 @@
}
/* check for count aggregate */
- if (op->kind == la_count &&
+ if (op->kind == la_aggr &&
+ op->sem.aggr.count == 1 &&
+ op->sem.aggr.aggr[0].kind == alg_aggr_count &&
op->sem.aggr.part &&
op->sem.aggr.part == iter &&
- op->sem.aggr.res == last) {
+ op->sem.aggr.aggr[0].res == last) {
/* follow the projection list to the base operator */
while (base->kind == la_project) {
base_iter = find_old_name (base, base_iter);
@@ -1258,6 +1559,14 @@
break;
}
+ /* aggregate patterns */
+ PFla_op_t *res = aggregate_pattern (p);
+ if (res) {
+ *p = *res;
+ modified = true;
+ break;
+ }
+
#if 0 /* disable join -> semijoin rewrites */
/* introduce semi-join operator if possible */
if (!left_arg_req &&
@@ -1775,20 +2084,18 @@
}
break;
- case la_min:
- case la_max:
- case la_avg:
+ case la_aggr:
/* In case the aggregated value is the same for all rows in a group
we can replace the aggregate by a duplicate elimination. */
- if (p->sem.aggr.part &&
- PFprop_fd (L(p)->prop, p->sem.aggr.part, p->sem.aggr.col)) {
- *p = *distinct (
- project (
- L(p),
- PFalg_proj (p->sem.aggr.res, p->sem.aggr.col),
- PFalg_proj (p->sem.aggr.part,
p->sem.aggr.part)));
- break;
- }
+ if (p->sem.aggr.part)
+ for (unsigned int i = 0; i < p->sem.aggr.count; i++)
+ if (p->sem.aggr.aggr[i].col &&
+ PFprop_fd (L(p)->prop,
+ p->sem.aggr.part,
+ p->sem.aggr.aggr[i].col))
+ p->sem.aggr.aggr[i].kind = alg_aggr_dist;
+ /* A further rewrite in opt_general.brg will introduce distinct
+ operators if no other aggregates exist. */
break;
case la_rownum:
@@ -2602,18 +2909,11 @@
case la_bool_or:
case la_bool_not:
case la_to:
- case la_avg:
- case la_max:
- case la_min:
- case la_sum:
- case la_prod:
- case la_count:
+ case la_aggr:
case la_rank:
case la_type:
case la_type_assert:
case la_cast:
- case la_seqty1:
- case la_all:
case la_doc_tbl:
case la_roots:
case la_dummy:
------------------------------------------------------------------------------
Crystal Reports - New Free Runtime and 30 Day Trial
Check out the new simplified licensing option that enables unlimited
royalty-free distribution of the report engine for externally facing
server and web deployment.
http://p.sf.net/sfu/businessobjects
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins