KYLIN-2783 refactor CuboidScheduler
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/79711218 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/79711218 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/79711218 Branch: refs/heads/2.1.x Commit: 79711218f14c74eee127c911aaeeca513e6a8793 Parents: bf887a7 Author: Li Yang <liy...@apache.org> Authored: Thu Aug 10 16:19:25 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Thu Aug 10 16:43:03 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 + .../main/resources/kylin-defaults.properties | 2 + .../org/apache/kylin/cube/cuboid/Cuboid.java | 2 +- .../org/apache/kylin/cube/cuboid/CuboidCLI.java | 4 +- .../kylin/cube/cuboid/CuboidScheduler.java | 266 +++---------------- .../cube/cuboid/DefaultCuboidScheduler.java | 263 ++++++++++++++++++ .../cube/inmemcubing/InMemCubeBuilder.java | 4 +- .../org/apache/kylin/cube/model/CubeDesc.java | 36 +-- .../org/apache/kylin/cube/util/CubingUtils.java | 25 +- .../kylin/cube/cuboid/CuboidSchedulerTest.java | 34 +-- .../kylin/engine/mr/common/CubeStatsReader.java | 2 +- .../mr/steps/FactDistinctColumnsMapper.java | 2 +- .../kylin/engine/mr/steps/NDCuboidMapper.java | 8 +- .../engine/spark/KylinKryoRegistrator.java | 2 +- .../apache/kylin/engine/spark/SparkCubing.java | 5 +- .../kylin/engine/spark/SparkCubingByLayer.java | 21 +- 16 files changed, 357 insertions(+), 323 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 179d61f..3a06571 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -329,6 +329,10 @@ abstract public class KylinConfigBase implements Serializable { // CUBE // ============================================================================ + public String getCuboidScheduler() { + return getOptional("kylin.cube.cuboid-scheduler", "org.apache.kylin.cube.cuboid.DefaultCuboidScheduler"); + } + public double getJobCuboidSizeRatio() { return Double.parseDouble(getOptional("kylin.cube.size-estimate-ratio", "0.25")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/core-common/src/main/resources/kylin-defaults.properties ---------------------------------------------------------------------- diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties index ee25637..7c421f9 100644 --- a/core-common/src/main/resources/kylin-defaults.properties +++ b/core-common/src/main/resources/kylin-defaults.properties @@ -145,6 +145,8 @@ kylin.engine.mr.uhc-reducer-count=1 ### CUBE | DICTIONARY ### +kylin.cube.cuboid-scheduler=org.apache.kylin.cube.cuboid.DefaultCuboidScheduler + # 'auto', 'inmem', 'layer' or 'random' for testing kylin.cube.algorithm=layer http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java index 76cb511..b71608c 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java @@ -135,7 +135,7 @@ public class Cuboid implements Comparable<Cuboid>, Serializable { return onTreeCandi; } - return new CuboidScheduler(cubeDesc).getValidParent(onTreeCandi); + return cubeDesc.getCuboidScheduler().findBestMatchCuboid(onTreeCandi); } private static Long translateToOnTreeCuboid(AggregationGroup agg, long cuboidID) { http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java index e2ff97e..d657a43 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java @@ -44,7 +44,7 @@ public class CuboidCLI { } public static int simulateCuboidGeneration(CubeDesc cubeDesc, boolean validate) { - CuboidScheduler scheduler = new CuboidScheduler(cubeDesc); + CuboidScheduler scheduler = cubeDesc.getCuboidScheduler(); long baseCuboid = Cuboid.getBaseCuboidId(cubeDesc); Collection<Long> cuboidSet = new TreeSet<Long>(); cuboidSet.add(baseCuboid); @@ -136,7 +136,7 @@ public class CuboidCLI { int levels = cube.getBuildLevel(); int[] allLevelCounts = new int[levels + 1]; - CuboidScheduler scheduler = new CuboidScheduler(cube); + CuboidScheduler scheduler = cube.getCuboidScheduler(); LinkedList<Long> nextQueue = new LinkedList<Long>(); LinkedList<Long> currentQueue = new LinkedList<Long>(); long baseCuboid = Cuboid.getBaseCuboidId(cube); http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java index def3f03..1d8f589 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,73 +18,54 @@ package org.apache.kylin.cube.cuboid; -import java.io.Serializable; - -/** - */ - -import java.util.ArrayDeque; -import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Queue; import java.util.Set; -import javax.annotation.Nullable; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.model.AggregationGroup; +import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.cube.model.CubeDesc; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -@SuppressWarnings("serial") -public class CuboidScheduler implements Serializable { - private final CubeDesc cubeDesc; - private final long max; - private List<List<Long>> cuboidsByLayer; - - public CuboidScheduler(CubeDesc cubeDesc) { - this.cubeDesc = cubeDesc; - int size = this.cubeDesc.getRowkey().getRowKeyColumns().length; - this.max = (long) Math.pow(2, size) - 1; - } - - public int getCuboidCount() { - return cubeDesc.getAllCuboids().size(); - } - - public List<Long> getSpanningCuboid(long cuboid) { - if (cuboid > max || cuboid < 0) { - throw new IllegalArgumentException("Cuboid " + cuboid + " is out of scope 0-" + max); - } - List<Long> spanning = cubeDesc.getParent2Child().get(cuboid); - if (spanning == null) { - return Collections.EMPTY_LIST; +/** + * Defines a cuboid tree, rooted by the base cuboid. A parent cuboid generates its child cuboids. + */ +abstract public class CuboidScheduler { + + public static CuboidScheduler getInstance(CubeDesc cubeDesc) { + String clzName = cubeDesc.getConfig().getCuboidScheduler(); + try { + Class<? extends CuboidScheduler> clz = ClassUtil.forName(clzName, CuboidScheduler.class); + return clz.getConstructor(CubeDesc.class).newInstance(cubeDesc); + } catch (Exception e) { + throw new RuntimeException(e); } - return spanning; } - - public int getCardinality(long cuboid) { - if (cuboid > max || cuboid < 0) { - throw new IllegalArgumentException("Cubiod " + cuboid + " is out of scope 0-" + max); - } - - return Long.bitCount(cuboid); + + // ============================================================================ + + final protected CubeDesc cubeDesc; + + protected CuboidScheduler(CubeDesc cubeDesc) { + this.cubeDesc = cubeDesc; } - public List<Long> getAllCuboidIds() { - return Lists.newArrayList(cubeDesc.getAllCuboids()); - } + /** Returns all cuboids on the tree. */ + abstract public Set<Long> getAllCuboidIds(); + + /** Returns the number of all cuboids. */ + abstract public int getCuboidCount(); + + /** Returns the child cuboids of a parent. */ + abstract public List<Long> getSpanningCuboid(long parentCuboid); + + /** Returns a cuboid on the tree that best matches the request cuboid. */ + abstract public long findBestMatchCuboid(long requestCuboid); + + // ============================================================================ + + private transient List<List<Long>> cuboidsByLayer; /** * Get cuboids by layer. It's built from pre-expanding tree. @@ -120,177 +101,4 @@ public class CuboidScheduler implements Serializable { return cuboidsByLayer; } - /** - * Collect cuboid from bottom up, considering all factor including black list - * Build tree steps: - * 1. Build tree from bottom up considering dim capping - * 2. Kick out blacked-out cuboids from the tree - * 3. Make sure all the cuboids have proper "parent", if not add it to the tree. - * Direct parent is not necessary, can jump *forward* steps to find in-direct parent. - * For example, forward = 1, grandparent can also be the parent. Only if both parent - * and grandparent are missing, add grandparent to the tree. - * @return Cuboid collection - */ - public Pair<Set<Long>, Map<Long, List<Long>>> buildTreeBottomUp() { - int forward = cubeDesc.getParentForward(); - KylinConfig config = cubeDesc.getConfig(); - - Set<Long> cuboidHolder = new HashSet<>(); - - // build tree structure - Set<Long> children = getLowestCuboids(); - long maxCombination = config.getCubeAggrGroupMaxCombination() * 10; - maxCombination = maxCombination < 0 ? Long.MAX_VALUE : maxCombination; - while (!children.isEmpty()) { - if (cuboidHolder.size() > maxCombination) { - throw new IllegalStateException("Too many cuboids for the cube. Cuboid combination reached " + cuboidHolder.size() + " and limit is " + maxCombination + ". Abort calculation."); - } - cuboidHolder.addAll(children); - children = getOnTreeParentsByLayer(children); - } - cuboidHolder.add(Cuboid.getBaseCuboidId(cubeDesc)); - - // kick off blacked - cuboidHolder = Sets.newHashSet(Iterators.filter(cuboidHolder.iterator(), new Predicate<Long>() { - @Override - public boolean apply(@Nullable Long cuboidId) { - return !cubeDesc.isBlackedCuboid(cuboidId); - } - })); - - // fill padding cuboids - Map<Long, List<Long>> parent2Child = Maps.newHashMap(); - Queue<Long> cuboidScan = new ArrayDeque<>(); - cuboidScan.addAll(cuboidHolder); - while (!cuboidScan.isEmpty()) { - long current = cuboidScan.poll(); - long parent = getParentOnPromise(current, cuboidHolder, forward); - - if (parent > 0) { - if (!cuboidHolder.contains(parent)) { - cuboidScan.add(parent); - cuboidHolder.add(parent); - } - if (parent2Child.containsKey(parent)) { - parent2Child.get(parent).add(current); - } else { - parent2Child.put(parent, Lists.newArrayList(current)); - } - } - } - - return Pair.newPair(cuboidHolder, parent2Child); - } - - private long getParentOnPromise(long child, Set<Long> coll, int forward) { - long parent = getOnTreeParent(child); - if (parent < 0) { - return -1; - } - - if (coll.contains(parent) || forward == 0) { - return parent; - } - - return getParentOnPromise(parent, coll, forward - 1); - } - - /** - * Get the parent cuboid really on the spanning tree. - * @param child an on-tree cuboid - * @return - */ - public long getValidParent(long child) { - long parent = getOnTreeParent(child); - while (parent > 0) { - if (cubeDesc.getAllCuboids().contains(parent)) { - break; - } - parent = getOnTreeParent(parent); - } - - if (parent <= 0) { - throw new IllegalStateException("Can't find valid parent for Cuboid " + child); - } - return parent; - } - - private long getOnTreeParent(long child) { - Collection<Long> candidates = getOnTreeParents(child); - if (candidates == null || candidates.isEmpty()) { - return -1; - } - return Collections.min(candidates, Cuboid.cuboidSelectComparator); - } - - /** - * Get all parent for children cuboids, considering dim cap. - * @param children children cuboids - * @return all parents cuboids - */ - private Set<Long> getOnTreeParentsByLayer(Collection<Long> children) { - Set<Long> parents = new HashSet<>(); - for (long child : children) { - parents.addAll(getOnTreeParents(child)); - } - parents = Sets.newHashSet(Iterators.filter(parents.iterator(), new Predicate<Long>() { - @Override - public boolean apply(@Nullable Long cuboidId) { - if (cuboidId == Cuboid.getBaseCuboidId(cubeDesc)) { - return true; - } - - for (AggregationGroup agg : cubeDesc.getAggregationGroups()) { - if (agg.isOnTree(cuboidId) && agg.checkDimCap(cuboidId)) { - return true; - } - } - - return false; - } - })); - return parents; - } - - /** - * Get all *possible* parent for a child cuboid - * @param child Child cuboid ID - * @return all *possible* parent cuboids - */ - private Set<Long> getOnTreeParents(long child) { - List<AggregationGroup> aggrs = Lists.newArrayList(); - for (AggregationGroup agg : cubeDesc.getAggregationGroups()) { - if (agg.isOnTree(child)) { - aggrs.add(agg); - } - } - - return getOnTreeParents(child, aggrs); - } - - /** - * Get lowest (not Cube building level) Cuboids for every Agg group - * @return lowest level cuboids - */ - private Set<Long> getLowestCuboids() { - return getOnTreeParents(0L, cubeDesc.getAggregationGroups()); - } - - private Set<Long> getOnTreeParents(long child, Collection<AggregationGroup> groups) { - Set<Long> parentCandidate = new HashSet<>(); - - if (child == Cuboid.getBaseCuboidId(cubeDesc)) { - return parentCandidate; - } - - for (AggregationGroup agg : groups) { - if (child == agg.getPartialCubeFullMask()) { - parentCandidate.add(Cuboid.getBaseCuboidId(cubeDesc)); - return parentCandidate; - } - parentCandidate.addAll(AggregationGroupScheduler.getOnTreeParents(child, agg)); - } - - return parentCandidate; - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java new file mode 100644 index 0000000..859ad20 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.cuboid; + +import java.io.Serializable; + +/** + */ + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import javax.annotation.Nullable; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.model.AggregationGroup; +import org.apache.kylin.cube.model.CubeDesc; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +@SuppressWarnings("serial") +public class DefaultCuboidScheduler extends CuboidScheduler implements Serializable { + private final long max; + private final Set<Long> allCuboidIds; + private final Map<Long, List<Long>> parent2child; + + + public DefaultCuboidScheduler(CubeDesc cubeDesc) { + super(cubeDesc); + + int size = this.cubeDesc.getRowkey().getRowKeyColumns().length; + this.max = (long) Math.pow(2, size) - 1; + + Pair<Set<Long>, Map<Long, List<Long>>> pair = buildTreeBottomUp(); + this.allCuboidIds = pair.getFirst(); + this.parent2child = pair.getSecond(); + } + + @Override + public int getCuboidCount() { + return allCuboidIds.size(); + } + + @Override + public List<Long> getSpanningCuboid(long cuboid) { + if (cuboid > max || cuboid < 0) { + throw new IllegalArgumentException("Cuboid " + cuboid + " is out of scope 0-" + max); + } + + List<Long> spanning = parent2child.get(cuboid); + if (spanning == null) { + return Collections.EMPTY_LIST; + } + return spanning; + } + + @Override + public Set<Long> getAllCuboidIds() { + return Sets.newHashSet(allCuboidIds); + } + + /** + * Collect cuboid from bottom up, considering all factor including black list + * Build tree steps: + * 1. Build tree from bottom up considering dim capping + * 2. Kick out blacked-out cuboids from the tree + * 3. Make sure all the cuboids have proper "parent", if not add it to the tree. + * Direct parent is not necessary, can jump *forward* steps to find in-direct parent. + * For example, forward = 1, grandparent can also be the parent. Only if both parent + * and grandparent are missing, add grandparent to the tree. + * @return Cuboid collection + */ + private Pair<Set<Long>, Map<Long, List<Long>>> buildTreeBottomUp() { + int forward = cubeDesc.getParentForward(); + KylinConfig config = cubeDesc.getConfig(); + + Set<Long> cuboidHolder = new HashSet<>(); + + // build tree structure + Set<Long> children = getLowestCuboids(); + long maxCombination = config.getCubeAggrGroupMaxCombination() * 10; + maxCombination = maxCombination < 0 ? Long.MAX_VALUE : maxCombination; + while (!children.isEmpty()) { + if (cuboidHolder.size() > maxCombination) { + throw new IllegalStateException("Too many cuboids for the cube. Cuboid combination reached " + cuboidHolder.size() + " and limit is " + maxCombination + ". Abort calculation."); + } + cuboidHolder.addAll(children); + children = getOnTreeParentsByLayer(children); + } + cuboidHolder.add(Cuboid.getBaseCuboidId(cubeDesc)); + + // kick off blacked + cuboidHolder = Sets.newHashSet(Iterators.filter(cuboidHolder.iterator(), new Predicate<Long>() { + @Override + public boolean apply(@Nullable Long cuboidId) { + return !cubeDesc.isBlackedCuboid(cuboidId); + } + })); + + // fill padding cuboids + Map<Long, List<Long>> parent2Child = Maps.newHashMap(); + Queue<Long> cuboidScan = new ArrayDeque<>(); + cuboidScan.addAll(cuboidHolder); + while (!cuboidScan.isEmpty()) { + long current = cuboidScan.poll(); + long parent = getParentOnPromise(current, cuboidHolder, forward); + + if (parent > 0) { + if (!cuboidHolder.contains(parent)) { + cuboidScan.add(parent); + cuboidHolder.add(parent); + } + if (parent2Child.containsKey(parent)) { + parent2Child.get(parent).add(current); + } else { + parent2Child.put(parent, Lists.newArrayList(current)); + } + } + } + + return Pair.newPair(cuboidHolder, parent2Child); + } + + private long getParentOnPromise(long child, Set<Long> coll, int forward) { + long parent = getOnTreeParent(child); + if (parent < 0) { + return -1; + } + + if (coll.contains(parent) || forward == 0) { + return parent; + } + + return getParentOnPromise(parent, coll, forward - 1); + } + + /** + * Get the parent cuboid really on the spanning tree. + * @param child an on-tree cuboid + * @return + */ + @Override + public long findBestMatchCuboid(long child) { + long parent = getOnTreeParent(child); + while (parent > 0) { + if (cubeDesc.getAllCuboids().contains(parent)) { + break; + } + parent = getOnTreeParent(parent); + } + + if (parent <= 0) { + throw new IllegalStateException("Can't find valid parent for Cuboid " + child); + } + return parent; + } + + private long getOnTreeParent(long child) { + Collection<Long> candidates = getOnTreeParents(child); + if (candidates == null || candidates.isEmpty()) { + return -1; + } + return Collections.min(candidates, Cuboid.cuboidSelectComparator); + } + + /** + * Get all parent for children cuboids, considering dim cap. + * @param children children cuboids + * @return all parents cuboids + */ + private Set<Long> getOnTreeParentsByLayer(Collection<Long> children) { + Set<Long> parents = new HashSet<>(); + for (long child : children) { + parents.addAll(getOnTreeParents(child)); + } + parents = Sets.newHashSet(Iterators.filter(parents.iterator(), new Predicate<Long>() { + @Override + public boolean apply(@Nullable Long cuboidId) { + if (cuboidId == Cuboid.getBaseCuboidId(cubeDesc)) { + return true; + } + + for (AggregationGroup agg : cubeDesc.getAggregationGroups()) { + if (agg.isOnTree(cuboidId) && agg.checkDimCap(cuboidId)) { + return true; + } + } + + return false; + } + })); + return parents; + } + + /** + * Get all *possible* parent for a child cuboid + * @param child Child cuboid ID + * @return all *possible* parent cuboids + */ + private Set<Long> getOnTreeParents(long child) { + List<AggregationGroup> aggrs = Lists.newArrayList(); + for (AggregationGroup agg : cubeDesc.getAggregationGroups()) { + if (agg.isOnTree(child)) { + aggrs.add(agg); + } + } + + return getOnTreeParents(child, aggrs); + } + + /** + * Get lowest (not Cube building level) Cuboids for every Agg group + * @return lowest level cuboids + */ + private Set<Long> getLowestCuboids() { + return getOnTreeParents(0L, cubeDesc.getAggregationGroups()); + } + + private Set<Long> getOnTreeParents(long child, Collection<AggregationGroup> groups) { + Set<Long> parentCandidate = new HashSet<>(); + + if (child == Cuboid.getBaseCuboidId(cubeDesc)) { + return parentCandidate; + } + + for (AggregationGroup agg : groups) { + if (child == agg.getPartialCubeFullMask()) { + parentCandidate.add(Cuboid.getBaseCuboidId(cubeDesc)); + return parentCandidate; + } + parentCandidate.addAll(AggregationGroupScheduler.getOnTreeParents(child, agg)); + } + + return parentCandidate; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index a26e948..93736c9 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -33,8 +33,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.gridtable.CubeGridTable; @@ -92,7 +92,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { public InMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { super(cubeDesc, flatDesc, dictionaryMap); - this.cuboidScheduler = new CuboidScheduler(cubeDesc); + this.cuboidScheduler = cubeDesc.getCuboidScheduler(); this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); this.totalCuboidCount = cuboidScheduler.getCuboidCount(); http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 56fc9fa..5d8503d 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -188,10 +188,8 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { @JsonInclude(JsonInclude.Include.NON_NULL) private int parentForward = 3; - // allCuboids and parent2Child lazy built - private Set<Long> allCuboids; - private Map<Long, List<Long>> parent2Child; - private byte[] cuboidTreeLock = new byte[0]; + // cuboid scheduler lazy built + transient private CuboidScheduler cuboidScheduler = null; public boolean isEnableSharding() { //in the future may extend to other storage that is shard-able @@ -465,7 +463,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { * @return */ public int getBuildLevel() { - return new CuboidScheduler(this).getCuboidsByLayer().size() - 1; + return getCuboidScheduler().getCuboidsByLayer().size() - 1; } @Override @@ -558,11 +556,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { hostToDerivedMap = Maps.newHashMap(); extendedColumnToHosts = Maps.newHashMap(); cuboidBlackSet = Sets.newHashSet(); - - synchronized (cuboidTreeLock) { - allCuboids = null; - parent2Child = null; - } + cuboidScheduler = null; } public void init(KylinConfig config) { @@ -618,13 +612,15 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { amendAllColumns(); } - private void buildCuboidTree() { - synchronized (cuboidTreeLock) { - if (allCuboids == null || parent2Child == null) { - Pair<Set<Long>, Map<Long, List<Long>>> ret = new CuboidScheduler(this).buildTreeBottomUp(); - allCuboids = ret.getFirst(); - parent2Child = ret.getSecond(); + public CuboidScheduler getCuboidScheduler() { + if (cuboidScheduler != null) + return cuboidScheduler; + + synchronized (this) { + if (cuboidScheduler == null) { + cuboidScheduler = CuboidScheduler.getInstance(this); } + return cuboidScheduler; } } @@ -1146,13 +1142,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } public Set<Long> getAllCuboids() { - buildCuboidTree(); - return allCuboids; - } - - public Map<Long, List<Long>> getParent2Child() { - buildCuboidTree(); - return parent2Child; + return getCuboidScheduler().getAllCuboidIds(); } public int getParentForward() { http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java index b1b6bce..1c3c395 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java @@ -24,14 +24,11 @@ import java.util.List; import java.util.Map; import java.util.Set; -import javax.annotation.Nullable; - import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; import org.apache.kylin.dict.DictionaryGenerator; @@ -45,9 +42,7 @@ import org.apache.kylin.source.IReadableTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; @@ -62,28 +57,10 @@ public class CubingUtils { public static Map<Long, HLLCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn, Iterable<List<String>> streams) { final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc); final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length; - final List<Long> allCuboidIds = new CuboidScheduler(cubeDesc).getAllCuboidIds(); + final Set<Long> allCuboidIds = cubeDesc.getCuboidScheduler().getAllCuboidIds(); final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap(); - Lists.transform(allCuboidIds, new Function<Long, Integer[]>() { - @Nullable - @Override - public Integer[] apply(@Nullable Long cuboidId) { - Integer[] result = new Integer[Long.bitCount(cuboidId)]; - - long mask = Long.highestOneBit(baseCuboidId); - int position = 0; - for (int i = 0; i < rowkeyLength; i++) { - if ((mask & cuboidId) > 0) { - result[position] = i; - position++; - } - mask = mask >> 1; - } - return result; - } - }); final Map<Long, HLLCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size()); for (Long cuboidId : allCuboidIds) { result.put(cuboidId, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision())); http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java index e2a71db..58c0edb 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidSchedulerTest.java @@ -128,7 +128,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testGetSpanningCuboid2() { CubeDesc cube = getTestKylinCubeWithSeller(); - CuboidScheduler scheduler = new CuboidScheduler(cube); + CuboidScheduler scheduler = cube.getCuboidScheduler(); // generate 8d System.out.println("Spanning for 8D Cuboids"); @@ -156,7 +156,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testGetSpanningCuboid1() { CubeDesc cube = getTestKylinCubeWithoutSeller(); - CuboidScheduler scheduler = new CuboidScheduler(cube); + CuboidScheduler scheduler = cube.getCuboidScheduler(); // generate 7d System.out.println("Spanning for 7D Cuboids"); @@ -183,18 +183,6 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { } @Test - public void testGetCardinality() { - CubeDesc cube = getTestKylinCubeWithSeller(); - CuboidScheduler scheduler = new CuboidScheduler(cube); - - assertEquals(0, scheduler.getCardinality(0)); - assertEquals(7, scheduler.getCardinality(127)); - assertEquals(1, scheduler.getCardinality(1)); - assertEquals(1, scheduler.getCardinality(8)); - assertEquals(6, scheduler.getCardinality(126)); - } - - @Test public void testCuboidGeneration1() { CubeDesc cube = getTestKylinCubeWithoutSeller(); @@ -240,7 +228,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testCuboidCounts1() { CubeDesc cube = getTestKylinCubeWithoutSeller(); - CuboidScheduler cuboidScheduler = new CuboidScheduler(cube); + CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); int[] counts = CuboidCLI.calculateAllLevelCount(cube); printCount(counts); int sum = 0; @@ -253,7 +241,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testCuboidCounts2() { CubeDesc cube = getTestKylinCubeWithoutSellerLeftJoin(); - CuboidScheduler cuboidScheduler = new CuboidScheduler(cube); + CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); int[] counts = CuboidCLI.calculateAllLevelCount(cube); printCount(counts); int sum = 0; @@ -266,7 +254,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testCuboidCounts3() { CubeDesc cube = getTestKylinCubeWithSeller(); - CuboidScheduler cuboidScheduler = new CuboidScheduler(cube); + CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); int[] counts = CuboidCLI.calculateAllLevelCount(cube); printCount(counts); int sum = 0; @@ -279,7 +267,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testCuboidCounts4() { CubeDesc cube = getTestKylinCubeWithSellerLeft(); - CuboidScheduler cuboidScheduler = new CuboidScheduler(cube); + CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); int[] counts = CuboidCLI.calculateAllLevelCount(cube); printCount(counts); int sum = 0; @@ -292,7 +280,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testCuboidCounts5() { CubeDesc cube = getStreamingCubeDesc(); - CuboidScheduler cuboidScheduler = new CuboidScheduler(cube); + CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); int[] counts = CuboidCLI.calculateAllLevelCount(cube); printCount(counts); int sum = 0; @@ -305,7 +293,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testCuboidCounts6() { CubeDesc cube = getCIInnerJoinCube(); - CuboidScheduler cuboidScheduler = new CuboidScheduler(cube); + CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); int[] counts = CuboidCLI.calculateAllLevelCount(cube); printCount(counts); int sum = 0; @@ -318,7 +306,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { @Test public void testLargeCube() { CubeDesc cube = getFiftyDimCubeDesc(); - CuboidScheduler cuboidScheduler = new CuboidScheduler(cube); + CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); long start = System.currentTimeMillis(); System.out.println(cuboidScheduler.getCuboidCount()); System.out.println("build tree takes: " + (System.currentTimeMillis() - start) + "ms"); @@ -329,7 +317,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { File twentyFile = new File(new File(LocalFileMetadataTestCase.LOCALMETA_TEMP_DATA, "cube_desc"), "twenty_dim"); twentyFile.renameTo(new File(twentyFile.getPath().substring(0, twentyFile.getPath().length() - 4))); CubeDesc cube = getTwentyDimCubeDesc(); - CuboidScheduler cuboidScheduler = new CuboidScheduler(cube); + CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); cuboidScheduler.getCuboidCount(); twentyFile.renameTo(new File(twentyFile.getPath() + ".bad")); } @@ -344,7 +332,7 @@ public class CuboidSchedulerTest extends LocalFileMetadataTestCase { } CubeDescManager.clearCache(); CubeDesc cube = getCubeDescManager().getCubeDesc("ut_large_dimension_number"); - CuboidScheduler scheduler = new CuboidScheduler(cube); + CuboidScheduler scheduler = cube.getCuboidScheduler(); Cuboid baseCuboid = Cuboid.getBaseCuboid(cube); assertTrue(Cuboid.isValid(cube, baseCuboid.getId())); http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index a372c5b..e160d27 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -81,7 +81,7 @@ public class CubeStatsReader { public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException { ResourceStore store = ResourceStore.getStore(kylinConfig); - cuboidScheduler = new CuboidScheduler(cubeSegment.getCubeDesc()); + cuboidScheduler = cubeSegment.getCubeDesc().getCuboidScheduler(); String statsKey = cubeSegment.getStatisticsResourcePath(); File tmpSeqFile = writeTmpSeqFile(store.getResource(statsKey).inputStream); Reader reader = null; http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java index 713b7f7..8281759 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java @@ -88,7 +88,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED)); if (collectStatistics) { samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); - cuboidScheduler = new CuboidScheduler(cubeDesc); + cuboidScheduler = cubeDesc.getCuboidScheduler(); nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; List<Long> cuboidIdList = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java index b924edc..782ce72 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java @@ -18,6 +18,9 @@ package org.apache.kylin.engine.mr.steps; +import java.io.IOException; +import java.util.Collection; + import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ByteArray; @@ -36,9 +39,6 @@ import org.apache.kylin.engine.mr.common.NDCuboidBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Collection; - /** * @author George Song (ysong1) * @@ -75,7 +75,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { cubeDesc = cube.getDescriptor(); ndCuboidBuilder = new NDCuboidBuilder(cubeSegment); // initialize CubiodScheduler - cuboidScheduler = new CuboidScheduler(cubeDesc); + cuboidScheduler = cubeDesc.getCuboidScheduler(); rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); } http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java index 1980343..ac56075 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java @@ -130,7 +130,7 @@ public class KylinKryoRegistrator implements KryoRegistrator { kyroClasses.add(org.apache.kylin.cube.CubeSegment.class); kyroClasses.add(org.apache.kylin.cube.common.RowKeySplitter.class); kyroClasses.add(org.apache.kylin.cube.cuboid.Cuboid.class); - kyroClasses.add(org.apache.kylin.cube.cuboid.CuboidScheduler.class); + kyroClasses.add(org.apache.kylin.cube.cuboid.DefaultCuboidScheduler.class); kyroClasses.add(org.apache.kylin.cube.gridtable.TrimmedDimensionSerializer.class); kyroClasses.add(org.apache.kylin.cube.kv.AbstractRowKeyEncoder.class); kyroClasses.add(org.apache.kylin.cube.kv.CubeDimEncMap.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 57fd315..a094cc2 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -29,6 +29,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -252,8 +253,8 @@ public class SparkCubing extends AbstractApplication { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); CubeDesc cubeDesc = cubeInstance.getDescriptor(); - CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc); - List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds(); + CuboidScheduler cuboidScheduler = cubeDesc.getCuboidScheduler(); + Set<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds(); final HashMap<Long, HLLCounter> zeroValue = Maps.newHashMap(); for (Long id : allCuboidIds) { zeroValue.put(id, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision())); http://git-wip-us.apache.org/repos/asf/kylin/blob/79711218/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java index 587ff78..2f54e25 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -17,6 +17,15 @@ */ package org.apache.kylin.engine.spark; +import java.io.File; +import java.io.FileFilter; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; @@ -65,16 +74,8 @@ import org.apache.spark.sql.hive.HiveContext; import org.apache.spark.storage.StorageLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Tuple2; -import java.io.File; -import java.io.FileFilter; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; +import scala.Tuple2; /** @@ -168,7 +169,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment); final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(vCubeSegment.getValue())); - final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue())); + final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(vCubeDesc.getValue().getCuboidScheduler()); final int measureNum = cubeDesc.getMeasures().size(); int countMeasureIndex = 0;