wonook commented on a change in pull request #293:
URL: https://github.com/apache/incubator-nemo/pull/293#discussion_r529199653
##########
File path: compiler/backend/pom.xml
##########
@@ -47,5 +47,16 @@ under the License.
<artifactId>nemo-compiler-optimizer</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nemo</groupId>
+ <artifactId>nemo-runtime-master</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nemo</groupId>
+ <artifactId>nemo-runtime-master</artifactId>
+ <version>0.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
Review comment:
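Are these lines required? Note that the second `nemo-runtime-master` dependency duplicates the first one, only with a hard-coded `0.3-SNAPSHOT` version and an explicit `compile` scope.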
##########
File path:
compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/prophet/ParallelismProphet.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.backend.nemo.prophet;
+
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.DAGBuilder;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import org.apache.nemo.runtime.common.plan.Stage;
+import org.apache.nemo.runtime.common.plan.StageEdge;
+import org.apache.nemo.runtime.master.metric.MetricStore;
+import org.apache.nemo.runtime.master.scheduler.SimulationScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * A prophet for Parallelism.
+ */
+public final class ParallelismProphet implements Prophet<String, Long> {
+ private static final Logger LOG =
LoggerFactory.getLogger(ParallelismProphet.class.getName());
+ private final SimulationScheduler simulationScheduler;
+ private final PhysicalPlanGenerator physicalPlanGenerator;
+ private final IRDAG currentIRDAG;
+ private final PhysicalPlan currentPhysicalPlan;
+ private final Set<StageEdge> edgesToOptimize;
+ private final Set<String> stageId;
+ private int partitionerProperty;
+
+ /**
+ * Default constructor for ParallelismProphet.
+ * @param irdag current IRDAG
+ * @param physicalPlan current PhysicalPlan
+ * @param simulationScheduler SimulationScheduler to launch
+ * @param physicalPlanGenerator PhysicalPlanGenerator to make physical
plan which will be launched by
+ * simulation scheduler
+ * @param edgesToOptimize edges to optimize at runtime pass
+ */
+ public ParallelismProphet(final IRDAG irdag, final PhysicalPlan physicalPlan,
+ final SimulationScheduler simulationScheduler,
+ final PhysicalPlanGenerator physicalPlanGenerator,
+ final Set<StageEdge> edgesToOptimize) {
+ this.currentIRDAG = irdag;
+ this.currentPhysicalPlan = physicalPlan;
+ this.simulationScheduler = simulationScheduler;
+ this.physicalPlanGenerator = physicalPlanGenerator;
+ this.edgesToOptimize = edgesToOptimize;
+ this.stageId =
edgesToOptimize.stream().map(StageEdge::getDst).map(Stage::getId).collect(Collectors.toSet());
+ calculatePartitionerProperty(edgesToOptimize);
+ }
+
+ @Override
+ public Map<String, Long> calculate() {
+ final Map<String, Long> result = new HashMap<>();
+ final List<PhysicalPlan> listOfPhysicalPlans = new ArrayList<>(); // when
to update here?
+ for (int i = 0; i < 7; i++) {
+ final int parallelism = (int) (partitionerProperty / Math.pow(2, i));
+ PhysicalPlan newPlan = makePhysicalPlanForSimulation(parallelism,
edgesToOptimize, currentIRDAG);
+ listOfPhysicalPlans.add(newPlan);
+ }
+ final List<Pair<Integer, Long>> listOfParallelismToDurationPair =
listOfPhysicalPlans.stream()
+ .map(this::launchSimulationForPlan)
+ .filter(pair -> pair.right() > 0.5)
+ .collect(Collectors.toList());
+ final Pair<Integer, Long> pairWithMinDuration =
+ Collections.min(listOfParallelismToDurationPair, Comparator.comparing(p
-> p.right()));
+ result.put("opt.parallelism", pairWithMinDuration.left().longValue());
+ return result;
+ }
+
+ /**
+ * Simulate the given physical plan.
+ * @param physicalPlan physical plan(with only one stage) to simulate
+ * @return Pair of Integer and Long. Integer value
indicates the simulated parallelism, and
+ * Long value is simulated job(=stage) duration.
+ */
+ private synchronized Pair<Integer, Long> launchSimulationForPlan(final
PhysicalPlan physicalPlan) {
+ this.simulationScheduler.schedulePlan(physicalPlan, 1);
+ final MetricStore resultingMetricStore =
this.simulationScheduler.collectMetricStore();
+ final List<Pair<Integer, Long>> taskSizeRatioToDuration = new
ArrayList<>();
+
resultingMetricStore.getMetricMap(JobMetric.class).values().forEach(jobMetric
-> {
+ final int taskSizeRatio = Integer.parseInt(((JobMetric)
jobMetric).getId().split("-")[1]);
+ taskSizeRatioToDuration.add(Pair.of(taskSizeRatio, ((JobMetric)
jobMetric).getJobDuration()));
+ });
+ return Collections.min(taskSizeRatioToDuration,
Comparator.comparing(Pair::right));
+ }
+
+ private void setPartitionerProperty(final int partitionerProperty) {
+ this.partitionerProperty = partitionerProperty;
+ }
+
+ private void calculatePartitionerProperty(final Set<StageEdge> edges) {
Review comment:
Can you add a brief Javadoc for this method?
##########
File path:
compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/runtime/DynamicTaskSizingRuntimePass.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.runtime;
+
+import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import
org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import
org.apache.nemo.common.ir.edge.executionproperty.SubPartitionSetProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import
org.apache.nemo.common.ir.vertex.executionproperty.EnableDynamicTaskSizingProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Runtime pass for Dynamic Task Sizing policy.
+ */
+public final class DynamicTaskSizingRuntimePass extends
RunTimePass<Map<String, Long>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(DynamicTaskSizingRuntimePass.class.getName());
+ private final String mapKey = "opt.parallelism";
+
+ public DynamicTaskSizingRuntimePass() {
+ }
+
+ @Override
+ public IRDAG apply(final IRDAG irdag, final Message<Map<String, Long>>
mapMessage) {
+ final Set<IREdge> edgesToOptimize = mapMessage.getExaminedEdges();
+ final Set<IRVertex> stageVertices =
edgesToOptimize.stream().map(IREdge::getDst).collect(Collectors.toSet());
+ irdag.topologicalDo(v -> {
+ if (stageVertices.contains(v)) {
+ edgesToOptimize.addAll(irdag.getIncomingEdgesOf(v));
+ }
+ });
+ LOG.info("Examined edges {}",
edgesToOptimize.stream().map(IREdge::getId).collect(Collectors.toList()));
+
+ final IREdge representativeEdge = edgesToOptimize.iterator().next();
+ // double check
+ if
(!representativeEdge.getDst().getPropertyValue(EnableDynamicTaskSizingProperty.class).get())
{
+ return irdag;
+ }
+ final Map<String, Long> messageValue = mapMessage.getMessageValue();
+ LOG.info("messageValue {}", messageValue);
+ final int optimizedTaskSizeRatio = messageValue.get(mapKey).intValue();
+ final int partitionerProperty = getPartitionerProperty(irdag);
+ for (IREdge edge : edgesToOptimize) {
+ if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
+ .equals(CommunicationPatternProperty.Value.SHUFFLE)
+ &&
!edge.getPropertyValue(PartitionerProperty.class).get().right().equals(partitionerProperty))
{
+ throw new IllegalArgumentException();
+ }
+ }
+ final int partitionUnit = partitionerProperty / optimizedTaskSizeRatio;
+ edgesToOptimize.forEach(irEdge -> setSubPartitionProperty(irEdge,
partitionUnit, partitionerProperty));
+ edgesToOptimize.forEach(irEdge -> setDstVertexParallelismProperty(irEdge,
partitionUnit, partitionerProperty));
+ return irdag;
+ }
+
+ private int getPartitionerProperty(final IRDAG dag) {
+ long jobSizeInBytes = dag.getInputSize();
+ long jobSizeInGB = jobSizeInBytes / (1024 * 1024 * 1024);
+ if (1 <= jobSizeInGB && jobSizeInGB < 10) {
+ return 1024;
+ } else if (10 <= jobSizeInGB && jobSizeInGB < 100) {
+ return 2048;
+ } else {
+ return 4096;
+ }
+ }
+
+ private void setSubPartitionProperty(final IREdge edge, final int
growingFactor, final int partitionerProperty) {
+ final int start = (int)
edge.getPropertyValue(SubPartitionSetProperty.class).get().get(0).rangeBeginInclusive();
Review comment:
Should this use `orElse()` instead of `get()`? Calling `get()` on an empty `Optional` throws `NoSuchElementException`.
##########
File path:
compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/runtime/DynamicTaskSizingRuntimePassTest.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.runtime;
+
+import org.apache.nemo.client.JobLauncher;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.MessageIdEdgeProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.compiler.CompilerTestUtil;
+import
org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.LoopUnrollingPass;
+import
org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.SamplingTaskSizingPass;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static junit.framework.TestCase.assertEquals;
+
+/**
+ * Test {@link DynamicTaskSizingRuntimePass}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public class DynamicTaskSizingRuntimePassTest {
+ private IRDAG compiledDAG;
+ private final String mapKey = "opt.parallelism";
+ private final Map<String, Long> message = new HashMap<>();
+
+ @Before
+ public void setUp() throws Exception {
+ compiledDAG = CompilerTestUtil.compileWordCountDAG();
+ message.put(mapKey, 8L);
+ }
+
+ /**
+ * Testing method of whether the runtime pass successfully changes the
parallelism of target vertices.
+ */
+ @Test
+ public void testDynamicTaskSizingRuntimePass() {
+ final IRDAG dagAfterCompileTimePass =
+ new LoopUnrollingPass().apply(new
SamplingTaskSizingPass().apply(compiledDAG));
+ final Set<IREdge> edgesToExamine =
dagAfterCompileTimePass.getEdges().stream()
+ .filter(irEdge ->
irEdge.getExecutionProperties().containsKey(MessageIdEdgeProperty.class))
+ .collect(Collectors.toSet());
+
+ if (!edgesToExamine.isEmpty()) {
+ final IRDAG afterRuntimePass = new DynamicTaskSizingRuntimePass()
+ .apply(dagAfterCompileTimePass, new Message<>(1, edgesToExamine,
message));
+ for (IREdge edge : afterRuntimePass.getEdges())
+ if (edgesToExamine.contains(edge)) {
+
assertEquals(edge.getDst().getPropertyValue(ParallelismProperty.class), 8);
Review comment:
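It seems like `.get()` or `.orElse()` is missing after `getPropertyValue(...)`. It returns an Optional, so this assertion compares an `Optional` against `8` and can never pass.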
##########
File path:
compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/runtime/DynamicTaskSizingRuntimePass.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.runtime;
+
+import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import
org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import
org.apache.nemo.common.ir.edge.executionproperty.SubPartitionSetProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import
org.apache.nemo.common.ir.vertex.executionproperty.EnableDynamicTaskSizingProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Runtime pass for Dynamic Task Sizing policy.
+ */
+public final class DynamicTaskSizingRuntimePass extends
RunTimePass<Map<String, Long>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(DynamicTaskSizingRuntimePass.class.getName());
+ private final String mapKey = "opt.parallelism";
+
+ public DynamicTaskSizingRuntimePass() {
Review comment:
Please add a Javadoc here; it can simply say that this is the default constructor.
##########
File path:
compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/prophet/ParallelismProphet.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.backend.nemo.prophet;
+
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.DAGBuilder;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import org.apache.nemo.runtime.common.plan.Stage;
+import org.apache.nemo.runtime.common.plan.StageEdge;
+import org.apache.nemo.runtime.master.metric.MetricStore;
+import org.apache.nemo.runtime.master.scheduler.SimulationScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * A prophet for Parallelism.
+ */
+public final class ParallelismProphet implements Prophet<String, Long> {
+ private static final Logger LOG =
LoggerFactory.getLogger(ParallelismProphet.class.getName());
+ private final SimulationScheduler simulationScheduler;
+ private final PhysicalPlanGenerator physicalPlanGenerator;
+ private final IRDAG currentIRDAG;
+ private final PhysicalPlan currentPhysicalPlan;
+ private final Set<StageEdge> edgesToOptimize;
+ private final Set<String> stageId;
+ private int partitionerProperty;
+
+ /**
+ * Default constructor for ParallelismProphet.
+ * @param irdag current IRDAG
+ * @param physicalPlan current PhysicalPlan
+ * @param simulationScheduler SimulationScheduler to launch
+ * @param physicalPlanGenerator PhysicalPlanGenerator to make physical
plan which will be launched by
+ * simulation scheduler
+ * @param edgesToOptimize edges to optimize at runtime pass
+ */
+ public ParallelismProphet(final IRDAG irdag, final PhysicalPlan physicalPlan,
+ final SimulationScheduler simulationScheduler,
+ final PhysicalPlanGenerator physicalPlanGenerator,
+ final Set<StageEdge> edgesToOptimize) {
+ this.currentIRDAG = irdag;
+ this.currentPhysicalPlan = physicalPlan;
+ this.simulationScheduler = simulationScheduler;
+ this.physicalPlanGenerator = physicalPlanGenerator;
+ this.edgesToOptimize = edgesToOptimize;
+ this.stageId =
edgesToOptimize.stream().map(StageEdge::getDst).map(Stage::getId).collect(Collectors.toSet());
+ calculatePartitionerProperty(edgesToOptimize);
+ }
+
+ @Override
+ public Map<String, Long> calculate() {
+ final Map<String, Long> result = new HashMap<>();
+ final List<PhysicalPlan> listOfPhysicalPlans = new ArrayList<>(); // when
to update here?
+ for (int i = 0; i < 7; i++) {
+ final int parallelism = (int) (partitionerProperty / Math.pow(2, i));
+ PhysicalPlan newPlan = makePhysicalPlanForSimulation(parallelism,
edgesToOptimize, currentIRDAG);
+ listOfPhysicalPlans.add(newPlan);
+ }
+ final List<Pair<Integer, Long>> listOfParallelismToDurationPair =
listOfPhysicalPlans.stream()
+ .map(this::launchSimulationForPlan)
+ .filter(pair -> pair.right() > 0.5)
+ .collect(Collectors.toList());
+ final Pair<Integer, Long> pairWithMinDuration =
+ Collections.min(listOfParallelismToDurationPair, Comparator.comparing(p
-> p.right()));
+ result.put("opt.parallelism", pairWithMinDuration.left().longValue());
+ return result;
+ }
+
+ /**
+ * Simulate the given physical plan.
+ * @param physicalPlan physical plan(with only one stage) to simulate
+ * @return Pair of Integer and Long. Integer value
indicates the simulated parallelism, and
+ * Long value is simulated job(=stage) duration.
+ */
+ private synchronized Pair<Integer, Long> launchSimulationForPlan(final
PhysicalPlan physicalPlan) {
+ this.simulationScheduler.schedulePlan(physicalPlan, 1);
+ final MetricStore resultingMetricStore =
this.simulationScheduler.collectMetricStore();
+ final List<Pair<Integer, Long>> taskSizeRatioToDuration = new
ArrayList<>();
+
resultingMetricStore.getMetricMap(JobMetric.class).values().forEach(jobMetric
-> {
+ final int taskSizeRatio = Integer.parseInt(((JobMetric)
jobMetric).getId().split("-")[1]);
+ taskSizeRatioToDuration.add(Pair.of(taskSizeRatio, ((JobMetric)
jobMetric).getJobDuration()));
+ });
+ return Collections.min(taskSizeRatioToDuration,
Comparator.comparing(Pair::right));
+ }
+
+ private void setPartitionerProperty(final int partitionerProperty) {
+ this.partitionerProperty = partitionerProperty;
+ }
+
+ private void calculatePartitionerProperty(final Set<StageEdge> edges) {
+
setPartitionerProperty(edges.iterator().next().getPropertyValue(PartitionerProperty.class).get().right());
+ }
+ /**
Review comment:
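Please add a blank line between the closing brace of `calculatePartitionerProperty` and the following Javadoc.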
##########
File path:
runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java
##########
@@ -41,6 +41,7 @@
public final class RunTimeMessageOutputCollector<O> implements
OutputCollector<O> {
private static final Logger LOG =
LoggerFactory.getLogger(RunTimeMessageOutputCollector.class.getName());
private static final String NULL_KEY = "NULL";
+ private static final String NON_EXIST = "NONE";
Review comment:
How about naming this `NON_EXISTENT`?
##########
File path:
compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/runtime/DynamicTaskSizingRuntimePass.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.runtime;
+
+import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import
org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import
org.apache.nemo.common.ir.edge.executionproperty.SubPartitionSetProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import
org.apache.nemo.common.ir.vertex.executionproperty.EnableDynamicTaskSizingProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Runtime pass for Dynamic Task Sizing policy.
+ */
+public final class DynamicTaskSizingRuntimePass extends
RunTimePass<Map<String, Long>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(DynamicTaskSizingRuntimePass.class.getName());
+ private final String mapKey = "opt.parallelism";
+
+ public DynamicTaskSizingRuntimePass() {
+ }
+
+ @Override
+ public IRDAG apply(final IRDAG irdag, final Message<Map<String, Long>>
mapMessage) {
+ final Set<IREdge> edgesToOptimize = mapMessage.getExaminedEdges();
+ final Set<IRVertex> stageVertices =
edgesToOptimize.stream().map(IREdge::getDst).collect(Collectors.toSet());
+ irdag.topologicalDo(v -> {
+ if (stageVertices.contains(v)) {
+ edgesToOptimize.addAll(irdag.getIncomingEdgesOf(v));
+ }
+ });
+ LOG.info("Examined edges {}",
edgesToOptimize.stream().map(IREdge::getId).collect(Collectors.toList()));
+
+ final IREdge representativeEdge = edgesToOptimize.iterator().next();
+ // double check
+ if
(!representativeEdge.getDst().getPropertyValue(EnableDynamicTaskSizingProperty.class).get())
{
+ return irdag;
+ }
+ final Map<String, Long> messageValue = mapMessage.getMessageValue();
+ LOG.info("messageValue {}", messageValue);
+ final int optimizedTaskSizeRatio = messageValue.get(mapKey).intValue();
+ final int partitionerProperty = getPartitionerProperty(irdag);
+ for (IREdge edge : edgesToOptimize) {
+ if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
+ .equals(CommunicationPatternProperty.Value.SHUFFLE)
+ &&
!edge.getPropertyValue(PartitionerProperty.class).get().right().equals(partitionerProperty))
{
Review comment:
Let's swap the operands of `.equals()` so that it is called on the side that cannot be `null`. The left-hand expression could in principle return `null`; it probably won't in practice, but SonarLint flags it.
##########
File path:
compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/runtime/DynamicTaskSizingRuntimePass.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.runtime;
+
+import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import
org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import
org.apache.nemo.common.ir.edge.executionproperty.SubPartitionSetProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import
org.apache.nemo.common.ir.vertex.executionproperty.EnableDynamicTaskSizingProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Runtime pass for Dynamic Task Sizing policy.
+ */
+public final class DynamicTaskSizingRuntimePass extends
RunTimePass<Map<String, Long>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(DynamicTaskSizingRuntimePass.class.getName());
+ private final String mapKey = "opt.parallelism";
+
+ public DynamicTaskSizingRuntimePass() {
+ }
+
+ @Override
+ public IRDAG apply(final IRDAG irdag, final Message<Map<String, Long>>
mapMessage) {
+ final Set<IREdge> edgesToOptimize = mapMessage.getExaminedEdges();
+ final Set<IRVertex> stageVertices =
edgesToOptimize.stream().map(IREdge::getDst).collect(Collectors.toSet());
+ irdag.topologicalDo(v -> {
+ if (stageVertices.contains(v)) {
+ edgesToOptimize.addAll(irdag.getIncomingEdgesOf(v));
+ }
+ });
+ LOG.info("Examined edges {}",
edgesToOptimize.stream().map(IREdge::getId).collect(Collectors.toList()));
+
+ final IREdge representativeEdge = edgesToOptimize.iterator().next();
+ // double check
+ if
(!representativeEdge.getDst().getPropertyValue(EnableDynamicTaskSizingProperty.class).get())
{
+ return irdag;
+ }
+ final Map<String, Long> messageValue = mapMessage.getMessageValue();
+ LOG.info("messageValue {}", messageValue);
+ final int optimizedTaskSizeRatio = messageValue.get(mapKey).intValue();
+ final int partitionerProperty = getPartitionerProperty(irdag);
+ for (IREdge edge : edgesToOptimize) {
+ if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
+ .equals(CommunicationPatternProperty.Value.SHUFFLE)
+ &&
!edge.getPropertyValue(PartitionerProperty.class).get().right().equals(partitionerProperty))
{
+ throw new IllegalArgumentException();
+ }
+ }
+ final int partitionUnit = partitionerProperty / optimizedTaskSizeRatio;
+ edgesToOptimize.forEach(irEdge -> setSubPartitionProperty(irEdge,
partitionUnit, partitionerProperty));
+ edgesToOptimize.forEach(irEdge -> setDstVertexParallelismProperty(irEdge,
partitionUnit, partitionerProperty));
+ return irdag;
+ }
+
+ private int getPartitionerProperty(final IRDAG dag) {
+ long jobSizeInBytes = dag.getInputSize();
+ long jobSizeInGB = jobSizeInBytes / (1024 * 1024 * 1024);
+ if (1 <= jobSizeInGB && jobSizeInGB < 10) {
+ return 1024;
+ } else if (10 <= jobSizeInGB && jobSizeInGB < 100) {
+ return 2048;
+ } else {
+ return 4096;
+ }
+ }
+
+ private void setSubPartitionProperty(final IREdge edge, final int
growingFactor, final int partitionerProperty) {
+ final int start = (int)
edge.getPropertyValue(SubPartitionSetProperty.class).get().get(0).rangeBeginInclusive();
+ final ArrayList<KeyRange> partitionSet = new ArrayList<>();
+ int taskIndex = 0;
+ for (int startIndex = start; startIndex < partitionerProperty; startIndex
+= growingFactor) {
+ partitionSet.add(taskIndex, HashRange.of(startIndex, startIndex +
growingFactor));
+ taskIndex++;
+ }
+ edge.setPropertyPermanently(SubPartitionSetProperty.of(partitionSet));
+ }
+
+ private void setDstVertexParallelismProperty(final IREdge edge,
+ final int partitionSize,
+ final int partitionerProperty) {
+ final int start = (int)
edge.getPropertyValue(SubPartitionSetProperty.class).get().get(0).rangeBeginInclusive();
Review comment:
Should this use `orElse()` instead of `get()`? Calling `get()` on an empty `Optional` throws `NoSuchElementException`.
##########
File path:
compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/runtime/DynamicTaskSizingRuntimePass.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.runtime;
+
+import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import
org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import
org.apache.nemo.common.ir.edge.executionproperty.SubPartitionSetProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import
org.apache.nemo.common.ir.vertex.executionproperty.EnableDynamicTaskSizingProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Runtime pass for Dynamic Task Sizing policy.
+ */
+public final class DynamicTaskSizingRuntimePass extends
RunTimePass<Map<String, Long>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(DynamicTaskSizingRuntimePass.class.getName());
+ private final String mapKey = "opt.parallelism";
+
+ public DynamicTaskSizingRuntimePass() {
+ }
+
+ @Override
+ public IRDAG apply(final IRDAG irdag, final Message<Map<String, Long>>
mapMessage) {
+ final Set<IREdge> edgesToOptimize = mapMessage.getExaminedEdges();
+ final Set<IRVertex> stageVertices =
edgesToOptimize.stream().map(IREdge::getDst).collect(Collectors.toSet());
+ irdag.topologicalDo(v -> {
+ if (stageVertices.contains(v)) {
+ edgesToOptimize.addAll(irdag.getIncomingEdgesOf(v));
+ }
+ });
+ LOG.info("Examined edges {}",
edgesToOptimize.stream().map(IREdge::getId).collect(Collectors.toList()));
+
+ final IREdge representativeEdge = edgesToOptimize.iterator().next();
+ // double check
+ if
(!representativeEdge.getDst().getPropertyValue(EnableDynamicTaskSizingProperty.class).get())
{
Review comment:
The `.orElse()` method seems better than `.get()` here.
##########
File path:
compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/prophet/Prophet.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.backend.nemo.prophet;
+
+import java.util.Map;
+
+/**
+ * A prophet class for dynamic optimization.
Review comment:
Double-check.
##########
File path:
compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/prophet/ParallelismProphet.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.backend.nemo.prophet;
+
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.DAGBuilder;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import org.apache.nemo.runtime.common.plan.Stage;
+import org.apache.nemo.runtime.common.plan.StageEdge;
+import org.apache.nemo.runtime.master.metric.MetricStore;
+import org.apache.nemo.runtime.master.scheduler.SimulationScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * A prophet for Parallelism.
+ */
+public final class ParallelismProphet implements Prophet<String, Long> {
+ private static final Logger LOG =
LoggerFactory.getLogger(ParallelismProphet.class.getName());
+ private final SimulationScheduler simulationScheduler;
+ private final PhysicalPlanGenerator physicalPlanGenerator;
+ private final IRDAG currentIRDAG;
+ private final PhysicalPlan currentPhysicalPlan;
+ private final Set<StageEdge> edgesToOptimize;
+ private final Set<String> stageId;
+ private int partitionerProperty;
+
+ /**
+ * Default constructor for ParallelismProphet.
+ * @param irdag current IRDAG
+ * @param physicalPlan current PhysicalPlan
+ * @param simulationScheduler SimulationScheduler to launch
+ * @param physicalPlanGenerator PhysicalPlanGenerator to make physical
plan which will be launched by
+ * simulation scheduler
+ * @param edgesToOptimize edges to optimize at runtime pass
+ */
+ public ParallelismProphet(final IRDAG irdag, final PhysicalPlan physicalPlan,
+ final SimulationScheduler simulationScheduler,
+ final PhysicalPlanGenerator physicalPlanGenerator,
+ final Set<StageEdge> edgesToOptimize) {
+ this.currentIRDAG = irdag;
+ this.currentPhysicalPlan = physicalPlan;
+ this.simulationScheduler = simulationScheduler;
+ this.physicalPlanGenerator = physicalPlanGenerator;
+ this.edgesToOptimize = edgesToOptimize;
+ this.stageId =
edgesToOptimize.stream().map(StageEdge::getDst).map(Stage::getId).collect(Collectors.toSet());
+ calculatePartitionerProperty(edgesToOptimize);
+ }
+
+ @Override
+ public Map<String, Long> calculate() {
+ final Map<String, Long> result = new HashMap<>();
+ final List<PhysicalPlan> listOfPhysicalPlans = new ArrayList<>(); // when
to update here?
+ for (int i = 0; i < 7; i++) {
+ final int parallelism = (int) (partitionerProperty / Math.pow(2, i));
+ PhysicalPlan newPlan = makePhysicalPlanForSimulation(parallelism,
edgesToOptimize, currentIRDAG);
+ listOfPhysicalPlans.add(newPlan);
+ }
+ final List<Pair<Integer, Long>> listOfParallelismToDurationPair =
listOfPhysicalPlans.stream()
+ .map(this::launchSimulationForPlan)
+ .filter(pair -> pair.right() > 0.5)
+ .collect(Collectors.toList());
+ final Pair<Integer, Long> pairWithMinDuration =
+ Collections.min(listOfParallelismToDurationPair, Comparator.comparing(p
-> p.right()));
+ result.put("opt.parallelism", pairWithMinDuration.left().longValue());
+ return result;
+ }
+
+ /**
+ * Simulate the given physical plan.
+ * @param physicalPlan physical plan(with only one stage) to simulate
+ * @return Pair of Integer and Long. Integer value
indicates the simulated parallelism, and
+ * Long value is simulated job(=stage) duration.
+ */
+ private synchronized Pair<Integer, Long> launchSimulationForPlan(final
PhysicalPlan physicalPlan) {
+ this.simulationScheduler.schedulePlan(physicalPlan, 1);
+ final MetricStore resultingMetricStore =
this.simulationScheduler.collectMetricStore();
+ final List<Pair<Integer, Long>> taskSizeRatioToDuration = new
ArrayList<>();
+
resultingMetricStore.getMetricMap(JobMetric.class).values().forEach(jobMetric
-> {
+ final int taskSizeRatio = Integer.parseInt(((JobMetric)
jobMetric).getId().split("-")[1]);
+ taskSizeRatioToDuration.add(Pair.of(taskSizeRatio, ((JobMetric)
jobMetric).getJobDuration()));
+ });
+ return Collections.min(taskSizeRatioToDuration,
Comparator.comparing(Pair::right));
+ }
+
+ private void setPartitionerProperty(final int partitionerProperty) {
Review comment:
Can you provide a brief javadoc?
##########
File path: compiler/backend/pom.xml
##########
@@ -47,5 +47,16 @@ under the License.
<artifactId>nemo-compiler-optimizer</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nemo</groupId>
+ <artifactId>nemo-runtime-master</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nemo</groupId>
+ <artifactId>nemo-runtime-master</artifactId>
+ <version>0.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
Review comment:
The indentation is off as well, by the way.
##########
File path:
compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoPlanRewriter.java
##########
@@ -60,23 +68,44 @@
private final NemoOptimizer nemoOptimizer;
private final NemoBackend nemoBackend;
private final Map<Integer, Map<Object, Long>> messageIdToAggregatedData;
+ private CountDownLatch readyToRewriteLatch;
+ private final InjectionFuture<SimulationScheduler>
simulationSchedulerInjectionFuture;
+ private final PhysicalPlanGenerator physicalPlanGenerator;
private IRDAG currentIRDAG;
+ private PhysicalPlan currentPhysicalPlan;
@Inject
public NemoPlanRewriter(final NemoOptimizer nemoOptimizer,
- final NemoBackend nemoBackend) {
+ final NemoBackend nemoBackend,
+ final InjectionFuture<SimulationScheduler>
simulationSchedulerInjectionFuture,
+ final PhysicalPlanGenerator physicalPlanGenerator,
+ @Parameter(JobConf.ClientSideRPCServerHost.class)
final String clientHost,
+ @Parameter(JobConf.ClientSideRPCServerPort.class)
final int clientPort,
Review comment:
Please double-check!
##########
File path: compiler/optimizer/pom.xml
##########
@@ -33,7 +33,7 @@ under the License.
</plugins>
</build>
- <parent>
+ <parent>
Review comment:
Double-check.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org