Wenhai Li has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/530

Change subject: Initial fuzzy join for Inci's verification
......................................................................

Initial fuzzy join for Inci's verification

commit d9feb3ee13f907a0dcc27c0ad91dce92ad16c9a0
Author: Michael <[email protected]>
Date:   Wed Dec 2 03:22:49 2015 -0800

    Initial fuzzyjoin setup for Inci

Change-Id: I2487078d1821d7ad85bb745bfa31024bcdbea1f1
---
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
M algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
M hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tokenizers/AbstractUTF8Token.java
6 files changed, 516 insertions(+), 33 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/30/530/1

diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 672e6d0..092750d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -219,7 +219,8 @@
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IBinaryComparatorFactory[] comparatorFactories = 
JobGenHelper.variablesToAscBinaryComparatorFactories(gbyCols,
                 aggOpInputEnv, context);
-        RecordDescriptor recordDescriptor = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
+        RecordDescriptor recordDescriptor = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
         IBinaryHashFunctionFactory[] hashFunctionFactories = 
JobGenHelper.variablesToBinaryHashFunctionFactories(
                 gbyCols, aggOpInputEnv, context);
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index 186ac6f..9185cba 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -25,7 +25,6 @@
 import java.util.Map.Entry;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -75,6 +74,34 @@
         ILogicalOperator op = opRef.getValue();
         int n = op.getInputs().size();
         IOperatorSchema[] schemas = new IOperatorSchema[n];
+        for (int i = n - 1; i >= 0; i--) {
+            Mutable<ILogicalOperator> opRef2 = op.getInputs().get(i);
+            List<Mutable<ILogicalOperator>> parents = 
operatorVisitedToParents.get(opRef2);
+            if (parents == null) {
+                parents = new ArrayList<Mutable<ILogicalOperator>>();
+                operatorVisitedToParents.put(opRef2, parents);
+                parents.add(opRef);
+                compileOpRef(opRef2, spec, builder, outerPlanSchema);
+                schemas[i] = context.getSchema(opRef2.getValue());
+            } else {
+                if (!parents.contains(opRef))
+                    parents.add(opRef);
+                schemas[i] = context.getSchema(opRef2.getValue());
+                continue;
+            }
+        }
+        IOperatorSchema opSchema = new OperatorSchemaImpl();
+        context.putSchema(op, opSchema);
+        op.getVariablePropagationPolicy().propagateVariables(opSchema, 
schemas);
+        op.contributeRuntimeOperator(builder, context, opSchema, schemas, 
outerPlanSchema);
+    }
+
+    @SuppressWarnings("unused")
+    private void compileOpRef1(Mutable<ILogicalOperator> opRef, 
IOperatorDescriptorRegistry spec,
+            IHyracksJobBuilder builder, IOperatorSchema outerPlanSchema) 
throws AlgebricksException {
+        ILogicalOperator op = opRef.getValue();
+        int n = op.getInputs().size();
+        IOperatorSchema[] schemas = new IOperatorSchema[n];
         int i = 0;
         for (Mutable<ILogicalOperator> opRef2 : op.getInputs()) {
             List<Mutable<ILogicalOperator>> parents = 
operatorVisitedToParents.get(opRef2);
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 92a3691..6664f28 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -22,13 +22,14 @@
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -40,6 +41,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -58,7 +60,10 @@
     private final List<List<Mutable<ILogicalOperator>>> equivalenceClasses = 
new ArrayList<List<Mutable<ILogicalOperator>>>();
     private final HashMap<Mutable<ILogicalOperator>, BitSet> 
opToCandidateInputs = new HashMap<Mutable<ILogicalOperator>, BitSet>();
     private final HashMap<Mutable<ILogicalOperator>, MutableInt> clusterMap = 
new HashMap<Mutable<ILogicalOperator>, MutableInt>();
+    private final Map<MutableInt, List<Pair<Mutable<ILogicalOperator>, 
MutableInt>>> clusterOpMap = new HashMap<MutableInt, 
List<Pair<Mutable<ILogicalOperator>, MutableInt>>>();
+    private final Map<LogicalOperatorTag, Integer> opTypeCount = new 
HashMap<LogicalOperatorTag, Integer>();
     private final HashMap<Integer, BitSet> clusterWaitForMap = new 
HashMap<Integer, BitSet>();
+    private final HashMap<Mutable<ILogicalOperator>, Pair<Boolean, BitSet>> 
selectedMOMap = new HashMap<Mutable<ILogicalOperator>, Pair<Boolean, BitSet>>();
     private int lastUsedClusterId = 0;
 
     @Override
@@ -99,6 +104,8 @@
                 opToCandidateInputs.clear();
                 clusterMap.clear();
                 clusterWaitForMap.clear();
+                selectedMOMap.clear();
+                clusterOpMap.clear();
                 lastUsedClusterId = 0;
             } while (changed);
             roots.clear();
@@ -125,8 +132,10 @@
     private boolean rewrite(IOptimizationContext context) throws 
AlgebricksException {
         boolean changed = false;
         for (List<Mutable<ILogicalOperator>> members : equivalenceClasses) {
-            if (rewriteForOneEquivalentClass(members, context))
+            if (rewriteForOneEquivalentClass(members, context)) {
                 changed = true;
+                break;
+            }
         }
         return changed;
     }
@@ -137,6 +146,7 @@
         boolean rewritten = false;
         while (members.size() > 0) {
             group.clear();
+            //adjustForDerivation(members);
             Mutable<ILogicalOperator> candidate = 
members.remove(members.size() - 1);
             group.add(candidate);
             for (int i = members.size() - 1; i >= 0; i--) {
@@ -149,6 +159,13 @@
             boolean[] materializationFlags = computeMaterilizationFlags(group);
             if (group.isEmpty()) {
                 continue;
+            }
+            List<Pair<int[], int[]>> prevLabels = new ArrayList<Pair<int[], 
int[]>>();
+            List<List<List<Pair<int[], int[]>>>> laterLabels = new 
ArrayList<List<List<Pair<int[], int[]>>>>();
+            int level1 = 0;
+            for (int i = 0; i < group.size(); i++) {
+                AbstractLogicalOperator op = (AbstractLogicalOperator) 
group.get(i).getValue();
+                
prevLabels.add(op.getPhysicalOperator().getInputOutputDependencyLabels(group.get(i).getValue()));
             }
             candidate = group.get(0);
             ReplicateOperator rop = new ReplicateOperator(group.size(), 
materializationFlags);
@@ -220,6 +237,7 @@
                 
context.computeAndSetTypeEnvironmentForOperator(projectOperator);
 
                 List<Mutable<ILogicalOperator>> parentOpList = 
childrenToParents.get(ref);
+                List<Integer> indexes = new ArrayList<Integer>();
                 for (Mutable<ILogicalOperator> parentOpRef : parentOpList) {
                     AbstractLogicalOperator parentOp = 
(AbstractLogicalOperator) parentOpRef.getValue();
                     int index = parentOp.getInputs().indexOf(ref);
@@ -231,7 +249,7 @@
                             parentOp = parentOpNext;
                         }
                     }
-
+                    indexes.add(index);
                     ILogicalOperator childOp = parentOp.getOperatorTag() == 
LogicalOperatorTag.PROJECT ? assignOperator
                             : projectOperator;
                     if (parentOp.isMap()) {
@@ -245,6 +263,31 @@
                     }
                     context.computeAndSetTypeEnvironmentForOperator(parentOp);
                 }
+                laterLabels.add(new ArrayList<List<Pair<int[], int[]>>>());
+                int level2 = 0;
+                for (Mutable<ILogicalOperator> parentOpRef : parentOpList) {
+                    AbstractLogicalOperator parentOp = 
(AbstractLogicalOperator) parentOpRef.getValue();
+                    int index = indexes.get(level2);
+                    laterLabels.get(level1).add(new ArrayList<Pair<int[], 
int[]>>());
+                    laterLabels.get(level1).get(level2)
+                            
.add(parentOp.getPhysicalOperator().getInputOutputDependencyLabels(parentOp));
+                    ILogicalOperator next = 
parentOp.getInputs().get(index).getValue();
+                    do {
+                        laterLabels
+                                .get(level1)
+                                .get(level2)
+                                .add(((AbstractLogicalOperator) 
next).getPhysicalOperator()
+                                        .getInputOutputDependencyLabels(next));
+                        next = next.getInputs().get(0).getValue();
+                    } while (!next.equals(ropRef.getValue()));
+                    laterLabels
+                            .get(level1)
+                            .get(level2)
+                            .add(((AbstractLogicalOperator) 
ropRef.getValue()).getPhysicalOperator()
+                                    
.getInputOutputDependencyLabels(ropRef.getValue()));
+                    level2++;
+                }
+                level1++;
             }
             rewritten = true;
         }
@@ -346,6 +389,127 @@
         }
     }
 
+    /**
+     * Searches the input subtrees of {@code root} depth-first, in input order, for the first operator whose
+     * produced variables contain {@code pk}. Once an operator on a path produces {@code pk}, the search does
+     * not look further below it.
+     *
+     * @param root operator whose inputs are searched ({@code root} itself is not inspected)
+     * @param pk   variable to locate
+     * @return the first producing operator found, or {@code null} if nothing below {@code root} produces it
+     * @throws AlgebricksException if extracting produced variables fails
+     */
+    private ILogicalOperator extractPKProduction(ILogicalOperator root, LogicalVariable pk) throws AlgebricksException {
+        for (Mutable<ILogicalOperator> inputRef : root.getInputs()) {
+            ILogicalOperator input = inputRef.getValue();
+            List<LogicalVariable> produced = new ArrayList<LogicalVariable>();
+            VariableUtilities.getProducedVariables(input, produced);
+            if (produced.contains(pk)) {
+                return input;
+            }
+            if (input.getInputs().isEmpty()) {
+                continue;
+            }
+            ILogicalOperator producer = extractPKProduction(input, pk);
+            if (producer != null) {
+                return producer;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Shrinks {@code pks} in place by dropping key variables that are equivalent to a key of another scan over
+     * the same data source.
+     * <p>
+     * Each key is first resolved to the operator below {@code op} that produces it, which must be a data-source
+     * scan. For every ordered pair (earlier, later) of keys whose scans read the same data source,
+     * IsomorphismUtilities.mapVariablesTopDown records a variable mapping. Every key that appears as a key of
+     * that mapping is then removed from {@code pks}, and its mapping entry is consumed.
+     *
+     * @param op  operator under which all keys must be produced
+     * @param pks key variables; modified in place
+     * @throws AlgebricksException if a key is not produced by a data-source scan below {@code op}
+     */
+    private void mergeHomogeneousPK(ILogicalOperator op, List<LogicalVariable> pks) throws AlgebricksException {
+        // Resolve every key to the scan that produces it.
+        Map<LogicalVariable, ILogicalOperator> producerOf = new HashMap<LogicalVariable, ILogicalOperator>();
+        for (LogicalVariable pk : pks) {
+            ILogicalOperator producer = extractPKProduction(op, pk);
+            boolean producedByScan = producer != null
+                    && producer.getOperatorTag().equals(LogicalOperatorTag.DATASOURCESCAN);
+            if (!producedByScan) {
+                throw new AlgebricksException("Illegal variable production.");
+            }
+            producerOf.put(pk, producer);
+        }
+
+        // Map variables between every pair of scans that read the same data source.
+        Map<LogicalVariable, LogicalVariable> mapped = new HashMap<LogicalVariable, LogicalVariable>();
+        int keyCount = pks.size();
+        for (int left = 0; left + 1 < keyCount; left++) {
+            ILogicalOperator leftScan = producerOf.get(pks.get(left));
+            for (int right = left + 1; right < keyCount; right++) {
+                ILogicalOperator rightScan = producerOf.get(pks.get(right));
+                if (((DataSourceScanOperator) leftScan).getDataSource()
+                        .equals(((DataSourceScanOperator) rightScan).getDataSource())) {
+                    IsomorphismUtilities.mapVariablesTopDown(leftScan, rightScan, mapped);
+                }
+            }
+        }
+
+        // Drop each mapped key; removing its entry means a duplicate occurrence of the same key is kept.
+        for (Iterator<LogicalVariable> it = pks.iterator(); it.hasNext();) {
+            LogicalVariable key = it.next();
+            if (!mapped.containsKey(key)) {
+                continue;
+            }
+            mapped.remove(key);
+            it.remove();
+        }
+    }
+
+    /**
+     * Appends to {@code prodVars} every variable produced by the data-source scans found below {@code root}.
+     * The search descends through non-scan operators and stops at each scan; {@code root} itself is not
+     * inspected.
+     * <p>
+     * NOTE(review): despite the name, this collects all variables produced by the scans, not only key
+     * variables. Callers map them to keys afterwards. {@code context} is not used here.
+     *
+     * @param context  optimization context (unused)
+     * @param root     operator whose input subtrees are searched
+     * @param prodVars output list; variables are appended in traversal order
+     * @throws AlgebricksException if extracting produced variables fails
+     */
+    private void extractPrimaryKeys(IOptimizationContext context, ILogicalOperator root, List<LogicalVariable> prodVars)
+            throws AlgebricksException {
+        for (Mutable<ILogicalOperator> inputRef : root.getInputs()) {
+            ILogicalOperator input = inputRef.getValue();
+            if (!input.getOperatorTag().equals(LogicalOperatorTag.DATASOURCESCAN)) {
+                extractPrimaryKeys(context, input, prodVars);
+                continue;
+            }
+            List<LogicalVariable> scanVars = new ArrayList<LogicalVariable>();
+            VariableUtilities.getProducedVariables(input, scanVars);
+            prodVars.addAll(scanVars);
+        }
+    }
+
+    /**
+     * Decides whether {@code candidate} and {@code peer} are "homogeneous", and so may be placed in the same
+     * equivalence class. The visible caller evaluates this ahead of IsomorphismUtilities.isOperatorIsomorphic.
+     * <ul>
+     * <li>Throws if the two operators have different numbers of inputs.</li>
+     * <li>Operators with fewer than two inputs are always homogeneous.</li>
+     * <li>Inner joins: homogeneous iff {@code peer} is also an inner join whose schema has the same size.</li>
+     * <li>Left-outer joins: {@code peer} must also be a left-outer join, otherwise this throws. For each of the
+     * two inputs, the primary keys of variables produced by data-source scans strictly below that input are
+     * collected (via context.findPrimaryKey), de-duplicated and merged per data source. The pair is homogeneous
+     * only if both sides have the same number of merged keys and the union of both sides merges down to that
+     * same number.</li>
+     * <li>Any other operator with two or more inputs is reported as not homogeneous.</li>
+     * </ul>
+     * NOTE(review): because the caller runs this before the isomorphism check, non-isomorphic pairs with
+     * different input counts reach the exception below. Consider checking isomorphism first.
+     * NOTE(review): mergeHomogeneousPK is applied to every root with the combined key list. It throws when a
+     * key is not produced by a data-source scan below that root, which may happen in plans with several roots
+     * — TODO confirm.
+     *
+     * @return {@code true} if the pair is considered homogeneous
+     * @throws AlgebricksException on mismatched input counts, on a left-outer join paired with another
+     *             operator kind, or when propagated from mergeHomogeneousPK
+     */
+    private boolean isBinaryHomogeneous(IOptimizationContext context, ILogicalOperator candidate, ILogicalOperator peer)
+            throws AlgebricksException {
+        //Currently, support single pk only.
+        boolean isHomo = false;
+        if (candidate.getInputs().size() != peer.getInputs().size())
+            throw new AlgebricksException("Unexpected isomophic result on: " + candidate + "-" + peer);
+        // Unary and leaf operators are not checked any further.
+        if (candidate.getInputs().size() < 2)
+            return true;
+        if (candidate.getOperatorTag().equals(LogicalOperatorTag.INNERJOIN)) {
+            // NOTE(review): assumes both schemas have already been computed (getSchema() non-null).
+            if (peer.getOperatorTag().equals(LogicalOperatorTag.INNERJOIN)
+                    && candidate.getSchema().size() == peer.getSchema().size())
+                isHomo = true;
+        } else if (candidate.getOperatorTag().equals(LogicalOperatorTag.LEFTOUTERJOIN)) {
+            if (!peer.getOperatorTag().equals(LogicalOperatorTag.LEFTOUTERJOIN))
+                throw new AlgebricksException("Unexpected isomophic lojs of: " + candidate + "-" + peer);
+            // Compare the two join inputs side by side; any mismatching side ends the check with false.
+            for (int i = 0; i < 2; i++) {
+                ILogicalOperator candOp = candidate.getInputs().get(i).getValue();
+                ILogicalOperator peerOp = peer.getInputs().get(i).getValue();
+                // Variables produced by scans below each input (the input operator itself is not inspected).
+                List<LogicalVariable> candProduce = new ArrayList<LogicalVariable>();
+                extractPrimaryKeys(context, candOp, candProduce);
+                List<LogicalVariable> peerProduce = new ArrayList<LogicalVariable>();
+                extractPrimaryKeys(context, peerOp, peerProduce);
+                // Map produced variables to their (de-duplicated) primary keys, skipping variables with none.
+                List<LogicalVariable> candPKs = new ArrayList<LogicalVariable>();
+                List<LogicalVariable> peerPKs = new ArrayList<LogicalVariable>();
+                for (LogicalVariable var : candProduce) {
+                    List<LogicalVariable> pks = context.findPrimaryKey(var);
+                    if (pks == null)
+                        continue;
+                    for (LogicalVariable pk : pks) {
+                        if (!candPKs.contains(pk))
+                            candPKs.add(pk);
+                    }
+                }
+                for (LogicalVariable var : peerProduce) {
+                    List<LogicalVariable> pks = context.findPrimaryKey(var);
+                    if (pks == null)
+                        continue;
+                    for (LogicalVariable pk : pks) {
+                        if (!peerPKs.contains(pk))
+                            peerPKs.add(pk);
+                    }
+                }
+                // Collapse keys of scans over the same data source (the lists are modified in place).
+                mergeHomogeneousPK(candOp, candPKs);
+                mergeHomogeneousPK(peerOp, peerPKs);
+                if (candPKs.size() != peerPKs.size()) {
+                    isHomo = false;
+                    break;
+                }
+                // Both sides are homogeneous only if the union collapses back to one side's key count.
+                List<LogicalVariable> biPKs = new ArrayList<LogicalVariable>();
+                biPKs.addAll(candPKs);
+                biPKs.addAll(peerPKs);
+                for (Mutable<ILogicalOperator> root : roots)
+                    mergeHomogeneousPK(root.getValue(), biPKs);
+                if (biPKs.size() == candPKs.size())
+                    isHomo = true;
+                else {
+                    isHomo = false;
+                    break;
+                }
+            }
+        }
+        return isHomo;
+    }
+
     private void prune(IOptimizationContext context) throws 
AlgebricksException {
         List<List<Mutable<ILogicalOperator>>> previousEquivalenceClasses = new 
ArrayList<List<Mutable<ILogicalOperator>>>();
         for (List<Mutable<ILogicalOperator>> candidates : equivalenceClasses) {
@@ -365,7 +529,8 @@
                     equivalentClass.add(candidates.get(i));
                     for (int j = i - 1; j >= 0; j--) {
                         ILogicalOperator peer = candidates.get(j).getValue();
-                        if 
(IsomorphismUtilities.isOperatorIsomorphic(candidate, peer)) {
+                        if (isBinaryHomogeneous(context, candidate, peer)
+                                && 
IsomorphismUtilities.isOperatorIsomorphic(candidate, peer)) {
                             reserved[i] = true;
                             reserved[j] = true;
                             equivalentClass.add(candidates.get(j));
@@ -385,11 +550,210 @@
         }
     }
 
-    private boolean[] 
computeMaterilizationFlags(List<Mutable<ILogicalOperator>> group) {
+    /**
+     * Returns whether {@code child} is reachable from {@code father} through its inputs, i.e. whether it is a
+     * direct or transitive input of {@code father}. Operators are compared with {@code equals}, and
+     * {@code father} itself does not count.
+     *
+     * @param father operator whose input subtrees are searched
+     * @param child  operator to look for
+     * @return {@code true} if {@code child} appears below {@code father}
+     */
+    private boolean derivedFrom(ILogicalOperator father, ILogicalOperator child) {
+        for (Mutable<ILogicalOperator> inputRef : father.getInputs()) {
+            ILogicalOperator input = inputRef.getValue();
+            if (input.equals(child) || derivedFrom(input, child)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Intended to drop candidates that are derived from, or scheduled behind, another candidate of the same
+     * equivalence class before materialization flags are computed.
+     * <p>
+     * Currently a no-op: {@code candidates} is left unchanged.
+     * TODO: implement. An earlier draft removed a candidate when its cluster waited for another candidate's
+     * cluster (clusterWaitForMap), and removed one of two same-cluster candidates when one was derivedFrom the
+     * other. When reviving it:
+     * (1) look up clusterWaitForMap, which is keyed by Integer, with MutableInt.getValue() rather than with the
+     * MutableInt itself, which never matches an Integer key;
+     * (2) do not remove list elements while indexing the same list with i/j.
+     *
+     * @param candidates members of one equivalence class; currently not modified
+     */
+    private void removeIsomophicShared(List<Mutable<ILogicalOperator>> candidates) {
+        // Intentionally empty; see the TODO above.
+    }
+
+    /**
+     * Reorders {@code equivalentClass} in place. Whenever member j's operator is a (transitive) input of the
+     * last member's operator (per derivedFrom), member j is moved to the end of the list and the method
+     * recurses on the updated list.
+     * <p>
+     * NOTE(review): currently unused; the only visible call site, in rewriteForOneEquivalentClass, is commented
+     * out. Termination presumably relies on derivedFrom being acyclic over the members — TODO confirm before
+     * enabling.
+     *
+     * @param equivalentClass members to reorder; modified in place
+     */
+    @SuppressWarnings("unused")
+    private void adjustForDerivation(List<Mutable<ILogicalOperator>> equivalentClass) {
+        // Index of the last slot. remove + add keep the size unchanged, so after a move this slot holds the
+        // member that was just moved.
+        int i = equivalentClass.size() - 1;
+        for (int j = 0; j < equivalentClass.size() - 1; j++) {
+            if (derivedFrom(equivalentClass.get(i).getValue(), equivalentClass.get(j).getValue())) {
+                Mutable<ILogicalOperator> op = equivalentClass.remove(j);
+                equivalentClass.add(op);
+                // NOTE(review): after remove(j) the next element shifts into index j and is skipped by j++
+                // once the recursive call returns — TODO confirm this is intended.
+                adjustForDerivation(equivalentClass);
+            }
+        }
+    }
+
+    /**
+     * Returns whether cluster {@code father} waits, directly or transitively, for cluster {@code child},
+     * following the wait-for bits recorded in {@code clusterWaitForMap}. A cluster with no entry, or an empty
+     * bit set, waits for nothing.
+     * <p>
+     * NOTE(review): assumes the wait-for graph is acyclic; a cycle that does not contain {@code child} would
+     * recurse without bound.
+     *
+     * @param father id of the waiting cluster
+     * @param child  id of the cluster that may be waited for
+     * @return {@code true} if {@code father} (transitively) waits for {@code child}
+     */
+    private boolean dependedOn(Integer father, Integer child) {
+        BitSet waitsFor = clusterWaitForMap.get(father);
+        if (waitsFor == null) {
+            return false;
+        }
+        for (int clusterId = waitsFor.nextSetBit(0); clusterId >= 0; clusterId = waitsFor.nextSetBit(clusterId + 1)) {
+            if (child == clusterId || dependedOn(clusterId, child)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Walks down from {@code father} through its inputs. Returns {@code false} if any visited child's
+     * dependency labels match the "1 -> 0" pattern (labels.second[..] == 1 and labels.first[..] == 0), which
+     * computeClusters treats as the start of a new cluster. Otherwise returns {@code true}; an operator with no
+     * inputs yields {@code true}.
+     * <p>
+     * NOTE(review): the index computations look inconsistent with computeClusters; see the inline notes before
+     * relying on the result.
+     *
+     * @param father reference to the operator whose input subtrees are examined
+     * @return {@code false} if the pattern is found on some path below {@code father}, {@code true} otherwise
+     */
+    private boolean isLeafActivity(Mutable<ILogicalOperator> father) {
+        int outputArity = 0;
+        // NOTE(review): this compares each output of the replicate with the replicate's own reference
+        // (father). computeClusters compares with the parent reference instead. If nothing matches, outputArity
+        // ends equal to outputs.size() and is then used as an array index below — TODO confirm.
+        if (father.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+            ReplicateOperator rop = (ReplicateOperator) father.getValue();
+            List<Mutable<ILogicalOperator>> outputs = rop.getOutputs();
+            for (outputArity = 0; outputArity < outputs.size(); outputArity++) {
+                if (outputs.get(outputArity).equals(father)) {
+                    break;
+                }
+            }
+        }
+        boolean directToETS = true;
+        int inputArity = 0;
+        for (Mutable<ILogicalOperator> child : father.getValue().getInputs()) {
+            // NOTE(review): the labels come from the child's physical operator, but they are indexed with
+            // father's output index and the child's position among father's inputs. computeClusters indexes
+            // an operator's own labels.
+            AbstractLogicalOperator aop = (AbstractLogicalOperator) child.getValue();
+            Pair<int[], int[]> labels = aop.getPhysicalOperator().getInputOutputDependencyLabels(child.getValue());
+            // NOTE(review): because && short-circuits, inputArity++ only runs when labels.second[outputArity]
+            // == 1, so inputArity does not track the child position in general.
+            if (labels.second[outputArity] == 1 && labels.first[inputArity++] == 0)
+                directToETS = false;
+            else
+                directToETS &= isLeafActivity(child);
+
+        }
+        return directToETS;
+    }
+
+    /**
+     * For each direct input of {@code root} that has no entry in {@code clusterMap}, appends an
+     * (input reference, cluster id) pair to a {@code clusterOpMap} bucket. The pair goes to the bucket keyed 0
+     * when isLeafActivity reports true, otherwise to the bucket keyed {@code lastUsedClusterId + 1}. Both
+     * buckets must already exist (otherwise a NullPointerException is thrown).
+     * <p>
+     * NOTE(review): the cluster id is looked up in {@code clusterMap} for a reference just found to be absent
+     * from it, so the stored id is always null — TODO confirm intent. Only direct inputs are visited; there is
+     * no recursion.
+     *
+     * @param root operator whose direct inputs are inspected
+     */
+    private void loadAndVerifyOperators(Mutable<ILogicalOperator> root) {
+        for (Mutable<ILogicalOperator> inputRef : root.getValue().getInputs()) {
+            if (clusterMap.containsKey(inputRef)) {
+                continue;
+            }
+            MutableInt bucketKey = isLeafActivity(inputRef) ? new MutableInt(0) : new MutableInt(lastUsedClusterId + 1);
+            clusterOpMap.get(bucketKey).add(
+                    new Pair<Mutable<ILogicalOperator>, MutableInt>(inputRef, clusterMap.get(inputRef)));
+        }
+    }
+
+    /**
+     * Debug-only walk over every input path below {@code parent}. For each visited child it bumps counters in
+     * {@code opTypeCount}, reusing unrelated tags as counter slots:
+     * SUBPLAN counts inner joins, SCRIPT counts left-outer joins, and EMPTYTUPLESOURCE counts inner or
+     * left-outer joins whose reference has no entry in {@code clusterMap}.
+     * Operators reachable through several paths are counted once per path. The counters must already be
+     * present in {@code opTypeCount}; an absent counter fails with a NullPointerException on unboxing.
+     *
+     * @param parent reference to the operator whose inputs are walked
+     */
+    private void tmpVerifyForOneJoinTree(Mutable<ILogicalOperator> parent) {
+        for (Mutable<ILogicalOperator> childRef : parent.getValue().getInputs()) {
+            ILogicalOperator child = childRef.getValue();
+            boolean innerJoin = child.getOperatorTag().equals(LogicalOperatorTag.INNERJOIN);
+            boolean leftOuterJoin = child.getOperatorTag().equals(LogicalOperatorTag.LEFTOUTERJOIN);
+            if (innerJoin) {
+                opTypeCount.put(LogicalOperatorTag.SUBPLAN, opTypeCount.get(LogicalOperatorTag.SUBPLAN) + 1);
+            }
+            if (leftOuterJoin) {
+                opTypeCount.put(LogicalOperatorTag.SCRIPT, opTypeCount.get(LogicalOperatorTag.SCRIPT) + 1);
+            }
+            if ((innerJoin || leftOuterJoin) && !clusterMap.containsKey(childRef)) {
+                opTypeCount.put(LogicalOperatorTag.EMPTYTUPLESOURCE,
+                        opTypeCount.get(LogicalOperatorTag.EMPTYTUPLESOURCE) + 1);
+            }
+            if (!child.getInputs().isEmpty()) {
+                tmpVerifyForOneJoinTree(childRef);
+            }
+        }
+    }
+
+    /**
+     * Debug-only: for each root, resets the SUBPLAN, SCRIPT and EMPTYTUPLESOURCE counter slots in
+     * {@code opTypeCount} to zero and then walks that root with tmpVerifyForOneJoinTree.
+     * NOTE(review): because the counters are reset per root, after the loop they only reflect the last root.
+     */
+    private void tmpVerifyJoinOp() {
+        LogicalOperatorTag[] counterSlots = { LogicalOperatorTag.SUBPLAN, LogicalOperatorTag.SCRIPT,
+                LogicalOperatorTag.EMPTYTUPLESOURCE };
+        for (Mutable<ILogicalOperator> rootRef : roots) {
+            for (LogicalOperatorTag slot : counterSlots) {
+                opTypeCount.put(slot, 0);
+            }
+            tmpVerifyForOneJoinTree(rootRef);
+        }
+    }
+
+    /**
+     * Rebuilds the debugging bookkeeping for the current clustering and reorders {@code group} by cluster
+     * dependency.
+     * <ol>
+     * <li>Recomputes {@code opTypeCount}: the counters from tmpVerifyJoinOp, plus INNERJOIN and LEFTOUTERJOIN
+     * counts over {@code clusterMap}.</li>
+     * <li>Adds every (operator reference, cluster id) entry of {@code clusterMap} to its {@code clusterOpMap}
+     * bucket. It then resets the buckets keyed 0 and {@code lastUsedClusterId + 1} and fills them through
+     * loadAndVerifyOperators for every root.</li>
+     * <li>Reorders {@code group} in place: whenever the cluster of an earlier member j (transitively) waits for
+     * the cluster of a later member i, the two list entries are swapped. The cluster-id list is swapped in step
+     * so that both lists stay parallel.</li>
+     * </ol>
+     *
+     * @param group members of the equivalence class being rewritten; every member must have a cluster id in
+     *            {@code clusterMap}
+     * @throws AlgebricksException if the bucket keyed 0 is already non-empty before being reset
+     */
+    private void adjustGroupByDependency(List<Mutable<ILogicalOperator>> group) throws AlgebricksException {
+        // For sake of debugging the cover of the plan.
+        opTypeCount.clear();
+        tmpVerifyJoinOp();
+        for (Entry<Mutable<ILogicalOperator>, MutableInt> entry : clusterMap.entrySet()) {
+            LogicalOperatorTag tag = entry.getKey().getValue().getOperatorTag();
+            if (tag.equals(LogicalOperatorTag.INNERJOIN) || tag.equals(LogicalOperatorTag.LEFTOUTERJOIN)) {
+                Integer count = opTypeCount.get(tag);
+                opTypeCount.put(tag, count == null ? 1 : count + 1);
+            }
+            List<Pair<Mutable<ILogicalOperator>, MutableInt>> bucket = clusterOpMap.get(entry.getValue());
+            if (bucket == null) {
+                bucket = new ArrayList<Pair<Mutable<ILogicalOperator>, MutableInt>>();
+                clusterOpMap.put(entry.getValue(), bucket);
+            }
+            Pair<Mutable<ILogicalOperator>, MutableInt> clustered = new Pair<Mutable<ILogicalOperator>, MutableInt>(
+                    entry.getKey(), entry.getValue());
+            if (!bucket.contains(clustered)) {
+                bucket.add(clustered);
+            }
+        }
+        List<Pair<Mutable<ILogicalOperator>, MutableInt>> leafBucket = clusterOpMap.get(new MutableInt(0));
+        if (leafBucket != null && leafBucket.size() > 0) {
+            throw new AlgebricksException("Unexpected clusterOpMap." + leafBucket);
+        }
+        clusterOpMap.put(new MutableInt(0), new ArrayList<Pair<Mutable<ILogicalOperator>, MutableInt>>());
+        clusterOpMap.put(new MutableInt(lastUsedClusterId + 1),
+                new ArrayList<Pair<Mutable<ILogicalOperator>, MutableInt>>());
+        for (Mutable<ILogicalOperator> root : roots) {
+            loadAndVerifyOperators(root);
+        }
+
+        List<Integer> groupClusterIds = new ArrayList<Integer>(group.size());
+        for (Mutable<ILogicalOperator> groupMember : group) {
+            groupClusterIds.add(clusterMap.get(groupMember).getValue());
+        }
+        for (int i = groupClusterIds.size() - 1; i > 0; i--) {
+            for (int j = 0; j < i; j++) {
+                if (dependedOn(groupClusterIds.get(j), groupClusterIds.get(i))) {
+                    // Swap list positions, not Mutable contents. The members are presumably the plan's own
+                    // operator references, so calling setValue() on them would rewire the plan instead of
+                    // reordering the group.
+                    Collections.swap(groupClusterIds, i, j);
+                    Collections.swap(group, i, j);
+                }
+            }
+        }
+    }
+
+    private boolean[] 
computeMaterilizationFlags(List<Mutable<ILogicalOperator>> group) throws 
AlgebricksException {
         lastUsedClusterId = 0;
         for (Mutable<ILogicalOperator> root : roots) {
-            computeClusters(null, root, new MutableInt(++lastUsedClusterId));
+            computeClusters(null, root, new MutableInt(++lastUsedClusterId), 
true);
         }
+        removeIsomophicShared(group);
+        adjustGroupByDependency(group);
         boolean[] materializationFlags = new boolean[group.size()];
         boolean worthMaterialization = worthMaterialization(group.get(0));
         boolean requiresMaterialization;
@@ -409,6 +773,16 @@
         if (group.size() < 2) {
             group.clear();
         }
+        /*for (int i = 0; i < group.size() - 1; i++) {
+            for (int j = i + 1; j < group.size(); j++) {
+                if 
(clusterMap.get(group.get(i)).equals(clusterMap.get(group.get(j)))
+                        && (materializationFlags[i] == true || 
materializationFlags[j] == true)) {
+                    materializationFlags[i] = true;
+                    materializationFlags[j] = true;
+                    throw new AlgebricksException("Unexpected isomophic ops." 
+ group.get(i) + " and " + group.get(j));
+                }
+            }
+        }*/
         // if does not worth materialization, the flags for the remaining 
candidates should be false
         return worthMaterialization ? materializationFlags : new 
boolean[group.size()];
     }
@@ -441,14 +815,16 @@
     }
 
     private void computeClusters(Mutable<ILogicalOperator> parentRef, 
Mutable<ILogicalOperator> opRef,
-            MutableInt currentClusterId) {
+            MutableInt currentClusterId, boolean isMaterializationBranch) 
throws AlgebricksException {
         // only replicate operator has multiple outputs
+        boolean needMOMapping = false;
         int outputIndex = 0;
         if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) 
{
             ReplicateOperator rop = (ReplicateOperator) opRef.getValue();
             List<Mutable<ILogicalOperator>> outputs = rop.getOutputs();
             for (outputIndex = 0; outputIndex < outputs.size(); outputIndex++) 
{
                 if (outputs.get(outputIndex).equals(parentRef)) {
+                    needMOMapping = true;
                     break;
                 }
             }
@@ -456,35 +832,98 @@
         AbstractLogicalOperator aop = (AbstractLogicalOperator) 
opRef.getValue();
         Pair<int[], int[]> labels = 
aop.getPhysicalOperator().getInputOutputDependencyLabels(opRef.getValue());
         List<Mutable<ILogicalOperator>> inputs = opRef.getValue().getInputs();
-        for (int i = 0; i < inputs.size(); i++) {
-            Mutable<ILogicalOperator> inputRef = inputs.get(i);
-            if (labels.second[outputIndex] == 1 && labels.first[i] == 0) { // 
1 -> 0
-                if (labels.second.length == 1) {
-                    clusterMap.put(opRef, currentClusterId);
-                    // start a new cluster
+        MutableInt prevClusterId = clusterMap.get(opRef);
+        if (prevClusterId == null) {
+            if (selectedMOMap.containsKey(opRef))
+                throw new AlgebricksException("Unexpected mapping replicate." 
+ opRef.getValue().getOperatorTag());
+            clusterMap.put(opRef, currentClusterId);
+            for (int i = inputs.size() - 1; i >= 0; i--) {
+                Mutable<ILogicalOperator> inputRef = inputs.get(i);
+                if (labels.second[outputIndex] == 1 && labels.first[i] == 0) {
                     MutableInt newClusterId = new 
MutableInt(++lastUsedClusterId);
-                    computeClusters(opRef, inputRef, newClusterId);
                     BitSet waitForList = 
clusterWaitForMap.get(currentClusterId.getValue());
                     if (waitForList == null) {
                         waitForList = new BitSet();
                         clusterWaitForMap.put(currentClusterId.getValue(), 
waitForList);
                     }
                     waitForList.set(newClusterId.getValue());
-                }
-            } else { // 0 -> 0 and 1 -> 1
-                MutableInt prevClusterId = clusterMap.get(opRef);
-                if (prevClusterId == null || 
prevClusterId.getValue().equals(currentClusterId.getValue())) {
-                    clusterMap.put(opRef, currentClusterId);
-                    computeClusters(opRef, inputRef, currentClusterId);
+                    computeClusters(opRef, inputRef, newClusterId, true);
+                    if (needMOMapping) {
+                        clusterMap.put(opRef, newClusterId);
+                        // for replicate operator <materialized, dependencies>
+                        selectedMOMap.put(opRef,
+                                new Pair<Boolean, BitSet>(true, 
clusterWaitForMap.get(newClusterId.getValue())));
+                    }
                 } else {
-                    // merge prevClusterId and currentClusterId: update all 
the map entries that has currentClusterId to prevClusterId
+                    BitSet buildSideBinaryDependencies = new BitSet();
+                    if (needMOMapping && !isMaterializationBranch)
+                        buildSideBinaryDependencies = 
clusterWaitForMap.get(currentClusterId.getValue());
+                    if (inputs.size() == 2)
+                        isMaterializationBranch = false;
+                    //For binary join, the 1->0 branch will be retrieved 
firstly, and then 1->1.
+                    computeClusters(opRef, inputRef, currentClusterId, 
isMaterializationBranch);
+                    if (needMOMapping) {
+                        BitSet probeSideBinaryDependencies = new BitSet();
+                        if (null != 
clusterWaitForMap.get(currentClusterId.getValue()))
+                            
probeSideBinaryDependencies.or(clusterWaitForMap.get(currentClusterId.getValue()));
+                        if (null != buildSideBinaryDependencies)
+                            
probeSideBinaryDependencies.andNot(buildSideBinaryDependencies);
+                        selectedMOMap.put(opRef, new Pair<Boolean, 
BitSet>(false, probeSideBinaryDependencies));
+                    }
+                }
+            }
+        } else {
+            if (opRef.getValue().getOperatorTag() != 
LogicalOperatorTag.REPLICATE
+                    && opRef.getValue().getOperatorTag() != 
LogicalOperatorTag.DISTRIBUTE_RESULT)
+                throw new AlgebricksException("Unexpected replicate operator." 
+ opRef.getValue().getOperatorTag());
+            if (opRef.getValue().getInputs().size() != 1)
+                throw new AlgebricksException("Unexpected multiple output's 
input." + opRef.getValue().getOperatorTag());
+            if (needMOMapping && !selectedMOMap.containsKey(opRef))
+                throw new AlgebricksException("Unexpected seen raplicate 
operator." + opRef.getValue().getOperatorTag());
+            BitSet waitForList = 
clusterWaitForMap.get(currentClusterId.getValue());
+            BitSet dependedList = 
clusterWaitForMap.get(prevClusterId.getValue());
+            if (waitForList == null) {
+                waitForList = new BitSet();
+                clusterWaitForMap.put(currentClusterId.getValue(), 
waitForList);
+            }
+            if (labels.second[outputIndex] == 1 && labels.first[0] == 0) {
+                if (needMOMapping) {
+                    //selectedMOMap->replicate->second always maintains the 
replicate's dependencies.
+                    /*MutableInt newClusterId = new 
MutableInt(++lastUsedClusterId);
+                    waitForList.set(newClusterId.getValue());*/
+                    if (!isMaterializationBranch) {
+                        MutableInt newClusterId = new 
MutableInt(++lastUsedClusterId);
+                        waitForList.set(newClusterId.getValue());
+                        BitSet newDependency = new BitSet();
+                        newDependency.set(prevClusterId.getValue());
+                        clusterWaitForMap.put(newClusterId.getValue(), 
newDependency);
+                    } else {
+                        BitSet newDependency = new BitSet();
+                        newDependency.or(waitForList);
+                        newDependency.set(prevClusterId.getValue());
+                        clusterWaitForMap.put(currentClusterId.getValue(), 
newDependency);
+                    }
+                } else
+                    throw new AlgebricksException("Unexpected distribution." + 
opRef.getValue().getOperatorTag());
+
+            } else { //Merge the clusters if it is in father's 
non-materialization branch
+                if (needMOMapping) {
+                    if (null == dependedList)
+                        clusterWaitForMap.put(prevClusterId.getValue(), 
waitForList);
+                    else if (waitForList != null) {
+                        dependedList.or(waitForList);
+                        clusterWaitForMap.put(prevClusterId.getValue(), 
dependedList);
+                    }
                     for (BitSet bs : clusterWaitForMap.values()) {
                         if (bs.get(currentClusterId.getValue())) {
                             bs.clear(currentClusterId.getValue());
                             bs.set(prevClusterId.getValue());
                         }
                     }
-                    currentClusterId.setValue(prevClusterId.getValue());
+                    for (Entry<Mutable<ILogicalOperator>, MutableInt> entry : 
clusterMap.entrySet()) {
+                        if (entry.getValue().equals(currentClusterId))
+                            clusterMap.put(entry.getKey(), prevClusterId);
+                    }
                 }
             }
         }
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index b9c2fb1..aa5e0c9 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -36,9 +36,18 @@
 
 public class MaterializerTaskState extends AbstractStateObject {
     private RunFileWriter out;
+    static private long fileCreatedCount = 0;
+    static private String taskInfo;
 
+    @SuppressWarnings("static-access")
     public MaterializerTaskState(JobId jobId, TaskId taskId) {
         super(jobId, taskId);
+        if (this.taskInfo == null)
+            this.taskInfo = taskId.getActivityId().toString();
+        else if (this.taskInfo != null && 
this.taskInfo.equals(taskId.getActivityId().toString()))
+            fileCreatedCount++;
+        else
+            this.taskInfo = taskId.getActivityId().toString();
     }
 
     @Override
@@ -51,9 +60,13 @@
 
     }
 
+    public String getRefName() {
+        return out.getFileReference().getFile().getAbsolutePath();
+    }
+
     public void open(IHyracksTaskContext ctx) throws HyracksDataException {
         FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                MaterializerTaskState.class.getSimpleName());
+                MaterializerTaskState.class.getSimpleName() + "-" + taskInfo + 
"-" + fileCreatedCount + "-");
         out = new RunFileWriter(file, ctx.getIOManager());
         out.open();
     }
@@ -67,7 +80,7 @@
     }
 
     public void writeOut(IFrameWriter writer, IFrame frame) throws 
HyracksDataException {
-        RunFileReader in = out.createDeleteOnCloseReader();
+        RunFileReader in = out.createReader();
         writer.open();
         try {
             in.open();
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index 04b893a..ea48e49 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -47,6 +47,7 @@
     private boolean requiresMaterialization;
     private int numberOfNonMaterializedOutputs = 0;
     private int numberOfActiveMaterializeReaders = 0;
+    private int[] activeMaterializedCount = null;
 
     public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor rDesc, int outputArity) {
         this(spec, rDesc, outputArity, new boolean[outputArity]);
@@ -66,7 +67,6 @@
                 break;
             }
         }
-
     }
 
     @Override
@@ -121,6 +121,9 @@
                                 getActivityId(), partition));
                         state.open(ctx);
                     }
+                    if (activeMaterializedCount == null)
+                        activeMaterializedCount = new int[nPartitions];
+                    activeMaterializedCount[partition] = 
numberOfActiveMaterializeReaders;
                     for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
                         writers[i].open();
                     }
@@ -184,12 +187,11 @@
 
                 @Override
                 public void deinitialize() throws HyracksDataException {
-                    numberOfActiveMaterializeReaders--;
+                    activeMaterializedCount[partition]--;
                     MaterializerTaskState state = (MaterializerTaskState) 
ctx.getStateObject(new TaskId(new ActivityId(
                             getOperatorId(), 
SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
-                    if (numberOfActiveMaterializeReaders == 0) {
+                    if (activeMaterializedCount[partition] == 0)
                         state.deleteFile();
-                    }
                 }
             };
         }
diff --git 
a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tokenizers/AbstractUTF8Token.java
 
b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tokenizers/AbstractUTF8Token.java
index 9613fb9..cd0b0ce 100644
--- 
a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tokenizers/AbstractUTF8Token.java
+++ 
b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tokenizers/AbstractUTF8Token.java
@@ -96,7 +96,8 @@
      * @param startOffset
      * @param endOffset
      * @param tokenLength
-     * @param tokenCount  the count of this token in a document , or a record, 
or something else.
+     * @param tokenCount
+     *            the count of this token in a document , or a record, or 
something else.
      */
     @Override
     public void reset(byte[] data, int startOffset, int endOffset, int 
tokenLength, int tokenCount) {
@@ -115,13 +116,13 @@
 
     // The preChar and postChar are required to be a single byte utf8 char, 
e.g. ASCII char.
     protected void serializeToken(UTF8StringBuilder builder, GrowableArray 
out, int numPreChars, int numPostChars,
-            char preChar, char postChar)
-            throws IOException {
+            char preChar, char postChar) throws IOException {
 
         handleTokenTypeTag(out.getDataOutput());
 
         assert UTF8StringUtil.getModifiedUTF8Len(preChar) == 1 && 
UTF8StringUtil.getModifiedUTF8Len(postChar) == 1;
         int actualUtfLen = endOffset - startOffset;
+        actualUtfLen = (actualUtfLen >= 0) ? actualUtfLen : 0;
 
         builder.reset(out, actualUtfLen + numPreChars + numPostChars);
         // pre chars

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/530
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2487078d1821d7ad85bb745bfa31024bcdbea1f1
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Wenhai Li <[email protected]>

Reply via email to