taegeonum commented on a change in pull request #318: URL: https://github.com/apache/incubator-nemo/pull/318#discussion_r698451787
########## File path: common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ShuffleExecutorSetProperty.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nemo.common.ir.vertex.executionproperty; + +import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty; + +import java.util.ArrayList; +import java.util.HashSet; + +/** + * List of set of node names to limit the scheduling of the tasks of the vertex to while shuffling. + * ShuffleExecutorSetProperty is only for IntermediateAccumulatorVertex. + * Other vertices must not have this property. + */ +public final class ShuffleExecutorSetProperty extends VertexExecutionProperty<ArrayList<HashSet<String>>> { Review comment: ArrayList<HashSet> -> List<Set> ########## File path: common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ShuffleExecutorSetProperty.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nemo.common.ir.vertex.executionproperty; + +import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty; + +import java.util.ArrayList; +import java.util.HashSet; + +/** + * List of set of node names to limit the scheduling of the tasks of the vertex to while shuffling. + * ShuffleExecutorSetProperty is only for IntermediateAccumulatorVertex. + * Other vertices must not have this property. + */ +public final class ShuffleExecutorSetProperty extends VertexExecutionProperty<ArrayList<HashSet<String>>> { + + /** + * Default constructor. + * @param value value of the execution property. + */ + private ShuffleExecutorSetProperty(final ArrayList<HashSet<String>> value) { + super(value); + } + + /** + * Static method for constructing {@link ShuffleExecutorSetProperty}. + * + * @param setsOfExecutors the list of executors to schedule the tasks of the vertex on. + * Leave empty to make it effectless. + * @return the new execution property + */ + public static ShuffleExecutorSetProperty of(final HashSet<HashSet<String>> setsOfExecutors) { Review comment: Set<Set<>> ########## File path: common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ShuffleExecutorSetProperty.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nemo.common.ir.vertex.executionproperty; + +import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty; + +import java.util.ArrayList; +import java.util.HashSet; + +/** + * List of set of node names to limit the scheduling of the tasks of the vertex to while shuffling. Review comment: This explanation is not clear to me. Does this property set the destination executor for the output of intermediate vertex? ########## File path: compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPass.java ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nemo.common.Util; +import org.apache.nemo.common.exception.SchedulingException; +import org.apache.nemo.common.ir.IRDAG; +import org.apache.nemo.common.ir.edge.IREdge; +import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty; +import org.apache.nemo.common.ir.vertex.OperatorVertex; +import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty; +import org.apache.nemo.common.ir.vertex.executionproperty.ShuffleExecutorSetProperty; +import org.apache.nemo.common.ir.vertex.utility.IntermediateAccumulatorVertex; +import org.apache.nemo.compiler.frontend.beam.transform.CombineTransform; +import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires; + +import java.io.File; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Pass for inserting intermediate aggregator for partial shuffle. + */ +@Requires(ParallelismProperty.class) +public class IntermediateAccumulatorInsertionPass extends ReshapingPass { Review comment: public final ########## File path: common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ShuffleExecutorSetProperty.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nemo.common.ir.vertex.executionproperty; + +import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty; + +import java.util.ArrayList; +import java.util.HashSet; + +/** + * List of set of node names to limit the scheduling of the tasks of the vertex to while shuffling. + * ShuffleExecutorSetProperty is only for IntermediateAccumulatorVertex. + * Other vertices must not have this property. + */ +public final class ShuffleExecutorSetProperty extends VertexExecutionProperty<ArrayList<HashSet<String>>> { + + /** + * Default constructor. + * @param value value of the execution property. + */ + private ShuffleExecutorSetProperty(final ArrayList<HashSet<String>> value) { Review comment: List<Set<>> ########## File path: compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPass.java ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nemo.common.Util; +import org.apache.nemo.common.exception.SchedulingException; +import org.apache.nemo.common.ir.IRDAG; +import org.apache.nemo.common.ir.edge.IREdge; +import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty; +import org.apache.nemo.common.ir.vertex.OperatorVertex; +import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty; +import org.apache.nemo.common.ir.vertex.executionproperty.ShuffleExecutorSetProperty; +import org.apache.nemo.common.ir.vertex.utility.IntermediateAccumulatorVertex; +import org.apache.nemo.compiler.frontend.beam.transform.CombineTransform; +import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires; + +import java.io.File; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Pass for inserting intermediate aggregator for partial shuffle. + */ +@Requires(ParallelismProperty.class) +public class IntermediateAccumulatorInsertionPass extends ReshapingPass { + private final String networkFilePath; + private boolean isUnitTest = false; + private static final Map<String, ArrayList<String>> UNIT_TEST_NETWORK_FILE = getUnitTestNetworkFile(); + + /** + * Default constructor. 
+ */ + public IntermediateAccumulatorInsertionPass() { + super(IntermediateAccumulatorInsertionPass.class); + this.networkFilePath = Util.fetchProjectRootPath() + "/tools/network_profiling/labeldict.json"; + } + + /** + * Constructor for unit test. + * @param isUnitTest indicates unit test. + */ + public IntermediateAccumulatorInsertionPass(final boolean isUnitTest) { + this(); + this.isUnitTest = isUnitTest; + } + + private static Map<String, ArrayList<String>> getUnitTestNetworkFile() { Review comment: Adding unit-test code here doesn't look good to me. ########## File path: common/src/main/java/org/apache/nemo/common/ir/vertex/utility/IntermediateAccumulatorVertex.java ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.common.ir.vertex.utility; + +import org.apache.nemo.common.ir.vertex.OperatorVertex; +import org.apache.nemo.common.ir.vertex.transform.Transform; + +/** + * During combine transform, accumulates data among physically nearby containers prior to shuffling across WAN.
+ */ +public class IntermediateAccumulatorVertex extends OperatorVertex { Review comment: public final class ########## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CombineTransform.java ########## @@ -45,9 +45,9 @@ * @param <InputT> input type * @param <OutputT> output type */ -public final class GBKTransform<K, InputT, OutputT> +public final class CombineTransform<K, InputT, OutputT> Review comment: As far as I know, a GroupByKey transform is not always represented as a Combine.PerKey transform, so renaming it to CombineTransform is confusing to me. For instance, CoGroupByKey does not combine anything, but as far as I know it is also represented as a GroupByKey. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
