Update of /cvsroot/monetdb/pathfinder/compiler/algebra/opt
In directory 23jxhf1.ch3.sourceforge.com:/tmp/cvs-serv3949/compiler/algebra/opt

Modified Files:
      Tag: M5XQ
        opt_complex.c 
Log Message:
propagated changes of Friday Jun 12 2009 - Monday Jun 15 2009
from the development trunk to the M5XQ branch

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
2009/06/12 - tsheyar: compiler/algebra/opt/opt_complex.c,1.80
-- Replaced aggregate operators count, min, max, avg, sum, prod, seqty1,
   and all in the algebra by a single aggregate operator ``aggr''
   that can handle multiple aggregates. The aggregate entries
   are of kind count, min, max, avg, sum, prod, seqty1, all, and dist.

-- Added new aggregate kind ``dist'' that makes it possible to represent
   group-by columns that functionally depend on the partitioning criterion
   in the result of the grouping aggregate.

-- Added rewrite that merges aggregates.

-- Added rewrite that removes superfluous aggregates.

-- Added rewrite that pushes a rank operator through an aggregate.

-- Extended the XML import to cope with the old
   as well as the new representation of aggregates.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


U opt_complex.c
Index: opt_complex.c
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/compiler/algebra/opt/opt_complex.c,v
retrieving revision 1.74.2.4
retrieving revision 1.74.2.5
diff -u -d -r1.74.2.4 -r1.74.2.5
--- opt_complex.c       16 May 2009 08:23:31 -0000      1.74.2.4
+++ opt_complex.c       15 Jun 2009 12:44:45 -0000      1.74.2.5
@@ -75,6 +75,305 @@
 }
 
 /**
+ * Find an equi-join -- aggregate pattern and turn it into
+ * an aggregate operator with multiple aggregates (NULL if no match).
+ *
+ * We look for the following pattern, where we ensure that
+ * all columns functionally depend on the join columns, the
+ * aggregates partition their input by the join columns, and
+ * at least operator agg_l or operator agg_r is present.
+ *
+ *          |X|
+ *     _____/ \_______
+ *    /               \
+ * (pi_l1)?         (pi_r1)?
+ *   |                |
+ * (agg_l)?         (agg_r)?
+ *   |                |
+ * (pi_l2)?         (pi_r2)?
+ *   |                |
+ *   \______   _______/
+ *          \ /
+ *           op
+ */
+PFla_op_t *
+aggregate_pattern (PFla_op_t *p)
+{
+    PFalg_col_t   lcol,
+                  rcol;
+    PFla_op_t    *lp,
+                 *rp,
+                 *lproj1 = NULL,
+                 *laggr  = NULL,
+                 *lproj2 = NULL,
+                 *rproj1 = NULL,
+                 *raggr  = NULL,
+                 *rproj2 = NULL,
+                 *res    = NULL;
+    unsigned int  i,
+                  j,
+                  acount = 0,   /* number of aggregates collected in aggr */
+                  pcount = 0;   /* number of items collected in proj */
+    PFalg_aggr_t *aggr;
+    PFalg_proj_t *proj;
+
+    /* make sure that we start with an equi-join */
+    if (p->kind != la_eqjoin) return NULL;
+
+    /* make sure that all output columns depend on the join argument */
+    for (i = 0; i < p->schema.count; i++)
+        if (!PFprop_fd (p->prop,
+                        p->sem.eqjoin.col1,
+                        p->schema.items[i].name))
+            return NULL;
+
+    /* make sure that the result may return the join argument
+       as a key column */
+    if (!PFprop_key (p->prop, p->sem.eqjoin.col1) &&
+        !PFprop_set (p->prop))
+        return NULL;
+    
+    /* checks for the left join side */
+
+    lp   = L(p);
+    lcol = p->sem.eqjoin.col1;
+    /* record the upper projection (pi_l1) and map lcol via find_old_name */
+    if (lp->kind == la_project) {
+        lproj1 = lp;
+        lp     = L(lp);
+        lcol   = find_old_name (lproj1, lcol);
+    }
+
+    /* check for an aggregate (agg_l) partitioned by the join column */
+    if (lp->kind == la_aggr &&
+        lp->sem.aggr.part == lcol) {
+        laggr = lp;
+        lp = L(lp);
+    }
+
+    /* record the lower projection (pi_l2) and map lcol via find_old_name */
+    if (lp->kind == la_project) {
+        lproj2 = lp;
+        lp     = L(lp);
+        lcol   = find_old_name (lproj2, lcol);
+    }
+
+    /* checks for the right join side */
+
+    rp   = R(p);
+    rcol = p->sem.eqjoin.col2;
+    /* record the upper projection (pi_r1) and map rcol via find_old_name */
+    if (rp->kind == la_project) {
+        rproj1 = rp;
+        rp     = L(rp);
+        rcol   = find_old_name (rproj1, rcol);
+    }
+
+    /* check for an aggregate (agg_r) partitioned by the join column */
+    if (rp->kind == la_aggr &&
+        rp->sem.aggr.part == rcol) {
+        raggr = rp;
+        rp = L(rp);
+    }
+
+    /* record the lower projection (pi_r2) and map rcol via find_old_name */
+    if (rp->kind == la_project) {
+        rproj2 = rp;
+        rp     = L(rp);
+        rcol   = find_old_name (rproj2, rcol);
+    }
+
+    /* both sides must meet in the same operator/column with >= 1 aggregate */
+    if (lp != rp ||
+        lcol != rcol ||
+        (!laggr && !raggr))
+        return NULL;
+
+    /* We have to create an aggregate for every column except for the
+       two partition (join) columns. */
+    aggr = PFmalloc ((p->schema.count - 2) * sizeof (PFalg_aggr_t));
+
+    /* Left side: in case we have an aggregate we have to copy the
+       aggregates to the result list. */
+    if (laggr) {
+        /* Merge the upper projection with the aggregate. For every
+           result column (except for the partition column) an aggregate
+           is generated (possibly leading to duplicate aggregates). */
+        if (lproj1)
+            for (i = 0; i < lproj1->sem.proj.count; i++) {
+                PFalg_col_t new = lproj1->sem.proj.items[i].new,
+                            old = lproj1->sem.proj.items[i].old;
+                if (new != p->sem.eqjoin.col1) {
+                    /* find the aggregate that produces column old */
+                    for (j = 0; j < laggr->sem.aggr.count; j++)
+                        if (old == laggr->sem.aggr.aggr[j].res)
+                            break;
+                    /* index not found -- bail out */
+                    if (j == laggr->sem.aggr.count)
+                        return NULL;
+
+                    aggr[acount++] = PFalg_aggr (laggr->sem.aggr.aggr[j].kind,
+                                                 new,
+                                                 laggr->sem.aggr.aggr[j].col);
+                }
+            }
+        else
+            /* copy the aggregate list */
+            for (j = 0; j < laggr->sem.aggr.count; j++)
+                aggr[acount++] = laggr->sem.aggr.aggr[j];
+
+        /* Merge the lower projection with the aggregate by renaming
+           the input columns of the aggregate. */
+        if (lproj2)
+            for (j = 0; j < acount; j++) {
+                /* aggregates without input column need no renaming */
+                if (!aggr[j].col)
+                    continue;
+                for (i = 0; i < lproj2->sem.proj.count; i++)
+                    if (aggr[j].col == lproj2->sem.proj.items[i].new) {
+                        aggr[j].col = lproj2->sem.proj.items[i].old;
+                        break;
+                    }
+                assert (i < lproj2->sem.proj.count);
+            }
+    }
+    /* In case we have no aggregate we need to create ``dist'' aggregates
+       for all columns (except for the join column). */
+    else {
+        /* we don't know how to handle two adjacent projections here */
+        if (lproj2)
+           return NULL;
+
+        /* Turn renaming projections into aggregates with differently
+           named input and output columns. */
+        if (lproj1)
+            for (i = 0; i < lproj1->sem.proj.count; i++) {
+                PFalg_col_t new = lproj1->sem.proj.items[i].new,
+                            old = lproj1->sem.proj.items[i].old;
+                if (new != p->sem.eqjoin.col1)
+                    aggr[acount++] = PFalg_aggr (alg_aggr_dist, new, old);
+            }
+        else
+            for (i = 0; i < lp->schema.count; i++) {
+                PFalg_col_t col = lp->schema.items[i].name;
+                if (col != p->sem.eqjoin.col1)
+                    aggr[acount++] = PFalg_aggr (alg_aggr_dist, col, col);
+            }
+    }
+
+    /* Right side: in case we have an aggregate we have to copy the
+       aggregates to the result list. */
+    if (raggr) {
+        /* Store the current offset to avoid renaming the aggregates
+           of the left side. */
+        unsigned int offset = acount;
+
+        /* Merge the upper projection with the aggregate. For every
+           result column (except for the partition column) an aggregate
+           is generated (possibly leading to duplicate aggregates). */
+        if (rproj1)
+            for (i = 0; i < rproj1->sem.proj.count; i++) {
+                PFalg_col_t new = rproj1->sem.proj.items[i].new,
+                            old = rproj1->sem.proj.items[i].old;
+                if (new != p->sem.eqjoin.col2) {
+                    /* find the aggregate that produces column old */
+                    for (j = 0; j < raggr->sem.aggr.count; j++)
+                        if (old == raggr->sem.aggr.aggr[j].res)
+                            break;
+                    /* index not found -- bail out */
+                    if (j == raggr->sem.aggr.count)
+                        return NULL;
+
+                    aggr[acount++] = PFalg_aggr (raggr->sem.aggr.aggr[j].kind,
+                                                 new,
+                                                 raggr->sem.aggr.aggr[j].col);
+                }
+            }
+        else
+            /* copy the aggregate list */
+            for (j = 0; j < raggr->sem.aggr.count; j++)
+                aggr[acount++] = raggr->sem.aggr.aggr[j];
+
+        /* Merge the lower projection with the aggregate by renaming
+           the input columns of the aggregate. */
+        if (rproj2)
+            for (j = offset; j < acount; j++) {
+                /* aggregates without input column need no renaming */
+                if (!aggr[j].col)
+                    continue;
+                for (i = 0; i < rproj2->sem.proj.count; i++)
+                    if (aggr[j].col == rproj2->sem.proj.items[i].new) {
+                        aggr[j].col = rproj2->sem.proj.items[i].old;
+                        break;
+                    }
+                assert (i < rproj2->sem.proj.count);
+            }
+    }
+    /* In case we have no aggregate we need to create ``dist'' aggregates
+       for all columns (except for the join column). */
+    else {
+        /* we don't know how to handle two adjacent projections here */
+        if (rproj2)
+           return NULL;
+
+        /* Turn renaming projections into aggregates with differently
+           named input and output columns. */
+        if (rproj1)
+            for (i = 0; i < rproj1->sem.proj.count; i++) {
+                PFalg_col_t new = rproj1->sem.proj.items[i].new,
+                            old = rproj1->sem.proj.items[i].old;
+                if (new != p->sem.eqjoin.col2)
+                    aggr[acount++] = PFalg_aggr (alg_aggr_dist, new, old);
+            }
+        else
+            for (i = 0; i < rp->schema.count; i++) {
+                PFalg_col_t col = rp->schema.items[i].name;
+                if (col != p->sem.eqjoin.col2)
+                    aggr[acount++] = PFalg_aggr (alg_aggr_dist, col, col);
+            }
+    }
+
+    /* create a projection list to correctly reflect the output schema */
+    proj = PFmalloc (p->schema.count * sizeof (PFalg_proj_t));
+
+    /* fill the projection list and remove duplicate aggregates */
+    i = 0;
+    while (i < acount) {
+        /* did we already see an aggregate with the same kind and input? */
+        for (j = 0; j < i; j++)
+            if (aggr[i].kind == aggr[j].kind &&
+                aggr[i].col  == aggr[j].col) {
+                break;
+            }
+        /* We have found a duplicate and overwrite it with the last aggregate
+           in the list (slot i is then checked again). The projection
+           ensures that the output references the found aggregate. */
+        if (i != j) {
+            proj[pcount++] = PFalg_proj (aggr[i].res, aggr[j].res);
+            acount--;
+            aggr[i] = aggr[acount];
+        }
+        /* We found a new aggregate and add its output to the projection
+           list. */
+        else {
+            proj[pcount++] = PFalg_proj (aggr[i].res, aggr[i].res);
+            i++;
+        }
+    }
+
+    /* add the join (aka partition) columns; lcol == rcol is ensured above */
+    proj[pcount++] = PFalg_proj (p->sem.eqjoin.col1, lcol);
+    proj[pcount++] = PFalg_proj (p->sem.eqjoin.col2, lcol);
+
+    assert (pcount == p->schema.count);
+
+    /* build the new aggregate over lp and restore the output schema */
+    res = PFla_project_ (aggr (lp, lcol, acount, aggr), pcount, proj);
+
+    return res;
+}
+
+/**
  * Replace a thetajoin with a one-row literal table by a selection.
  */
 static PFla_op_t *
@@ -479,10 +778,12 @@
     }
 
     /* check for count aggregate */
-    if (op->kind == la_count &&
+    if (op->kind == la_aggr &&
+        op->sem.aggr.count == 1 &&
+        op->sem.aggr.aggr[0].kind == alg_aggr_count &&
         op->sem.aggr.part &&
         op->sem.aggr.part == iter &&
-        op->sem.aggr.res  == last) {
+        op->sem.aggr.aggr[0].res == last) {
         /* follow the projection list to the base operator */
         while (base->kind == la_project) {
             base_iter = find_old_name (base, base_iter);
@@ -1258,6 +1559,14 @@
                 break;
             }
 
+            /* aggregate patterns */
+            PFla_op_t *res = aggregate_pattern (p);
+            if (res) {
+                *p = *res;
+                modified = true;
+                break;
+            }
+
 #if 0 /* disable join -> semijoin rewrites */
             /* introduce semi-join operator if possible */
             if (!left_arg_req &&
@@ -1775,20 +2084,18 @@
             }
             break;
 
-        case la_min:
-        case la_max:
-        case la_avg:
+        case la_aggr:
             /* In case the aggregated value is the same for all rows in a group
                we can replace the aggregate by a duplicate elimination. */
-            if (p->sem.aggr.part &&
-                PFprop_fd (L(p)->prop, p->sem.aggr.part, p->sem.aggr.col)) {
-                *p = *distinct (
-                          project (
-                              L(p),
-                              PFalg_proj (p->sem.aggr.res, p->sem.aggr.col),
-                              PFalg_proj (p->sem.aggr.part, p->sem.aggr.part)));
-                break;
-            }
+            if (p->sem.aggr.part)
+                for (unsigned int i = 0; i < p->sem.aggr.count; i++)
+                    if (p->sem.aggr.aggr[i].col &&
+                        PFprop_fd (L(p)->prop,
+                                   p->sem.aggr.part,
+                                   p->sem.aggr.aggr[i].col))
+                        p->sem.aggr.aggr[i].kind = alg_aggr_dist;
+            /* A further rewrite in opt_general.brg will introduce distinct
+               operators if no other aggregates exist. */
             break;
 
         case la_rownum:
@@ -2602,18 +2909,11 @@
         case la_bool_or:
         case la_bool_not:
         case la_to:
-        case la_avg:
-        case la_max:
-        case la_min:
-        case la_sum:
-        case la_prod:
-        case la_count:
+        case la_aggr:
         case la_rank:
         case la_type:
         case la_type_assert:
         case la_cast:
-        case la_seqty1:
-        case la_all:
         case la_doc_tbl:
         case la_roots:
         case la_dummy:


------------------------------------------------------------------------------
Crystal Reports - New Free Runtime and 30 Day Trial
Check out the new simplified licensing option that enables unlimited
royalty-free distribution of the report engine for externally facing 
server and web deployment.
http://p.sf.net/sfu/businessobjects
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins

Reply via email to