Wenhai Li has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/530
Change subject: Initial fuzzy join for inci's verification ...................................................................... Initial fuzzy join for inci's verification commit d9feb3ee13f907a0dcc27c0ad91dce92ad16c9a0 Author: Michael <[email protected]> Date: Wed Dec 2 03:22:49 2015 -0800 Initial fuzzyjoin setup for inci Change-Id: I2487078d1821d7ad85bb745bfa31024bcdbea1f1 --- M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java M algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java M hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tokenizers/AbstractUTF8Token.java 6 files changed, 516 insertions(+), 33 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/30/530/1 diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java index 672e6d0..092750d 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java @@ -219,7 +219,8 @@ IOperatorDescriptorRegistry spec = builder.getJobSpec(); IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(gbyCols, aggOpInputEnv, context); - RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); + RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, + context); IBinaryHashFunctionFactory[] hashFunctionFactories = JobGenHelper.variablesToBinaryHashFunctionFactories( gbyCols, aggOpInputEnv, context); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java index 186ac6f..9185cba 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java @@ -25,7 +25,6 @@ import java.util.Map.Entry; import org.apache.commons.lang3.mutable.Mutable; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; @@ -75,6 +74,34 @@ ILogicalOperator op = opRef.getValue(); int n = op.getInputs().size(); IOperatorSchema[] schemas = new IOperatorSchema[n]; + for (int i = n - 1; i >= 0; i--) { + Mutable<ILogicalOperator> opRef2 = op.getInputs().get(i); + List<Mutable<ILogicalOperator>> parents = operatorVisitedToParents.get(opRef2); + if (parents == null) { + parents = new ArrayList<Mutable<ILogicalOperator>>(); + operatorVisitedToParents.put(opRef2, parents); + parents.add(opRef); + compileOpRef(opRef2, spec, builder, outerPlanSchema); + schemas[i] = context.getSchema(opRef2.getValue()); + } else { + if (!parents.contains(opRef)) + parents.add(opRef); + schemas[i] = context.getSchema(opRef2.getValue()); + continue; + } + } + IOperatorSchema opSchema = new OperatorSchemaImpl(); + context.putSchema(op, opSchema); + op.getVariablePropagationPolicy().propagateVariables(opSchema, schemas); + op.contributeRuntimeOperator(builder, context, opSchema, schemas, outerPlanSchema); + } + + @SuppressWarnings("unused") + private void compileOpRef1(Mutable<ILogicalOperator> opRef, IOperatorDescriptorRegistry spec, + IHyracksJobBuilder builder, IOperatorSchema outerPlanSchema) throws AlgebricksException { + ILogicalOperator op = opRef.getValue(); + int n = op.getInputs().size(); + IOperatorSchema[] schemas = new IOperatorSchema[n]; int i = 0; for (Mutable<ILogicalOperator> opRef2 : op.getInputs()) { List<Mutable<ILogicalOperator>> parents = operatorVisitedToParents.get(opRef2); diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java index 92a3691..6664f28 100644 --- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java +++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java @@ -22,13 +22,14 @@ import java.util.BitSet; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.mutable.MutableObject; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; @@ -40,6 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; @@ -58,7 +60,10 @@ private final List<List<Mutable<ILogicalOperator>>> equivalenceClasses = new ArrayList<List<Mutable<ILogicalOperator>>>(); private final HashMap<Mutable<ILogicalOperator>, BitSet> opToCandidateInputs = new HashMap<Mutable<ILogicalOperator>, BitSet>(); private final HashMap<Mutable<ILogicalOperator>, MutableInt> clusterMap = new HashMap<Mutable<ILogicalOperator>, MutableInt>(); + private final Map<MutableInt, List<Pair<Mutable<ILogicalOperator>, MutableInt>>> clusterOpMap = new HashMap<MutableInt, List<Pair<Mutable<ILogicalOperator>, MutableInt>>>(); + private final Map<LogicalOperatorTag, Integer> opTypeCount = new HashMap<LogicalOperatorTag, Integer>(); private final HashMap<Integer, BitSet> clusterWaitForMap = new HashMap<Integer, BitSet>(); + private final HashMap<Mutable<ILogicalOperator>, Pair<Boolean, BitSet>> selectedMOMap = new HashMap<Mutable<ILogicalOperator>, Pair<Boolean, BitSet>>(); private int lastUsedClusterId = 0; @Override @@ -99,6 +104,8 @@ opToCandidateInputs.clear(); clusterMap.clear(); clusterWaitForMap.clear(); + selectedMOMap.clear(); + clusterOpMap.clear(); lastUsedClusterId = 0; } while (changed); roots.clear(); @@ -125,8 +132,10 @@ private boolean rewrite(IOptimizationContext context) throws AlgebricksException { boolean changed = false; for (List<Mutable<ILogicalOperator>> members : equivalenceClasses) { - if (rewriteForOneEquivalentClass(members, context)) + if (rewriteForOneEquivalentClass(members, context)) { changed = true; + break; + } } return changed; } @@ -137,6 +146,7 @@ boolean rewritten = false; while (members.size() > 0) { group.clear(); + //adjustForDerivation(members); Mutable<ILogicalOperator> candidate = members.remove(members.size() - 1); group.add(candidate); for (int i = members.size() - 1; i >= 0; i--) { @@ -149,6 +159,13 @@ boolean[] materializationFlags = computeMaterilizationFlags(group); if (group.isEmpty()) { continue; + } + List<Pair<int[], int[]>> prevLabels = new ArrayList<Pair<int[], int[]>>(); + List<List<List<Pair<int[], int[]>>>> laterLabels = new ArrayList<List<List<Pair<int[], int[]>>>>(); + int level1 = 0; + for (int i = 0; i < group.size(); i++) { + AbstractLogicalOperator op = (AbstractLogicalOperator) group.get(i).getValue(); + prevLabels.add(op.getPhysicalOperator().getInputOutputDependencyLabels(group.get(i).getValue())); } candidate = group.get(0); ReplicateOperator rop = new ReplicateOperator(group.size(), materializationFlags); @@ -220,6 +237,7 @@ context.computeAndSetTypeEnvironmentForOperator(projectOperator); List<Mutable<ILogicalOperator>> parentOpList = childrenToParents.get(ref); + List<Integer> indexes = new ArrayList<Integer>(); for (Mutable<ILogicalOperator> parentOpRef : parentOpList) { AbstractLogicalOperator parentOp = (AbstractLogicalOperator) parentOpRef.getValue(); int index = parentOp.getInputs().indexOf(ref); @@ -231,7 +249,7 @@ parentOp = parentOpNext; } } - + indexes.add(index); ILogicalOperator childOp = parentOp.getOperatorTag() == LogicalOperatorTag.PROJECT ? assignOperator : projectOperator; if (parentOp.isMap()) { @@ -245,6 +263,31 @@ } context.computeAndSetTypeEnvironmentForOperator(parentOp); } + laterLabels.add(new ArrayList<List<Pair<int[], int[]>>>()); + int level2 = 0; + for (Mutable<ILogicalOperator> parentOpRef : parentOpList) { + AbstractLogicalOperator parentOp = (AbstractLogicalOperator) parentOpRef.getValue(); + int index = indexes.get(level2); + laterLabels.get(level1).add(new ArrayList<Pair<int[], int[]>>()); + laterLabels.get(level1).get(level2) + .add(parentOp.getPhysicalOperator().getInputOutputDependencyLabels(parentOp)); + ILogicalOperator next = parentOp.getInputs().get(index).getValue(); + do { + laterLabels + .get(level1) + .get(level2) + .add(((AbstractLogicalOperator) next).getPhysicalOperator() + .getInputOutputDependencyLabels(next)); + next = next.getInputs().get(0).getValue(); + } while (!next.equals(ropRef.getValue())); + laterLabels + .get(level1) + .get(level2) + .add(((AbstractLogicalOperator) ropRef.getValue()).getPhysicalOperator() + .getInputOutputDependencyLabels(ropRef.getValue())); + level2++; + } + level1++; } rewritten = true; } @@ -346,6 +389,127 @@ } } + private ILogicalOperator extractPKProduction(ILogicalOperator root, LogicalVariable pk) throws AlgebricksException { + ILogicalOperator prodOp = null; + for (Mutable<ILogicalOperator> opRef : root.getInputs()) { + List<LogicalVariable> producedVars = new ArrayList<LogicalVariable>(); + VariableUtilities.getProducedVariables(opRef.getValue(), producedVars); + if (producedVars != null && producedVars.contains(pk)) { + prodOp = opRef.getValue(); + break; + } else if (opRef.getValue().getInputs().size() > 0) { + prodOp = extractPKProduction(opRef.getValue(), pk); + if (prodOp != null) + break; + } + } + return prodOp; + } + + private void mergeHomogeneousPK(ILogicalOperator op, List<LogicalVariable> pks) throws AlgebricksException { + Map<LogicalVariable, ILogicalOperator> varOpMap = new HashMap<LogicalVariable, ILogicalOperator>(); + for (LogicalVariable pk : pks) { + ILogicalOperator mOp = extractPKProduction(op, pk); + if (mOp == null || !mOp.getOperatorTag().equals(LogicalOperatorTag.DATASOURCESCAN)) + throw new AlgebricksException("Illegal variable production."); + varOpMap.put(pk, mOp); + } + Map<LogicalVariable, LogicalVariable> variableMapping = new HashMap<LogicalVariable, LogicalVariable>(); + for (int i = 0; i < pks.size() - 1; i++) { + for (int j = i + 1; j < pks.size(); j++) { + DataSourceScanOperator dsopi = (DataSourceScanOperator) (varOpMap.get(pks.get(i))); + DataSourceScanOperator dsopj = (DataSourceScanOperator) (varOpMap.get(pks.get(j))); + if (dsopi.getDataSource().equals(dsopj.getDataSource())) + IsomorphismUtilities.mapVariablesTopDown(varOpMap.get(pks.get(i)), varOpMap.get(pks.get(j)), + variableMapping); + } + } + Iterator<LogicalVariable> itr = pks.iterator(); + while (itr.hasNext()) { + LogicalVariable pk = itr.next(); + if (variableMapping.containsKey(pk)) { + variableMapping.remove(pk); + itr.remove(); + } + } + } + + private void extractPrimaryKeys(IOptimizationContext context, ILogicalOperator root, List<LogicalVariable> prodVars) + throws AlgebricksException { + for (Mutable<ILogicalOperator> opRef : root.getInputs()) { + if (opRef.getValue().getOperatorTag().equals(LogicalOperatorTag.DATASOURCESCAN)) { + List<LogicalVariable> vars = new ArrayList<LogicalVariable>(); + VariableUtilities.getProducedVariables(opRef.getValue(), vars); + prodVars.addAll(vars); + } else + extractPrimaryKeys(context, opRef.getValue(), prodVars); + } + } + + private boolean isBinaryHomogeneous(IOptimizationContext context, ILogicalOperator candidate, ILogicalOperator peer) + throws AlgebricksException { + //Currently, support single pk only. + boolean isHomo = false; + if (candidate.getInputs().size() != peer.getInputs().size()) + throw new AlgebricksException("Unexpected isomophic result on: " + candidate + "-" + peer); + if (candidate.getInputs().size() < 2) + return true; + if (candidate.getOperatorTag().equals(LogicalOperatorTag.INNERJOIN)) { + if (peer.getOperatorTag().equals(LogicalOperatorTag.INNERJOIN) + && candidate.getSchema().size() == peer.getSchema().size()) + isHomo = true; + } else if (candidate.getOperatorTag().equals(LogicalOperatorTag.LEFTOUTERJOIN)) { + if (!peer.getOperatorTag().equals(LogicalOperatorTag.LEFTOUTERJOIN)) + throw new AlgebricksException("Unexpected isomophic lojs of: " + candidate + "-" + peer); + for (int i = 0; i < 2; i++) { + ILogicalOperator candOp = candidate.getInputs().get(i).getValue(); + ILogicalOperator peerOp = peer.getInputs().get(i).getValue(); + List<LogicalVariable> candProduce = new ArrayList<LogicalVariable>(); + extractPrimaryKeys(context, candOp, candProduce); + List<LogicalVariable> peerProduce = new ArrayList<LogicalVariable>(); + extractPrimaryKeys(context, peerOp, peerProduce); + List<LogicalVariable> candPKs = new ArrayList<LogicalVariable>(); + List<LogicalVariable> peerPKs = new ArrayList<LogicalVariable>(); + for (LogicalVariable var : candProduce) { + List<LogicalVariable> pks = context.findPrimaryKey(var); + if (pks == null) + continue; + for (LogicalVariable pk : pks) { + if (!candPKs.contains(pk)) + candPKs.add(pk); + } + } + for (LogicalVariable var : peerProduce) { + List<LogicalVariable> pks = context.findPrimaryKey(var); + if (pks == null) + continue; + for (LogicalVariable pk : pks) { + if (!peerPKs.contains(pk)) + peerPKs.add(pk); + } + } + mergeHomogeneousPK(candOp, candPKs); + mergeHomogeneousPK(peerOp, peerPKs); + if (candPKs.size() != peerPKs.size()) { + isHomo = false; + break; + } + List<LogicalVariable> biPKs = new ArrayList<LogicalVariable>(); + biPKs.addAll(candPKs); + biPKs.addAll(peerPKs); + for (Mutable<ILogicalOperator> root : roots) + mergeHomogeneousPK(root.getValue(), biPKs); + if (biPKs.size() == candPKs.size()) + isHomo = true; + else { + isHomo = false; + break; + } + } + } + return isHomo; + } + private void prune(IOptimizationContext context) throws AlgebricksException { List<List<Mutable<ILogicalOperator>>> previousEquivalenceClasses = new ArrayList<List<Mutable<ILogicalOperator>>>(); for (List<Mutable<ILogicalOperator>> candidates : equivalenceClasses) { @@ -365,7 +529,8 @@ equivalentClass.add(candidates.get(i)); for (int j = i - 1; j >= 0; j--) { ILogicalOperator peer = candidates.get(j).getValue(); - if (IsomorphismUtilities.isOperatorIsomorphic(candidate, peer)) { + if (isBinaryHomogeneous(context, candidate, peer) + && IsomorphismUtilities.isOperatorIsomorphic(candidate, peer)) { reserved[i] = true; reserved[j] = true; equivalentClass.add(candidates.get(j)); @@ -385,11 +550,210 @@ } } - private boolean[] computeMaterilizationFlags(List<Mutable<ILogicalOperator>> group) { + private boolean derivedFrom(ILogicalOperator father, ILogicalOperator child) { + boolean derived = false; + if (father.getInputs().size() > 0) { + for (Mutable<ILogicalOperator> next : father.getInputs()) { + if (next.getValue().equals(child)) + derived = true; + else + derived |= derivedFrom(next.getValue(), child); + } + } + return derived; + } + + private void removeIsomophicShared(List<Mutable<ILogicalOperator>> candidates) { + boolean existsIsomophicShared = false; + do { + for (int i = 0; i < candidates.size() - 1; i++) { + for (int j = i + 1; j < candidates.size(); j++) { + + } + } + } while (existsIsomophicShared); + } + + /*private void removeIsomophicShared(List<Mutable<ILogicalOperator>> candidates) { + //for (List<Mutable<ILogicalOperator>> candidates : equivalenceClasses) { + //adjustForDerivation(candidates); + boolean existsIsomophicShared = false; + do { + for (int i = 0; i < candidates.size() - 1; i++) { + for (int j = i + 1; j < candidates.size(); j++) { + MutableInt first = clusterMap.get(candidates.get(i)); + MutableInt second = clusterMap.get(candidates.get(j)); + if (clusterWaitForMap.get(first) != null + && clusterWaitForMap.get(first.getValue()).get(second.getValue())) + candidates.remove(i); + else if (clusterWaitForMap.get(second) != null + && clusterWaitForMap.get(second.getValue()).get(first.getValue())) + candidates.remove(j); + if (clusterMap.get(candidates.get(i)).equals(clusterMap.get(candidates.get(j)))) { + if (derivedFrom(candidates.get(i).getValue(), candidates.get(j).getValue())) { + candidates.remove(j); + existsIsomophicShared = true; + break; + } + } + } + } + } while (existsIsomophicShared); + //} + }*/ + + @SuppressWarnings("unused") + private void adjustForDerivation(List<Mutable<ILogicalOperator>> equivalentClass) { + int i = equivalentClass.size() - 1; + for (int j = 0; j < equivalentClass.size() - 1; j++) { + if (derivedFrom(equivalentClass.get(i).getValue(), equivalentClass.get(j).getValue())) { + Mutable<ILogicalOperator> op = equivalentClass.remove(j); + equivalentClass.add(op); + adjustForDerivation(equivalentClass); + } + } + } + + private boolean dependedOn(Integer father, Integer child) { + boolean depOn = false; + BitSet bs = clusterWaitForMap.get(father); + if (bs == null || bs.cardinality() == 0) + depOn = false; + else { + for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i + 1)) { + if (child == i) { + depOn = true; + break; + } else + depOn |= dependedOn(i, child); + } + } + return depOn; + } + + private boolean isLeafActivity(Mutable<ILogicalOperator> father) { + int outputArity = 0; + if (father.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) { + ReplicateOperator rop = (ReplicateOperator) father.getValue(); + List<Mutable<ILogicalOperator>> outputs = rop.getOutputs(); + for (outputArity = 0; outputArity < outputs.size(); outputArity++) { + if (outputs.get(outputArity).equals(father)) { + break; + } + } + } + boolean directToETS = true; + int inputArity = 0; + for (Mutable<ILogicalOperator> child : father.getValue().getInputs()) { + AbstractLogicalOperator aop = (AbstractLogicalOperator) child.getValue(); + Pair<int[], int[]> labels = aop.getPhysicalOperator().getInputOutputDependencyLabels(child.getValue()); + if (labels.second[outputArity] == 1 && labels.first[inputArity++] == 0) + directToETS = false; + else + directToETS &= isLeafActivity(child); + + } + return directToETS; + } + + private void loadAndVerifyOperators(Mutable<ILogicalOperator> root) { + for (Mutable<ILogicalOperator> opRef : root.getValue().getInputs()) { + if (!clusterMap.keySet().contains(opRef)) { + if (isLeafActivity(opRef)) + clusterOpMap.get(new MutableInt(0)).add( + new Pair<Mutable<ILogicalOperator>, MutableInt>(opRef, clusterMap.get(opRef))); + else + clusterOpMap.get(new MutableInt(lastUsedClusterId + 1)).add( + new Pair<Mutable<ILogicalOperator>, MutableInt>(opRef, clusterMap.get(opRef))); + } + } + } + + private void tmpVerifyForOneJoinTree(Mutable<ILogicalOperator> parent) { + for (Mutable<ILogicalOperator> child : parent.getValue().getInputs()) { + if (child.getValue().getOperatorTag().equals(LogicalOperatorTag.INNERJOIN)) + opTypeCount.put(LogicalOperatorTag.SUBPLAN, opTypeCount.get(LogicalOperatorTag.SUBPLAN) + 1); + if (child.getValue().getOperatorTag().equals(LogicalOperatorTag.LEFTOUTERJOIN)) + opTypeCount.put(LogicalOperatorTag.SCRIPT, opTypeCount.get(LogicalOperatorTag.SCRIPT) + 1); + if (child.getValue().getOperatorTag().equals(LogicalOperatorTag.INNERJOIN) + || child.getValue().getOperatorTag().equals(LogicalOperatorTag.LEFTOUTERJOIN)) { + if (!clusterMap.containsKey(child)) { + opTypeCount.put(LogicalOperatorTag.EMPTYTUPLESOURCE, + opTypeCount.get(LogicalOperatorTag.EMPTYTUPLESOURCE) + 1); + } + } + if (child.getValue().getInputs().size() > 0) + tmpVerifyForOneJoinTree(child); + } + } + + private void tmpVerifyJoinOp() { + for (Mutable<ILogicalOperator> root : roots) { + opTypeCount.put(LogicalOperatorTag.SUBPLAN, 0); + opTypeCount.put(LogicalOperatorTag.SCRIPT, 0); + opTypeCount.put(LogicalOperatorTag.EMPTYTUPLESOURCE, 0); + tmpVerifyForOneJoinTree(root); + } + } + + private void adjustGroupByDependency(List<Mutable<ILogicalOperator>> group) throws AlgebricksException { + //For sake of debugging the cover of the plan + opTypeCount.clear(); + tmpVerifyJoinOp(); + for (Entry<Mutable<ILogicalOperator>, MutableInt> entry : clusterMap.entrySet()) { + if (entry.getKey().getValue().getOperatorTag().equals(LogicalOperatorTag.INNERJOIN)) { + if (!opTypeCount.containsKey(LogicalOperatorTag.INNERJOIN)) + opTypeCount.put(LogicalOperatorTag.INNERJOIN, 0); + opTypeCount.put(LogicalOperatorTag.INNERJOIN, opTypeCount.get(LogicalOperatorTag.INNERJOIN) + 1); + } + if (entry.getKey().getValue().getOperatorTag().equals(LogicalOperatorTag.LEFTOUTERJOIN)) { + if (!opTypeCount.containsKey(LogicalOperatorTag.LEFTOUTERJOIN)) + opTypeCount.put(LogicalOperatorTag.LEFTOUTERJOIN, 0); + opTypeCount + .put(LogicalOperatorTag.LEFTOUTERJOIN, opTypeCount.get(LogicalOperatorTag.LEFTOUTERJOIN) + 1); + } + if (!clusterOpMap.containsKey(entry.getValue())) { + clusterOpMap.put(entry.getValue(), new ArrayList<Pair<Mutable<ILogicalOperator>, MutableInt>>()); + } + if (!clusterOpMap.get(entry.getValue()).contains( + new Pair<Mutable<ILogicalOperator>, MutableInt>(entry.getKey(), entry.getValue()))) + clusterOpMap.get(entry.getValue()).add( + new Pair<Mutable<ILogicalOperator>, MutableInt>(entry.getKey(), entry.getValue())); + } + if (clusterOpMap.containsKey(new MutableInt(0)) && clusterOpMap.get(new MutableInt(0)).size() > 0) + throw new AlgebricksException("Unexpected clusterOpMap." + clusterOpMap.get(new MutableInt(0))); + clusterOpMap.put(new MutableInt(0), new ArrayList<Pair<Mutable<ILogicalOperator>, MutableInt>>()); + clusterOpMap.put(new MutableInt(lastUsedClusterId + 1), + new ArrayList<Pair<Mutable<ILogicalOperator>, MutableInt>>()); + for (Mutable<ILogicalOperator> root : roots) { + loadAndVerifyOperators(root); + } + + List<Integer> groupClusterIds = new ArrayList<Integer>(group.size()); + for (int i = 0; i < group.size(); i++) { + groupClusterIds.add(clusterMap.get(group.get(i)).getValue()); + } + for (int i = groupClusterIds.size() - 1; i > 0; i--) { + for (int j = 0; j < i; j++) { + if (dependedOn(groupClusterIds.get(j), groupClusterIds.get(i))) { + Integer father = groupClusterIds.get(i); + groupClusterIds.set(i, groupClusterIds.get(j)); + groupClusterIds.set(j, father); + ILogicalOperator fatherOp = group.get(i).getValue(); + group.get(i).setValue(group.get(j).getValue()); + group.get(i).setValue(fatherOp); + } + } + } + } + + private boolean[] computeMaterilizationFlags(List<Mutable<ILogicalOperator>> group) throws AlgebricksException { lastUsedClusterId = 0; for (Mutable<ILogicalOperator> root : roots) { - computeClusters(null, root, new MutableInt(++lastUsedClusterId)); + computeClusters(null, root, new MutableInt(++lastUsedClusterId), true); } + removeIsomophicShared(group); + adjustGroupByDependency(group); boolean[] materializationFlags = new boolean[group.size()]; boolean worthMaterialization = worthMaterialization(group.get(0)); boolean requiresMaterialization; @@ -409,6 +773,16 @@ if (group.size() < 2) { group.clear(); } + /*for (int i = 0; i < group.size() - 1; i++) { + for (int j = i + 1; j < group.size(); j++) { + if (clusterMap.get(group.get(i)).equals(clusterMap.get(group.get(j))) + && (materializationFlags[i] == true || materializationFlags[j] == true)) { + materializationFlags[i] = true; + materializationFlags[j] = true; + throw new AlgebricksException("Unexpected isomophic ops." + group.get(i) + " and " + group.get(j)); + } + } + }*/ // if does not worth materialization, the flags for the remaining candidates should be false return worthMaterialization ? materializationFlags : new boolean[group.size()]; } @@ -441,14 +815,16 @@ } private void computeClusters(Mutable<ILogicalOperator> parentRef, Mutable<ILogicalOperator> opRef, - MutableInt currentClusterId) { + MutableInt currentClusterId, boolean isMaterializationBranch) throws AlgebricksException { // only replicate operator has multiple outputs + boolean needMOMapping = false; int outputIndex = 0; if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) { ReplicateOperator rop = (ReplicateOperator) opRef.getValue(); List<Mutable<ILogicalOperator>> outputs = rop.getOutputs(); for (outputIndex = 0; outputIndex < outputs.size(); outputIndex++) { if (outputs.get(outputIndex).equals(parentRef)) { + needMOMapping = true; break; } } @@ -456,35 +832,98 @@ AbstractLogicalOperator aop = (AbstractLogicalOperator) opRef.getValue(); Pair<int[], int[]> labels = aop.getPhysicalOperator().getInputOutputDependencyLabels(opRef.getValue()); List<Mutable<ILogicalOperator>> inputs = opRef.getValue().getInputs(); - for (int i = 0; i < inputs.size(); i++) { - Mutable<ILogicalOperator> inputRef = inputs.get(i); - if (labels.second[outputIndex] == 1 && labels.first[i] == 0) { // 1 -> 0 - if (labels.second.length == 1) { - clusterMap.put(opRef, currentClusterId); - // start a new cluster + MutableInt prevClusterId = clusterMap.get(opRef); + if (prevClusterId == null) { + if (selectedMOMap.containsKey(opRef)) + throw new AlgebricksException("Unexpected mapping replicate." + opRef.getValue().getOperatorTag()); + clusterMap.put(opRef, currentClusterId); + for (int i = inputs.size() - 1; i >= 0; i--) { + Mutable<ILogicalOperator> inputRef = inputs.get(i); + if (labels.second[outputIndex] == 1 && labels.first[i] == 0) { MutableInt newClusterId = new MutableInt(++lastUsedClusterId); - computeClusters(opRef, inputRef, newClusterId); BitSet waitForList = clusterWaitForMap.get(currentClusterId.getValue()); if (waitForList == null) { waitForList = new BitSet(); clusterWaitForMap.put(currentClusterId.getValue(), waitForList); } waitForList.set(newClusterId.getValue()); - } - } else { // 0 -> 0 and 1 -> 1 - MutableInt prevClusterId = clusterMap.get(opRef); - if (prevClusterId == null || prevClusterId.getValue().equals(currentClusterId.getValue())) { - clusterMap.put(opRef, currentClusterId); - computeClusters(opRef, inputRef, currentClusterId); + computeClusters(opRef, inputRef, newClusterId, true); + if (needMOMapping) { + clusterMap.put(opRef, newClusterId); + // for replicate operator <materialized, dependencies> + selectedMOMap.put(opRef, + new Pair<Boolean, BitSet>(true, clusterWaitForMap.get(newClusterId.getValue()))); + } } else { - // merge prevClusterId and currentClusterId: update all the map entries that has currentClusterId to prevClusterId + BitSet buildSideBinaryDependencies = new BitSet(); + if (needMOMapping && !isMaterializationBranch) + buildSideBinaryDependencies = clusterWaitForMap.get(currentClusterId.getValue()); + if (inputs.size() == 2) + isMaterializationBranch = false; + //For binary join, the 1->0 branch will be retrieved firstly, and then 1->1. + computeClusters(opRef, inputRef, currentClusterId, isMaterializationBranch); + if (needMOMapping) { + BitSet probeSideBinaryDependencies = new BitSet(); + if (null != clusterWaitForMap.get(currentClusterId.getValue())) + probeSideBinaryDependencies.or(clusterWaitForMap.get(currentClusterId.getValue())); + if (null != buildSideBinaryDependencies) + probeSideBinaryDependencies.andNot(buildSideBinaryDependencies); + selectedMOMap.put(opRef, new Pair<Boolean, BitSet>(false, probeSideBinaryDependencies)); + } + } + } + } else { + if (opRef.getValue().getOperatorTag() != LogicalOperatorTag.REPLICATE + && opRef.getValue().getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) + throw new AlgebricksException("Unexpected replicate operator." + opRef.getValue().getOperatorTag()); + if (opRef.getValue().getInputs().size() != 1) + throw new AlgebricksException("Unexpected multiple output's input." + opRef.getValue().getOperatorTag()); + if (needMOMapping && !selectedMOMap.containsKey(opRef)) + throw new AlgebricksException("Unexpected seen raplicate operator." + opRef.getValue().getOperatorTag()); + BitSet waitForList = clusterWaitForMap.get(currentClusterId.getValue()); + BitSet dependedList = clusterWaitForMap.get(prevClusterId.getValue()); + if (waitForList == null) { + waitForList = new BitSet(); + clusterWaitForMap.put(currentClusterId.getValue(), waitForList); + } + if (labels.second[outputIndex] == 1 && labels.first[0] == 0) { + if (needMOMapping) { + //selectedMOMap->replicate->second always maintains the replicate's dependencies. + /*MutableInt newClusterId = new MutableInt(++lastUsedClusterId); + waitForList.set(newClusterId.getValue());*/ + if (!isMaterializationBranch) { + MutableInt newClusterId = new MutableInt(++lastUsedClusterId); + waitForList.set(newClusterId.getValue()); + BitSet newDependency = new BitSet(); + newDependency.set(prevClusterId.getValue()); + clusterWaitForMap.put(newClusterId.getValue(), newDependency); + } else { + BitSet newDependency = new BitSet(); + newDependency.or(waitForList); + newDependency.set(prevClusterId.getValue()); + clusterWaitForMap.put(currentClusterId.getValue(), newDependency); + } + } else + throw new AlgebricksException("Unexpected distribution." + opRef.getValue().getOperatorTag()); + + } else { //Merge the clusters if it is in father's non-materialization branch + if (needMOMapping) { + if (null == dependedList) + clusterWaitForMap.put(prevClusterId.getValue(), waitForList); + else if (waitForList != null) { + dependedList.or(waitForList); + clusterWaitForMap.put(prevClusterId.getValue(), dependedList); + } for (BitSet bs : clusterWaitForMap.values()) { if (bs.get(currentClusterId.getValue())) { bs.clear(currentClusterId.getValue()); bs.set(prevClusterId.getValue()); } } - currentClusterId.setValue(prevClusterId.getValue()); + for (Entry<Mutable<ILogicalOperator>, MutableInt> entry : clusterMap.entrySet()) { + if (entry.getValue().equals(currentClusterId)) + clusterMap.put(entry.getKey(), prevClusterId); + } } } } diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java index b9c2fb1..aa5e0c9 100644 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java @@ -36,9 +36,18 @@ public class MaterializerTaskState extends AbstractStateObject { private RunFileWriter out; + static private long fileCreatedCount = 0; + static private String taskInfo; + @SuppressWarnings("static-access") public MaterializerTaskState(JobId jobId, TaskId taskId) { super(jobId, taskId); + if (this.taskInfo == null) + this.taskInfo = taskId.getActivityId().toString(); + else if (this.taskInfo != null && this.taskInfo.equals(taskId.getActivityId().toString())) + fileCreatedCount++; + else + this.taskInfo = taskId.getActivityId().toString(); } @Override @@ -51,9 +60,13 @@ } + public String getRefName() { + return out.getFileReference().getFile().getAbsolutePath(); + } + public void open(IHyracksTaskContext ctx) throws HyracksDataException { FileReference file = ctx.getJobletContext().createManagedWorkspaceFile( - MaterializerTaskState.class.getSimpleName()); + MaterializerTaskState.class.getSimpleName() + "-" + taskInfo + "-" + fileCreatedCount + "-"); out = new RunFileWriter(file, ctx.getIOManager()); out.open(); } @@ -67,7 +80,7 @@ } public void writeOut(IFrameWriter writer, IFrame frame) throws HyracksDataException { - RunFileReader in = out.createDeleteOnCloseReader(); + RunFileReader in = out.createReader(); writer.open(); try { in.open(); diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java index 04b893a..ea48e49 100644 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java @@ -47,6 +47,7 @@ private boolean requiresMaterialization; private int numberOfNonMaterializedOutputs = 0; private int numberOfActiveMaterializeReaders = 0; + private int[] activeMaterializedCount = null; public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) { this(spec, rDesc, outputArity, new boolean[outputArity]); @@ -66,7 +67,6 @@ break; } } - } @Override @@ -121,6 +121,9 @@ getActivityId(), partition)); state.open(ctx); } + if (activeMaterializedCount == null) + activeMaterializedCount = new int[nPartitions]; + activeMaterializedCount[partition] = numberOfActiveMaterializeReaders; for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { writers[i].open(); } @@ -184,12 +187,11 @@ @Override public void deinitialize() throws HyracksDataException { - numberOfActiveMaterializeReaders--; + activeMaterializedCount[partition]--; MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId( getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition)); - if (numberOfActiveMaterializeReaders == 0) { + if (activeMaterializedCount[partition] == 0) state.deleteFile(); - } } }; } diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tokenizers/AbstractUTF8Token.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tokenizers/AbstractUTF8Token.java index 9613fb9..cd0b0ce 100644 --- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tokenizers/AbstractUTF8Token.java +++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tokenizers/AbstractUTF8Token.java @@ -96,7 +96,8 @@ * @param startOffset * @param endOffset * @param tokenLength - * @param tokenCount the count of this token in a document , or a record, or something else. + * @param tokenCount + * the count of this token in a document , or a record, or something else. */ @Override public void reset(byte[] data, int startOffset, int endOffset, int tokenLength, int tokenCount) { @@ -115,13 +116,13 @@ // The preChar and postChar are required to be a single byte utf8 char, e.g. ASCII char. protected void serializeToken(UTF8StringBuilder builder, GrowableArray out, int numPreChars, int numPostChars, - char preChar, char postChar) - throws IOException { + char preChar, char postChar) throws IOException { handleTokenTypeTag(out.getDataOutput()); assert UTF8StringUtil.getModifiedUTF8Len(preChar) == 1 && UTF8StringUtil.getModifiedUTF8Len(postChar) == 1; int actualUtfLen = endOffset - startOffset; + actualUtfLen = (actualUtfLen >= 0) ? actualUtfLen : 0; builder.reset(out, actualUtfLen + numPreChars + numPostChars); // pre chars -- To view, visit https://asterix-gerrit.ics.uci.edu/530 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I2487078d1821d7ad85bb745bfa31024bcdbea1f1 Gerrit-PatchSet: 1 Gerrit-Project: hyracks Gerrit-Branch: master Gerrit-Owner: Wenhai Li <[email protected]>
