[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693862#comment-16693862 ] Jonathan Eagles commented on TEZ-3998: -- [~yingdachen], Thank you for your continued patience. I have run test-patch on my desktop locally to produce Tez QA results and have found that apart from the expected checkstyle (line lengths > 80 characters). Tests Suite passes and static analysis also passed. +1. I will check this into master branch. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > Attachments: TEZ-3998.001.patch.diff > > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693642#comment-16693642 ] Yingda Chen commented on TEZ-3998: -- [~jeagles] kindly ping... I was hoping this can go in before the thanksgiving break...it is taking a bit too long and we have long pipeline of changes that we are hoping to contributing to Tez community. Thanks > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > Attachments: TEZ-3998.001.patch.diff > > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689930#comment-16689930 ] Yingda Chen commented on TEZ-3998: -- [~jeagles] could you please comment on the test result and how to proceed, I looked at the build errors and they do not seem to be related to my changes... First time submitting a change to Tez, could you please advise on how to proceed with this? Thanks, -Yingda > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > Attachments: TEZ-3998.001.patch.diff > > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688866#comment-16688866 ] TezQA commented on TEZ-3998: | (x) *{color:red}-1 overall{color}* | \\ \\ || Vote || Subsystem || Runtime || Comment || | {color:blue}0{color} | {color:blue} reexec {color} | {color:blue} 16m 6s{color} | {color:blue} Docker mode activated. {color} | | {color:blue}0{color} | {color:blue} patch {color} | {color:blue} 0m 3s{color} | {color:blue} The patch file was not named according to tez's naming conventions. Please see https://cwiki.apache.org/confluence/display/TEZ/How+to+Contribute+to+Tez for instructions. {color} | || || || || {color:brown} Prechecks {color} || | {color:green}+1{color} | {color:green} @author {color} | {color:green} 0m 0s{color} | {color:green} The patch does not contain any @author tags. {color} | | {color:green}+1{color} | {color:green} test4tests {color} | {color:green} 0m 0s{color} | {color:green} The patch appears to include 2 new or modified test files. {color} | || || || || {color:brown} master Compile Tests {color} || | {color:blue}0{color} | {color:blue} mvndep {color} | {color:blue} 0m 24s{color} | {color:blue} Maven dependency ordering for branch {color} | | {color:green}+1{color} | {color:green} mvninstall {color} | {color:green} 4m 16s{color} | {color:green} master passed {color} | | {color:green}+1{color} | {color:green} compile {color} | {color:green} 1m 16s{color} | {color:green} master passed {color} | | {color:green}+1{color} | {color:green} checkstyle {color} | {color:green} 1m 26s{color} | {color:green} master passed {color} | | {color:green}+1{color} | {color:green} findbugs {color} | {color:green} 2m 34s{color} | {color:green} master passed {color} | | {color:green}+1{color} | {color:green} javadoc {color} | {color:green} 1m 23s{color} | {color:green} master passed {color} | || || || || {color:brown} Patch Compile Tests {color} || | {color:blue}0{color} | {color:blue} mvndep {color} | {color:blue} 0m 15s{color} | {color:blue} Maven dependency ordering for patch {color} | | {color:green}+1{color} | {color:green} mvninstall {color} | {color:green} 1m 17s{color} | {color:green} the patch passed {color} | | {color:green}+1{color} | {color:green} compile {color} | {color:green} 1m 14s{color} | {color:green} the patch passed {color} | | {color:green}+1{color} | {color:green} javac {color} | {color:green} 1m 14s{color} | {color:green} the patch passed {color} | | {color:orange}-0{color} | {color:orange} checkstyle {color} | {color:orange} 0m 30s{color} | {color:orange} tez-api: The patch generated 57 new + 989 unchanged - 166 fixed = 1046 total (was 1155) {color} | | {color:orange}-0{color} | {color:orange} checkstyle {color} | {color:orange} 0m 18s{color} | {color:orange} tez-runtime-library: The patch generated 94 new + 0 unchanged - 0 fixed = 94 total (was 0) {color} | | {color:orange}-0{color} | {color:orange} checkstyle {color} | {color:orange} 0m 30s{color} | {color:orange} tez-dag: The patch generated 7 new + 661 unchanged - 20 fixed = 668 total (was 681) {color} | | {color:green}+1{color} | {color:green} whitespace {color} | {color:green} 0m 0s{color} | {color:green} The patch has no whitespace issues. {color} | | {color:green}+1{color} | {color:green} findbugs {color} | {color:green} 2m 50s{color} | {color:green} the patch passed {color} | | {color:green}+1{color} | {color:green} javadoc {color} | {color:green} 1m 32s{color} | {color:green} the patch passed {color} | || || || || {color:brown} Other Tests {color} || | {color:red}-1{color} | {color:red} unit {color} | {color:red} 0m 27s{color} | {color:red} tez-api in the patch failed. {color} | | {color:red}-1{color} | {color:red} unit {color} | {color:red} 0m 36s{color} | {color:red} tez-runtime-library in the patch failed. {color} | | {color:red}-1{color} | {color:red} unit {color} | {color:red} 0m 42s{color} | {color:red} tez-dag in the patch failed. {color} | | {color:green}+1{color} | {color:green} asflicense {color} | {color:green} 0m 45s{color} | {color:green} The patch does not generate ASF License warnings. {color} | | {color:black}{color} | {color:black} {color} | {color:black} 39m 50s{color} | {color:black} {color} | \\ \\ || Subsystem || Report/Notes || | Docker | Client=17.05.0-ce Server=17.05.0-ce Image:yetus/tez:d4a62de | | JIRA Issue | TEZ-3998 | | JIRA Patch URL | https://issues.apache.org/jira/secure/attachment/12948416/TEZ-3998.001.patch.diff | | Optional Tests | dupname asflicense javac javadoc unit findbugs checkstyle compile | | uname | Linux a65201b8721f 3.13.0-153-generic #203-Ubuntu SMP Thu Jun 14 08:52:28 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux | | Build tool | maven | | Personality | /testptch/patchprocess/precommit/personality/provided.sh | | git revision | master / efc7331 | | maven | version: Apache Maven 3.3.9 | | Default
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688771#comment-16688771 ] Jonathan Eagles commented on TEZ-3998: -- [~yingdachen], we have not successfully integrated github with Tez at this point. Please 1)download the patch from the pull request https://github.com/apache/tez/pull/33.diff 2) upload the diff file as TEZ-3998.001.patch, and 3) look at the corresponding Tez QA results. The results will have to be filtered since the checkstyle in tez is not correct yet, but it will be the next step. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688546#comment-16688546 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on the issue: https://github.com/apache/tez/pull/33 @jteagles have you got any chance to proceed? thanks, > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684583#comment-16684583 ] ASF GitHub Bot commented on TEZ-3998: - Github user beltran commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232864378 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); +} +managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); +Map edges = getContext().getInputVertexEdgeProperties(); +for (Map.Entry entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) { +throw new TezUncheckedException("All input edges to vertex " + vertexName + +" must be CONCURRENT."); + } + String srcVertex = entry.getKey(); + srcVerticesConfigured.put(srcVertex, false); + getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED)); +} + +try { + vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); +} catch (IOException e) { + throw new TezUncheckedException(e); +} +edgeTriggerType = ConcurrentEdgeTriggerType.valueOf( +vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE, +TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT)); +if
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684479#comment-16684479 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on the issue: https://github.com/apache/tez/pull/33 thanks @jteagles . let me know if there is anything else to address. we will begin working on TEZ-3999 once this is in. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684477#comment-16684477 ] ASF GitHub Bot commented on TEZ-3998: - Github user jteagles commented on the issue: https://github.com/apache/tez/pull/33 @yingdachen, I am happy with the changes. I will like to get a test-patch run to verify with static analysis. Easiest way I know is to upload patch to the jira and submit. There will be many false positives since a Tez specific checkstyle confinguration has not been created post yetus migration. Will try to get to that tomorrow. Once we have test-patch run (Tez QA) and @beltran has given his final approval, we can move to checkin. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16681727#comment-16681727 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on the issue: https://github.com/apache/tez/pull/33 @jteagles @beltran latest patch has addressed all previous code review comments, please take a look at your convenience. thanks > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680671#comment-16680671 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232112600 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); +} +managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); +Map edges = getContext().getInputVertexEdgeProperties(); +for (Map.Entry entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) { +throw new TezUncheckedException("All input edges to vertex " + vertexName + +" must be CONCURRENT."); + } + String srcVertex = entry.getKey(); + srcVerticesConfigured.put(srcVertex, false); + getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED)); +} + +try { + vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); +} catch (IOException e) { + throw new TezUncheckedException(e); +} +edgeTriggerType = ConcurrentEdgeTriggerType.valueOf( +vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE, +TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT)); +if
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680635#comment-16680635 ] ASF GitHub Bot commented on TEZ-3998: - Github user beltran commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232105351 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); +} +managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); +Map edges = getContext().getInputVertexEdgeProperties(); +for (Map.Entry entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) { +throw new TezUncheckedException("All input edges to vertex " + vertexName + +" must be CONCURRENT."); + } + String srcVertex = entry.getKey(); + srcVerticesConfigured.put(srcVertex, false); + getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED)); +} + +try { + vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); +} catch (IOException e) { + throw new TezUncheckedException(e); +} +edgeTriggerType = ConcurrentEdgeTriggerType.valueOf( +vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE, +TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT)); +if
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680631#comment-16680631 ] ASF GitHub Bot commented on TEZ-3998: - Github user beltran commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232104534 --- Diff: tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java --- @@ -2440,23 +2453,30 @@ public void run() { } } - private void startDAG() throws IOException, TezException { + private boolean hasConcurrentEdge(DAGPlan dagPlan) { +boolean hasConcurrentEdge = false; +for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) { + if (DAGProtos.PlanEdgeSchedulingType.CONCURRENT.equals(edge.getSchedulingType())) { +return true; + } +} +return hasConcurrentEdge; + } + + private DAGPlan readDAGPlanFile() throws IOException, TezException { FileInputStream dagPBBinaryStream = null; +DAGPlan dagPlan = null; try { - DAGPlan dagPlan = null; - // Read the protobuf DAG dagPBBinaryStream = new FileInputStream(new File(workingDirectory, TezConstants.TEZ_PB_PLAN_BINARY_NAME)); dagPlan = DAGPlan.parseFrom(dagPBBinaryStream); - - startDAG(dagPlan, null); --- End diff -- OK I see, got confused with what was happening before. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680624#comment-16680624 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232102507 --- Diff: tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java --- @@ -2440,23 +2453,30 @@ public void run() { } } - private void startDAG() throws IOException, TezException { + private boolean hasConcurrentEdge(DAGPlan dagPlan) { +boolean hasConcurrentEdge = false; +for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) { + if (DAGProtos.PlanEdgeSchedulingType.CONCURRENT.equals(edge.getSchedulingType())) { +return true; + } +} +return hasConcurrentEdge; + } + + private DAGPlan readDAGPlanFile() throws IOException, TezException { FileInputStream dagPBBinaryStream = null; +DAGPlan dagPlan = null; try { - DAGPlan dagPlan = null; - // Read the protobuf DAG dagPBBinaryStream = new FileInputStream(new File(workingDirectory, TezConstants.TEZ_PB_PLAN_BINARY_NAME)); dagPlan = DAGPlan.parseFrom(dagPBBinaryStream); - - startDAG(dagPlan, null); --- End diff -- 1993 is the line for if (recoveredDAGData != null) , when inside that branch startDAG would not be called, it would only be called in the corresponding "else" branch > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680625#comment-16680625 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232103086 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); +} +managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); +Map edges = getContext().getInputVertexEdgeProperties(); +for (Map.Entry entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) { +throw new TezUncheckedException("All input edges to vertex " + vertexName + +" must be CONCURRENT."); + } + String srcVertex = entry.getKey(); + srcVerticesConfigured.put(srcVertex, false); + getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED)); +} + +try { + vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); +} catch (IOException e) { + throw new TezUncheckedException(e); +} +edgeTriggerType = ConcurrentEdgeTriggerType.valueOf( +vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE, +TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT)); +if
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680607#comment-16680607 ] ASF GitHub Bot commented on TEZ-3998: - Github user beltran commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232100045 --- Diff: tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java --- @@ -2440,23 +2453,30 @@ public void run() { } } - private void startDAG() throws IOException, TezException { + private boolean hasConcurrentEdge(DAGPlan dagPlan) { +boolean hasConcurrentEdge = false; +for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) { + if (DAGProtos.PlanEdgeSchedulingType.CONCURRENT.equals(edge.getSchedulingType())) { +return true; + } +} +return hasConcurrentEdge; + } + + private DAGPlan readDAGPlanFile() throws IOException, TezException { FileInputStream dagPBBinaryStream = null; +DAGPlan dagPlan = null; try { - DAGPlan dagPlan = null; - // Read the protobuf DAG dagPBBinaryStream = new FileInputStream(new File(workingDirectory, TezConstants.TEZ_PB_PLAN_BINARY_NAME)); dagPlan = DAGPlan.parseFrom(dagPBBinaryStream); - - startDAG(dagPlan, null); --- End diff -- Oh yes, just saw there are two lines like that. I was referring to the one in 1993. So before when `startDAG` was called and then got into 1993 it had no effect? > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680606#comment-16680606 ] ASF GitHub Bot commented on TEZ-3998: - Github user beltran commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232101282 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); +} +managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); +Map edges = getContext().getInputVertexEdgeProperties(); +for (Map.Entry entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) { +throw new TezUncheckedException("All input edges to vertex " + vertexName + +" must be CONCURRENT."); + } + String srcVertex = entry.getKey(); + srcVerticesConfigured.put(srcVertex, false); + getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED)); +} + +try { + vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); +} catch (IOException e) { + throw new TezUncheckedException(e); +} +edgeTriggerType = ConcurrentEdgeTriggerType.valueOf( +vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE, +TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT)); +if
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680532#comment-16680532 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on the issue: https://github.com/apache/tez/pull/33 @beltran Thanks for your detailed review, I have addressed your review comments in the new iteration. @beltran @jteagles can you take another look at your convenience. thanks, > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680526#comment-16680526 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232087761 --- Diff: tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java --- @@ -2440,23 +2453,30 @@ public void run() { } } - private void startDAG() throws IOException, TezException { + private boolean hasConcurrentEdge(DAGPlan dagPlan) { +boolean hasConcurrentEdge = false; +for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) { + if (DAGProtos.PlanEdgeSchedulingType.CONCURRENT.equals(edge.getSchedulingType())) { +return true; + } +} +return hasConcurrentEdge; + } + + private DAGPlan readDAGPlanFile() throws IOException, TezException { FileInputStream dagPBBinaryStream = null; +DAGPlan dagPlan = null; try { - DAGPlan dagPlan = null; - // Read the protobuf DAG dagPBBinaryStream = new FileInputStream(new File(workingDirectory, TezConstants.TEZ_PB_PLAN_BINARY_NAME)); dagPlan = DAGPlan.parseFrom(dagPBBinaryStream); - - startDAG(dagPlan, null); --- End diff -- I am not sure whether you are referring to the recoveredDAGData != null on line 1993 or 1983. For the one on 1993, it would not call startDAG anyway. For the one on 1983, the logic added is only to forcefully invalid the recovery data that has been read, it does not impact the startDAG behavior either. Overall, there are two modification here: 1. the DAGPlan is read a bit earlier (in non-session mode) 2. the already-read recovery content may be useless when there is concurrent edge in the DAG. This is done on purpose and I think should be acceptable since it does not modify existing logic flow and provide the safeguard for DAG execution with concurrent edge, before we have a proper failover implementation for that TEZ-4017 > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680502#comment-16680502 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232084161 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); +} +managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); +Map edges = getContext().getInputVertexEdgeProperties(); +for (Map.Entry entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) { +throw new TezUncheckedException("All input edges to vertex " + vertexName + +" must be CONCURRENT."); + } + String srcVertex = entry.getKey(); + srcVerticesConfigured.put(srcVertex, false); + getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED)); +} + +try { + vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); +} catch (IOException e) { + throw new TezUncheckedException(e); +} +edgeTriggerType = ConcurrentEdgeTriggerType.valueOf( +vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE, +TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT)); +if
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680497#comment-16680497 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232083585 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; --- End diff -- We intend to expand logic that manage completed/total upstream tasks in TEZ-3999. volatile property added for allSrcVerticesConfigured > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680491#comment-16680491 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232082409 --- Diff: tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java --- @@ -408,6 +409,18 @@ public TezConfiguration(boolean loadDefaults) { + "launch.env"; public static final String TEZ_AM_LAUNCH_ENV_DEFAULT = ""; + /** + * String value. Describes the timing of scheduling downstream vertex tasks + * when the vertex has a concurrent input edge. --- End diff -- I have expand the comment a bit in an effort to make it easier to understand. Not using the event name directly is due to the subtle difference between event and scheduling actually being kicked off: for example, in the case of "SOURCE_TASK_STARTED", the scheduling is not triggered by any TASK_STARTED event, but instead it will only be triggered after enough TASK_STARTED events have been collected (for example, all the upstream tasks are running). For the lack of a better choice, I think defining a TriggerType is easier to expand in the long run too. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680505#comment-16680505 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232084381 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { --- End diff -- thanks for pointing this out. done > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680504#comment-16680504 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232084316 --- Diff: tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java --- @@ -82,31 +82,50 @@ PERSISTED_RELIABLE, /** * Data produced by the source task is available only while the source task - * is running. This requires the destination task to run concurrently with - * the source task. This is not supported yet. + * is running. This requires the destination task to run concurrently with + * the source task. Development in progress. */ @Unstable EPHEMERAL } - + /** - * Determines when the destination task is eligible to run, once the source + * Determines when the destination task is eligible to run, once the source * task is eligible to run. */ public enum SchedulingType { /** - * Destination task is eligible to run after one or more of its source tasks + * Destination task is eligible to run after one or more of its source tasks * have started or completed. */ SEQUENTIAL, /** * Destination task must run concurrently with the source task. - * This is not supported yet. + * Development in progress. */ @Unstable CONCURRENT } - + + /** + * Determines the relevant event(s) that will assist in scheduling downstream vertex + * connected via a edge with CONCURRENT {@link SchedulingType}. + */ + public enum ConcurrentEdgeTriggerType { +/** + * trigger downstream vertex tasks scheduling upon upstream vertex being configured --- End diff -- done > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680275#comment-16680275 ] ASF GitHub Bot commented on TEZ-3998: - Github user beltran commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232024580 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { --- End diff -- Should the same check as in [here](https://github.com/apache/tez/blob/master/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java#L235) be done? Otherwise a NullPointerException may happen at `vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());`. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680277#comment-16680277 ] ASF GitHub Bot commented on TEZ-3998: - Github user beltran commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232016999 --- Diff: tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java --- @@ -408,6 +409,18 @@ public TezConfiguration(boolean loadDefaults) { + "launch.env"; public static final String TEZ_AM_LAUNCH_ENV_DEFAULT = ""; + /** + * String value. Describes the timing of scheduling downstream vertex tasks + * when the vertex has a concurrent input edge. --- End diff -- This is a bit hard to undertstand IMO. I think something like "It should be set to the name of the event necessary to trigger the scheduling of a vertex task" would be clearer > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680279#comment-16680279 ] ASF GitHub Bot commented on TEZ-3998: - Github user beltran commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232028676 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); +} +managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); +Map edges = getContext().getInputVertexEdgeProperties(); +for (Map.Entry entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) { +throw new TezUncheckedException("All input edges to vertex " + vertexName + +" must be CONCURRENT."); + } + String srcVertex = entry.getKey(); + srcVerticesConfigured.put(srcVertex, false); + getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED)); +} + +try { + vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); +} catch (IOException e) { + throw new TezUncheckedException(e); +} +edgeTriggerType = ConcurrentEdgeTriggerType.valueOf( +vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE, +TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT)); +if
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680274#comment-16680274 ] ASF GitHub Bot commented on TEZ-3998: - Github user beltran commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232026642 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{ + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); +} +managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); +Map edges = getContext().getInputVertexEdgeProperties(); +for (Map.Entry entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())){ +throw new TezUncheckedException("All input edges to vertex " + vertexName + +" must be CONCURRENT."); + } + String srcVertex = entry.getKey(); + srcVerticesConfigured.put(srcVertex, false); + getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED)); +} + +try { + vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); +} catch (IOException e) { + throw new TezUncheckedException(e); +} +edgeTriggerType = ConcurrentEdgeTriggerType.valueOf( +vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE, +TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT)); +if
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680276#comment-16680276 ] ASF GitHub Bot commented on TEZ-3998: - Github user beltran commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232014782 --- Diff: tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java --- @@ -82,31 +82,50 @@ PERSISTED_RELIABLE, /** * Data produced by the source task is available only while the source task - * is running. This requires the destination task to run concurrently with - * the source task. This is not supported yet. + * is running. This requires the destination task to run concurrently with + * the source task. Development in progress. */ @Unstable EPHEMERAL } - + /** - * Determines when the destination task is eligible to run, once the source + * Determines when the destination task is eligible to run, once the source * task is eligible to run. */ public enum SchedulingType { /** - * Destination task is eligible to run after one or more of its source tasks + * Destination task is eligible to run after one or more of its source tasks * have started or completed. */ SEQUENTIAL, /** * Destination task must run concurrently with the source task. - * This is not supported yet. + * Development in progress. */ @Unstable CONCURRENT } - + + /** + * Determines the relevant event(s) that will assist in scheduling downstream vertex + * connected via a edge with CONCURRENT {@link SchedulingType}. + */ + public enum ConcurrentEdgeTriggerType { +/** + * trigger downstream vertex tasks scheduling upon upstream vertex being configured --- End diff -- type: vertex -> vertexes? > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of >
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680278#comment-16680278 ] ASF GitHub Bot commented on TEZ-3998: - Github user beltran commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232027395 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; --- End diff -- This variable seems to be written/read in several callback. Maybe it's worth to make it volatile and prevent some race condition? > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680280#comment-16680280 ] ASF GitHub Bot commented on TEZ-3998: - Github user beltran commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232032540 --- Diff: tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java --- @@ -2440,23 +2453,30 @@ public void run() { } } - private void startDAG() throws IOException, TezException { + private boolean hasConcurrentEdge(DAGPlan dagPlan) { +boolean hasConcurrentEdge = false; +for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) { + if (DAGProtos.PlanEdgeSchedulingType.CONCURRENT.equals(edge.getSchedulingType())) { +return true; + } +} +return hasConcurrentEdge; + } + + private DAGPlan readDAGPlanFile() throws IOException, TezException { FileInputStream dagPBBinaryStream = null; +DAGPlan dagPlan = null; try { - DAGPlan dagPlan = null; - // Read the protobuf DAG dagPBBinaryStream = new FileInputStream(new File(workingDirectory, TezConstants.TEZ_PB_PLAN_BINARY_NAME)); dagPlan = DAGPlan.parseFrom(dagPBBinaryStream); - - startDAG(dagPlan, null); --- End diff -- I see this has been moved [here](https://github.com/apache/tez/pull/33/files#diff-755c0ec043a1800cd6cbf31823a59c8fR2069). Are there going to be any consequences with that? The logic is not exactly the same since before `startDAG(dagPlan, null);` is called as long as `(!isSession)` and now it wouldn't if it gets in the section `if (recoveredDAGData != null) {` in `DAGAppMaster.serviceStart` > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677780#comment-16677780 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on the issue: https://github.com/apache/tez/pull/33 @jteagles could you please take a look at the latest patch? we hope to work on TEZ-3999 once this is in. thanks, > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670920#comment-16670920 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on the issue: https://github.com/apache/tez/pull/33 for the latest patch 1. address previous code review comments 2. add check in DAGAppMaster, for a DAG with concurrent edge, disable recovery (enforce the recovered DAG to run from scratch in case of AM failover). We will tackle proper failover strategy for a DAG with concurrent edge in TEZ-4017. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670918#comment-16670918 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r229906256 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/edgemanager/SilentEdgeManager.java --- @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.edgemanager; + +import org.apache.tez.dag.api.EdgeManagerPlugin; +import org.apache.tez.dag.api.EdgeManagerPluginContext; +import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.InputReadErrorEvent; + +import java.util.List; +import java.util.Map; + +/** + * A dummy edge manager used in scenarios where application will depend on + * the direct connection between containers/tasks to handle all data communications, + * including both routing and actual data transfers. + */ + +public class SilentEdgeManager extends EdgeManagerPlugin{ --- End diff -- done > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated()
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670914#comment-16670914 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r229906154 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{ + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); +} +managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); +Map edges = getContext().getInputVertexEdgeProperties(); +for (Map.Entry entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())){ +throw new TezUncheckedException("All input edges to vertex " + vertexName + +" must be CONCURRENT."); + } + String srcVertex = entry.getKey(); + srcVerticesConfigured.put(srcVertex, false); + getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED)); +} + +try { + vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); +} catch (IOException e) { + throw new TezUncheckedException(e); +} +edgeTriggerType = ConcurrentEdgeTriggerType.valueOf( +vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE, +TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT)); +if
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670916#comment-16670916 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r229906229 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{ + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); +} +managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); +Map edges = getContext().getInputVertexEdgeProperties(); +for (Map.Entry entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())){ --- End diff -- done > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670913#comment-16670913 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r229906138 --- Diff: tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestVertexManagerWithConcurrentInput.java --- @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.library.edgemanager.SilentEdgeManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestVertexManagerWithConcurrentInput { + + @Captor + ArgumentCaptor> requestCaptor; + + @Before + public void init() { +MockitoAnnotations.initMocks(this); + } + + @Test//(timeout=5000) --- End diff -- done > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios.
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670917#comment-16670917 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r229906241 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{ --- End diff -- done > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658029#comment-16658029 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on the issue: https://github.com/apache/tez/pull/33 Will be addressing the review comments in the next few days. In addition, some more details will be provided for fault tolerance in terms of AM recovery. I am thinking that to begin with, we will at least guarantee that the introduction of concurrent edge would not interfere with normal AM recovery. Design for fully functional recovery (for example, making the entire sub-graph connected by concurrent edge a a failure region) will likely to follow after that, since there are quite some details to polish. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657408#comment-16657408 ] ASF GitHub Bot commented on TEZ-3998: - Github user jteagles commented on the issue: https://github.com/apache/tez/pull/33 @yingdachen, a few other considerations. 1)Normally, a feature like this could be a candidate for feature branch. Do to the low risk interaction with Systems and backwards compatible API, I think we could allow develop in trunk. It may cause some coordinating with ongoing 0.10 build to prevent delay of that release line. 2) Please provide some input on fault tolerance of AM and attempt in the presence of Concurrent Edges. Specifically, we will want to provide fault tolerance guarantees and possible interactions with AM recovery and assign component responsibilities and interactions where needed. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657367#comment-16657367 ] ASF GitHub Bot commented on TEZ-3998: - Github user jteagles commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r226708226 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/edgemanager/SilentEdgeManager.java --- @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.edgemanager; + +import org.apache.tez.dag.api.EdgeManagerPlugin; +import org.apache.tez.dag.api.EdgeManagerPluginContext; +import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.InputReadErrorEvent; + +import java.util.List; +import java.util.Map; + +/** + * A dummy edge manager used in scenarios where application will depend on + * the direct connection between containers/tasks to handle all data communications, + * including both routing and actual data transfers. + */ + +public class SilentEdgeManager extends EdgeManagerPlugin{ --- End diff -- Insert a space between "EdgeManagerPlugin" and "{" ```suggestion public class SilentEdgeManager extends EdgeManagerPlugin { ``` > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be >
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657361#comment-16657361 ] ASF GitHub Bot commented on TEZ-3998: - Github user jteagles commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r226708831 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{ --- End diff -- ```suggestion public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { ``` > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657366#comment-16657366 ] ASF GitHub Bot commented on TEZ-3998: - Github user jteagles commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r226769441 --- Diff: tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestVertexManagerWithConcurrentInput.java --- @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.library.edgemanager.SilentEdgeManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestVertexManagerWithConcurrentInput { + + @Captor + ArgumentCaptor> requestCaptor; + + @Before + public void init() { +MockitoAnnotations.initMocks(this); + } + + @Test//(timeout=5000) --- End diff -- Perhaps timeout should be uncommented ```suggestion @Test(timeout=5000) ``` > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType*
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657362#comment-16657362 ] ASF GitHub Bot commented on TEZ-3998: - Github user jteagles commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r226709122 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{ + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); +} +managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); +Map edges = getContext().getInputVertexEdgeProperties(); +for (Map.Entry entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())){ --- End diff -- ```suggestion if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) { ``` > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > >
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657365#comment-16657365 ] ASF GitHub Bot commented on TEZ-3998: - Github user jteagles commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r226768726 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{ + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); +} +managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); +Map edges = getContext().getInputVertexEdgeProperties(); +for (Map.Entry entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())){ +throw new TezUncheckedException("All input edges to vertex " + vertexName + +" must be CONCURRENT."); + } + String srcVertex = entry.getKey(); + srcVerticesConfigured.put(srcVertex, false); + getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED)); +} + +try { + vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); +} catch (IOException e) { + throw new TezUncheckedException(e); +} +edgeTriggerType = ConcurrentEdgeTriggerType.valueOf( +vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE, +TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT)); +if
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657363#comment-16657363 ] ASF GitHub Bot commented on TEZ-3998: - Github user jteagles commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r226768963 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{ + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { +super(context); + } + + @Override + public void initialize() { +if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); +} +managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); +Map edges = getContext().getInputVertexEdgeProperties(); +for (Map.Entry entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())){ +throw new TezUncheckedException("All input edges to vertex " + vertexName + +" must be CONCURRENT."); + } + String srcVertex = entry.getKey(); + srcVerticesConfigured.put(srcVertex, false); + getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED)); +} + +try { + vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); +} catch (IOException e) { + throw new TezUncheckedException(e); +} +edgeTriggerType = ConcurrentEdgeTriggerType.valueOf( +vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE, +TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT)); +if
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657364#comment-16657364 ] ASF GitHub Bot commented on TEZ-3998: - Github user jteagles commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r226706087 --- Diff: tez-api/src/main/java/org/apache/tez/dag/api/DAG.java --- @@ -79,16 +79,16 @@ import com.google.common.collect.Sets; /** - * Top level entity that defines the DAG (Directed Acyclic Graph) representing --- End diff -- Usually for formatting changes we submit a separate change to address the whitespace changes to keep the logic changes separate to ease the review. Many editors default to automatically fix trailing whitespace on save and hadoop developers tend to disable this feature. Github allows me to hide whitespace changes so this is not much of a big deal. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656728#comment-16656728 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on the issue: https://github.com/apache/tez/pull/33 @jteagles thanks for your input. Regarding the implication of RM(YARN) supporting gang scheduling, it largely depends on how application/runtime is implemented, and may not be an absolute necessity. While it would be best if strong gang-scheduling can be guaranteed by RM, but even when without, AM can still assist in several ways: 1. AM can schedule all tasks in upstream and downstream vertices simultaneously, as is implemented in this change 2. AM can gather information about downstream/upstream tasks and inform dependent tasks to put things into action only when all dependencies are satisfied. This will require mechanisms such as waiting/timeout/retry to be implemented in application runtime, but that would be something needed anyways, even with RM gang-scheduling. Some of the changes along this line would be covered by TEZ-4000. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653996#comment-16653996 ] Jonathan Eagles commented on TEZ-3998: -- I'm starting to look at this design. I'll need a few more days to read the design and give feedback. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16652766#comment-16652766 ] ASF GitHub Bot commented on TEZ-3998: - Github user yingdachen commented on the issue: https://github.com/apache/tez/pull/33 > @yingdachen since this is part of a larger end-to-end feature (Tez-3997) would it be better for you to own a feature branch, in which you commit for the subtasks, and later, when the feature is complete, merge into master? > It is easier to provide meaningful feedback when seeing how it all comes together - right now, most of the changes are stubs. @anicoara thanks for the feedback. couple of things 1. The design doc in 3997 was meant to provide the big picture and as the place to collect feedback for the overall design. We believe suitable level of details about proposed changes have been provided, and it would be great hear from community on the design. 2. The feature was broken into 4 tasks carefully that range from basic support and plugin addition(3998), api change in AM component (3999), change in runtime (4000) and more complete scenario support (4001). We believe such break-down is suitable for us as new contributors to fit in, beginning with a simple change discussed here (3998) that is standalone in itself. We are hoping that our changes will take a gradual path to completion, which would not necessitate a separate feature branch. 3. In the change introduced here, the SilentEdgeManager is indeed a stub, that is by design and would not likely to change. However, I would not categorize the other change (such as the new VertexManagerWithConcurrentInput) as so, since it is functional as it is and we have added UT coverage to cover that as well. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1,
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16652201#comment-16652201 ] ASF GitHub Bot commented on TEZ-3998: - Github user anicoara commented on the issue: https://github.com/apache/tez/pull/33 @yingdachen since this is part of a larger end-to-end feature (Tez-3997) would it be better for you to own a feature branch, in which you commit for the subtasks, and later, when the feature is complete, merge into master? It is easier to provide meaningful feedback when seeing how it all comes together - right now, most of the changes are stubs. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651706#comment-16651706 ] Kuhu Shukla commented on TEZ-3998: -- I plan to review this in the next few days. Would be nice to get some comments from [~gopalv] , [~jlowe] and [~jeagles] among others. > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16649687#comment-16649687 ] ASF GitHub Bot commented on TEZ-3998: - GitHub user yingdachen opened a pull request: https://github.com/apache/tez/pull/33 TEZ-3998: support constructing DAG with concurrent edge 1. unblock the construction of DAG with concurrent edge 2. introduce VerteManagerWithConcurrentInput with SilentEdgeManager 3. introcuce the concept of ConcurrentEdgeTriggerType 4. UT Change-Id: I4202e2946b99c440d7d7923151588632d74121cc You can merge this pull request into a Git repository by running: $ git pull https://github.com/yingdachen/tez 3998 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/tez/pull/33.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #33 commit 5c7557140d816d5c1ca3ae89484c62c62391f856 Author: Yingda CHEN Date: 2018-10-15T03:28:37Z TEZ-3998: support constructing DAG with concurrent edge 1. unblock the construction of DAG with concurrent edge 2. introduce VerteManagerWithConcurrentInput with SilentEdgeManager 3. introcuce the concept of ConcurrentEdgeTriggerType 4. UT Change-Id: I4202e2946b99c440d7d7923151588632d74121cc > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > - > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task >Reporter: Yingda Chen >Assignee: Yingda Chen >Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME