[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-20 Thread Jonathan Eagles (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693862#comment-16693862
 ] 

Jonathan Eagles commented on TEZ-3998:
--

[~yingdachen], thank you for your continued patience. I ran test-patch locally 
on my desktop to produce Tez QA results. Apart from the expected checkstyle 
warnings (line lengths > 80 characters), the test suite passes and static 
analysis also passes.

+1. I will check this into the master branch.

> Allow CONCURRENT edge property in DAG construction and introduce 
> ConcurrentSchedulingType
> -
>
> Key: TEZ-3998
> URL: https://issues.apache.org/jira/browse/TEZ-3998
> Project: Apache Tez
>  Issue Type: Task
>Reporter: Yingda Chen
>Assignee: Yingda Chen
>Priority: Major
> Attachments: TEZ-3998.001.patch.diff
>
>
> This is the first task related to TEZ-3997
>  
> |Note: There is no API change in this proposed change. Most of this change 
> lifts existing constraints against the CONCURRENT edge type and adds a 
> VertexManagerPlugin implementation.|
>  
> This includes enabling CONCURRENT as a valid SchedulingType for an edge 
> property by removing all sanity checks against CONCURRENT during DAG 
> construction and execution. A new VertexManagerPlugin, 
> VertexManagerWithConcurrentInput, will be implemented for vertices with 
> incoming CONCURRENT edge(s).
> In addition, this change assumes that:
>  * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges.
>  * The Tez framework performs no shuffle or data movement when two vertices 
> are connected through a CONCURRENT edge. Instead, the application runtime is 
> responsible for all data-plane communication (as proposed in [1]).
> These assumptions are common for scenarios such as whole-DAG or sub-graph 
> gang scheduling, but they may be relaxed in a later implementation to allow 
> a mixture of SEQUENTIAL and CONCURRENT edges on the same vertex.
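The first assumption (no mixing of SEQUENTIAL and CONCURRENT incoming edges on one vertex) amounts to a simple check over a vertex's incoming edges. The sketch below is a minimal, Tez-free illustration: `SchedulingType` is a local stand-in mirroring `EdgeProperty.SchedulingType`, and `IncomingEdgeValidator` / `validateIncomingEdges` are hypothetical names, not part of the patch.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Local stand-in for EdgeProperty.SchedulingType so the sketch compiles
// without Tez on the classpath (assumption, not the Tez class itself).
enum SchedulingType { SEQUENTIAL, CONCURRENT }

// Hypothetical helper illustrating the "no mixed incoming edges" rule.
final class IncomingEdgeValidator {
  private IncomingEdgeValidator() {}

  /**
   * @param vertexName name of the downstream vertex, used in the error message
   * @param incoming   source vertex name -> scheduling type of that edge
   * @return true if every incoming edge is CONCURRENT (and there is at least one)
   * @throws IllegalStateException if SEQUENTIAL and CONCURRENT edges are mixed
   */
  static boolean validateIncomingEdges(String vertexName,
                                       Map<String, SchedulingType> incoming) {
    // Collect the distinct scheduling types seen on incoming edges.
    Set<SchedulingType> seen = EnumSet.noneOf(SchedulingType.class);
    seen.addAll(incoming.values());
    if (seen.size() > 1) {
      throw new IllegalStateException("Vertex " + vertexName
          + " cannot have both SEQUENTIAL and CONCURRENT incoming edges: " + incoming);
    }
    return seen.contains(SchedulingType.CONCURRENT);
  }
}
```

A vertex for which this returns true would be the kind of vertex that the proposal hands to VertexManagerWithConcurrentInput; a SEQUENTIAL-only or source-less vertex returns false.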
>  
> Most of the meaningful scheduling decisions in Tez today are based on the 
> notion of source task completion (or an extended version of it). This no 
> longer holds in the presence of a CONCURRENT edge. Instead, events such as a 
> source vertex being configured or a source task starting to run become more 
> relevant when making scheduling decisions for two vertices connected via a 
> CONCURRENT edge. We therefore introduce a new enum *ConcurrentSchedulingType* 
> to describe the “scheduling timing” for the downstream vertex in such 
> scenarios.
> |public enum ConcurrentSchedulingType {
>   /** Trigger downstream vertex task scheduling on the "configured" event of 
>    * upstream vertices. */
>   SOURCE_VERTEX_CONFIGURED,
>   /** Trigger downstream vertex task scheduling on the "running" event of 
>    * upstream tasks. */
>   SOURCE_TASK_STARTED
> }|
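To make the contrast with completion-based triggers concrete, the sketch below expresses each ConcurrentSchedulingType value as a readiness predicate over upstream progress counts. Note that neither branch looks at completed source tasks. `ConcurrentTrigger` and `readyToSchedule` are hypothetical names. The SOURCE_TASK_STARTED threshold (every upstream task running) is an assumption, since the proposal does not define it.

```java
// Mirrors the enum proposed above.
enum ConcurrentSchedulingType {
  /** Trigger downstream vertex task scheduling on the "configured" event of upstream vertices. */
  SOURCE_VERTEX_CONFIGURED,
  /** Trigger downstream vertex task scheduling on the "running" event of upstream tasks. */
  SOURCE_TASK_STARTED
}

// Hypothetical helper: decides whether downstream tasks may be scheduled.
final class ConcurrentTrigger {
  private ConcurrentTrigger() {}

  static boolean readyToSchedule(ConcurrentSchedulingType type,
                                 int configuredSrcVertices, int totalSrcVertices,
                                 int runningSrcTasks, int totalSrcTasks) {
    switch (type) {
      case SOURCE_VERTEX_CONFIGURED:
        // Ready once every source vertex has reached CONFIGURED.
        return configuredSrcVertices == totalSrcVertices;
      case SOURCE_TASK_STARTED:
        // ASSUMPTION: ready once every upstream task is running.
        return runningSrcTasks == totalSrcTasks;
      default:
        throw new IllegalArgumentException("Unknown scheduling type: " + type);
    }
  }
}
```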
>  
> In this change, only SOURCE_VERTEX_CONFIGURED will be used as the scheduling 
> type. This suffices for whole-DAG or sub-graph gang scheduling, where all 
> tasks in the downstream vertex should be scheduled together with all tasks 
> in the upstream vertex. In this case, we can use the existing 
> onVertexStateUpdated() interface of VertexManagerPlugin to collect the 
> information needed for the scheduling decision, and *no additional API 
> change is necessary*. In subtler cases, such as the parameter-server example 
> described in Fig. 1, other scheduling types would be more relevant, so 
> *ConcurrentSchedulingType* is introduced in this change as a placeholder, as 
> part of the infrastructure work.
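The SOURCE_VERTEX_CONFIGURED policy can be modeled without Tez as a small state machine: schedule every task of the managed vertex exactly once, after the vertex has started and every source vertex has reported CONFIGURED. In a real VertexManagerPlugin, the CONFIGURED notifications would arrive through onVertexStateUpdated() after registerForVertexStateUpdates() (as in the patch excerpt further down this thread), and tasks would be scheduled through the plugin context. `GangScheduleTracker`, `onSourceConfigured`, and the `scheduler` callback are hypothetical names used only for this sketch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, Tez-free model of gang scheduling on SOURCE_VERTEX_CONFIGURED.
final class GangScheduleTracker {
  private final Map<String, Boolean> srcVerticesConfigured = new HashMap<>();
  private final int managedTasks;
  private final Consumer<List<Integer>> scheduler;  // receives task indices to schedule
  private boolean vertexStarted = false;
  private boolean tasksScheduled = false;

  GangScheduleTracker(Collection<String> sourceVertices, int managedTasks,
                      Consumer<List<Integer>> scheduler) {
    for (String src : sourceVertices) {
      srcVerticesConfigured.put(src, false);
    }
    this.managedTasks = managedTasks;
    this.scheduler = scheduler;
  }

  /** Analogue of VertexManagerPlugin.onVertexStarted(). */
  synchronized void onVertexStarted() {
    vertexStarted = true;
    trySchedule();
  }

  /** Analogue of onVertexStateUpdated() receiving CONFIGURED for a source vertex. */
  synchronized void onSourceConfigured(String srcVertex) {
    if (!srcVerticesConfigured.containsKey(srcVertex)) {
      throw new IllegalArgumentException("Unknown source vertex: " + srcVertex);
    }
    srcVerticesConfigured.put(srcVertex, true);
    trySchedule();
  }

  // Schedules all tasks together, once, when every precondition holds.
  private void trySchedule() {
    if (tasksScheduled || !vertexStarted || srcVerticesConfigured.containsValue(false)) {
      return;
    }
    List<Integer> tasks = new ArrayList<>(managedTasks);
    for (int i = 0; i < managedTasks; i++) {
      tasks.add(i);
    }
    tasksScheduled = true;  // guard: repeated or late events never reschedule
    scheduler.accept(tasks);
  }
}
```

Because events may arrive in either order (a source may be configured before or after the managed vertex starts), both entry points call the same idempotent trySchedule().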
>  
> Finally, since all communication between two vertices connected via a 
> CONCURRENT edge is assumed to be handled by the application runtime, a 
> CONCURRENT edge will be assigned a DummyEdgeManager that mutes all 
> DataMovementEvent (DME) and VertexManagerEvent (VME) handling.
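The idea of a muted edge manager can be illustrated by contrasting a broadcast-style router with one that routes nothing. The sketch below uses a hypothetical, simplified `EdgeEventRouter` interface rather than Tez's actual edge manager plugin API. It also assumes that a muted edge declares zero physical inputs per destination task, which this proposal does not specify.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified edge-routing contract; not the Tez edge manager API. */
interface EdgeEventRouter {
  /** Destination task indices that should receive a data movement event from srcTaskIndex. */
  List<Integer> routeDataMovementEvent(int srcTaskIndex, int numDestTasks);

  /** Number of physical inputs each destination task expects on this edge. */
  int numPhysicalInputsPerDestTask(int numSrcTasks);
}

/** One-to-all routing, shown only for contrast with the muted edge. */
final class BroadcastRouter implements EdgeEventRouter {
  @Override
  public List<Integer> routeDataMovementEvent(int srcTaskIndex, int numDestTasks) {
    List<Integer> targets = new ArrayList<>(numDestTasks);
    for (int i = 0; i < numDestTasks; i++) {
      targets.add(i);
    }
    return targets;
  }

  @Override
  public int numPhysicalInputsPerDestTask(int numSrcTasks) {
    return numSrcTasks;  // one input per source task
  }
}

/** Muted router: the framework moves no data; the application runtime does. */
final class DummyRouter implements EdgeEventRouter {
  @Override
  public List<Integer> routeDataMovementEvent(int srcTaskIndex, int numDestTasks) {
    return Collections.emptyList();  // drop every event
  }

  @Override
  public int numPhysicalInputsPerDestTask(int numSrcTasks) {
    return 0;  // ASSUMPTION: no framework-managed inputs on a CONCURRENT edge
  }
}
```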



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-20 Thread Yingda Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693642#comment-16693642
 ] 

Yingda Chen commented on TEZ-3998:
--

[~jeagles] kind ping... I was hoping this could go in before the Thanksgiving 
break. It is taking a bit long, and we have a long pipeline of changes that we 
hope to contribute to the Tez community.

 

Thanks



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-16 Thread Yingda Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689930#comment-16689930
 ] 

Yingda Chen commented on TEZ-3998:
--

[~jeagles], could you please comment on the test results? I looked at the 
build errors, and they do not seem to be related to my changes.

Since this is my first time submitting a change to Tez, could you please 
advise on how to proceed?

 

Thanks,

-Yingda



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-15 Thread TezQA (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688866#comment-16688866
 ] 

TezQA commented on TEZ-3998:


| (x) *-1 overall* |

|| Vote || Subsystem || Runtime || Comment ||
| 0 | reexec | 16m 6s | Docker mode activated. |
| 0 | patch | 0m 3s | The patch file was not named according to tez's naming conventions. Please see https://cwiki.apache.org/confluence/display/TEZ/How+to+Contribute+to+Tez for instructions. |
|| || || || Prechecks ||
| +1 | @author | 0m 0s | The patch does not contain any @author tags. |
| +1 | test4tests | 0m 0s | The patch appears to include 2 new or modified test files. |
|| || || || master Compile Tests ||
| 0 | mvndep | 0m 24s | Maven dependency ordering for branch |
| +1 | mvninstall | 4m 16s | master passed |
| +1 | compile | 1m 16s | master passed |
| +1 | checkstyle | 1m 26s | master passed |
| +1 | findbugs | 2m 34s | master passed |
| +1 | javadoc | 1m 23s | master passed |
|| || || || Patch Compile Tests ||
| 0 | mvndep | 0m 15s | Maven dependency ordering for patch |
| +1 | mvninstall | 1m 17s | the patch passed |
| +1 | compile | 1m 14s | the patch passed |
| +1 | javac | 1m 14s | the patch passed |
| -0 | checkstyle | 0m 30s | tez-api: The patch generated 57 new + 989 unchanged - 166 fixed = 1046 total (was 1155) |
| -0 | checkstyle | 0m 18s | tez-runtime-library: The patch generated 94 new + 0 unchanged - 0 fixed = 94 total (was 0) |
| -0 | checkstyle | 0m 30s | tez-dag: The patch generated 7 new + 661 unchanged - 20 fixed = 668 total (was 681) |
| +1 | whitespace | 0m 0s | The patch has no whitespace issues. |
| +1 | findbugs | 2m 50s | the patch passed |
| +1 | javadoc | 1m 32s | the patch passed |
|| || || || Other Tests ||
| -1 | unit | 0m 27s | tez-api in the patch failed. |
| -1 | unit | 0m 36s | tez-runtime-library in the patch failed. |
| -1 | unit | 0m 42s | tez-dag in the patch failed. |
| +1 | asflicense | 0m 45s | The patch does not generate ASF License warnings. |
| | | 39m 50s | |

|| Subsystem || Report/Notes ||
| Docker | Client=17.05.0-ce Server=17.05.0-ce Image:yetus/tez:d4a62de |
| JIRA Issue | TEZ-3998 |
| JIRA Patch URL | https://issues.apache.org/jira/secure/attachment/12948416/TEZ-3998.001.patch.diff |
| Optional Tests | dupname asflicense javac javadoc unit findbugs checkstyle compile |
| uname | Linux a65201b8721f 3.13.0-153-generic #203-Ubuntu SMP Thu Jun 14 08:52:28 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux |
| Build tool | maven |
| Personality | /testptch/patchprocess/precommit/personality/provided.sh |
| git revision | master / efc7331 |
| maven | version: Apache Maven 3.3.9 |
| Default 

[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-15 Thread Jonathan Eagles (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688771#comment-16688771
 ] 

Jonathan Eagles commented on TEZ-3998:
--

[~yingdachen], we have not yet successfully integrated GitHub with Tez. Please 
(1) download the patch from the pull request at 
https://github.com/apache/tez/pull/33.diff, (2) upload the diff file as 
TEZ-3998.001.patch, and (3) look at the corresponding Tez QA results. The 
results will have to be filtered because Tez's checkstyle configuration is not 
correct yet, but this is the next step.



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688546#comment-16688546
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on the issue:

https://github.com/apache/tez/pull/33
  
@jteagles, have you had a chance to proceed? Thanks,




[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684583#comment-16684583
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user beltran commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232864378
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  private int managedTasks;
+  private boolean tasksScheduled = false;
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private boolean allSrcVerticesConfigured;
+
+  int completedUpstreamTasks;
+
+  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    if (getContext().getUserPayload() == null) {
+      throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput");
+    }
+    managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) {
+        throw new TezUncheckedException("All input edges to vertex " + vertexName +
+            "  must be CONCURRENT.");
+      }
+      String srcVertex = entry.getKey();
+      srcVerticesConfigured.put(srcVertex, false);
+      getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
+    }
+
+    try {
+      vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    edgeTriggerType = ConcurrentEdgeTriggerType.valueOf(
+        vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE,
+            TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
+if 

[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684479#comment-16684479
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on the issue:

https://github.com/apache/tez/pull/33
  
Thanks @jteagles. Let me know if there is anything else to address. We will 
begin working on TEZ-3999 once this is in.







[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16684477#comment-16684477
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user jteagles commented on the issue:

https://github.com/apache/tez/pull/33
  
@yingdachen, I am happy with the changes. I would like to get a test-patch
run to verify with static analysis. The easiest way I know is to upload the patch to
the JIRA and submit it. There will be many false positives, since a Tez-specific
checkstyle configuration has not been created since the Yetus migration. I will try
to get to that tomorrow. Once we have a test-patch run (Tez QA) and @beltran has
given his final approval, we can move to check-in.







[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16681727#comment-16681727
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on the issue:

https://github.com/apache/tez/pull/33
  
@jteagles @beltran The latest patch addresses all previous code review
comments; please take a look at your convenience. Thanks!









[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680671#comment-16680671
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232112600
  
--- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java ---
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  private int managedTasks;
+  private boolean tasksScheduled = false;
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private boolean allSrcVerticesConfigured;
+
+  int completedUpstreamTasks;
+
+  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    if (getContext().getUserPayload() == null) {
+      throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput");
+    }
+    managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) {
+        throw new TezUncheckedException("All input edges to vertex " + vertexName +
+            "  must be CONCURRENT.");
+      }
+      String srcVertex = entry.getKey();
+      srcVerticesConfigured.put(srcVertex, false);
+      getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
+    }
+
+    try {
+      vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    edgeTriggerType = ConcurrentEdgeTriggerType.valueOf(
+        vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE,
+            TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
+    if 

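The initialize() excerpt quoted above marks every CONCURRENT source vertex as not yet configured and registers for its CONFIGURED state update, which matches the SOURCE_VERTEX_CONFIGURED trigger described in the issue. Below is a minimal, Tez-free sketch of that bookkeeping, assuming the vertex's tasks are released all at once when the last source vertex reports CONFIGURED. The class ConcurrentInputTracker and its methods are hypothetical and are not part of the patch or the Tez API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, Tez-free model of the SOURCE_VERTEX_CONFIGURED bookkeeping.
class ConcurrentInputTracker {
  // Mirrors srcVerticesConfigured in the excerpt: source vertex -> CONFIGURED seen?
  private final Map<String, Boolean> srcVerticesConfigured = new ConcurrentHashMap<>();
  private final int managedTasks;
  private boolean tasksScheduled = false;

  ConcurrentInputTracker(List<String> concurrentSources, int managedTasks) {
    this.managedTasks = managedTasks;
    for (String src : concurrentSources) {
      srcVerticesConfigured.put(src, false); // every source starts unconfigured
    }
  }

  // Called for each CONFIGURED update. Returns every task index exactly once,
  // on the update that completes the set; otherwise returns an empty list.
  // synchronized so the check-then-schedule step cannot run twice if updates race
  // (an assumption; the sketch does not rely on Tez's callback threading model).
  synchronized List<Integer> onSourceConfigured(String srcVertex) {
    if (!srcVerticesConfigured.containsKey(srcVertex)) {
      throw new IllegalArgumentException("Not a CONCURRENT source: " + srcVertex);
    }
    srcVerticesConfigured.put(srcVertex, true);
    if (tasksScheduled || srcVerticesConfigured.containsValue(false)) {
      return Collections.emptyList();
    }
    tasksScheduled = true;
    List<Integer> toSchedule = new ArrayList<>(managedTasks);
    for (int i = 0; i < managedTasks; i++) {
      toSchedule.add(i);
    }
    return toSchedule;
  }
}
```

Returning all indices in a single batch is what gives the gang-scheduling behaviour: no downstream task is released until every upstream vertex is configured. In the real plugin the tasks would be handed to the plugin context for scheduling rather than returned to a caller. One detail in the excerpt itself: vertexName is used in the CONCURRENT validation message, but nothing shown assigns it before that point, so the message would print null for the vertex name.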
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680635#comment-16680635
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user beltran commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232105351
  

[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680631#comment-16680631
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user beltran commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232104534
  
--- Diff: tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---
@@ -2440,23 +2453,30 @@ public void run() {
     }
   }
 
-  private void startDAG() throws IOException, TezException {
+  private boolean hasConcurrentEdge(DAGPlan dagPlan) {
+    boolean hasConcurrentEdge = false;
+    for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) {
+      if (DAGProtos.PlanEdgeSchedulingType.CONCURRENT.equals(edge.getSchedulingType())) {
+        return true;
+      }
+    }
+    return hasConcurrentEdge;
+  }
+
+  private DAGPlan readDAGPlanFile() throws IOException, TezException {
     FileInputStream dagPBBinaryStream = null;
+    DAGPlan dagPlan = null;
     try {
-      DAGPlan dagPlan = null;
-
       // Read the protobuf DAG
       dagPBBinaryStream = new FileInputStream(new File(workingDirectory,
           TezConstants.TEZ_PB_PLAN_BINARY_NAME));
       dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
-
-      startDAG(dagPlan, null);
--- End diff --

OK, I see. I was confused about what was happening before.
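In the hasConcurrentEdge helper from the DAGAppMaster diff above, the local hasConcurrentEdge is never set to true, so the method simply returns true on the first CONCURRENT edge and false otherwise. A sketch of the same check written with Stream.anyMatch follows; EdgeSchedulingType and EdgePlanStub are hypothetical stand-ins for the DAGProtos types, used only so the snippet compiles on its own.

```java
import java.util.List;

// Hypothetical stand-in for DAGProtos.PlanEdgeSchedulingType.
enum EdgeSchedulingType { SEQUENTIAL, CONCURRENT }

// Hypothetical stand-in for DAGProtos.EdgePlan (only the field the check needs).
final class EdgePlanStub {
  private final EdgeSchedulingType schedulingType;

  EdgePlanStub(EdgeSchedulingType schedulingType) {
    this.schedulingType = schedulingType;
  }

  EdgeSchedulingType getSchedulingType() {
    return schedulingType;
  }
}

final class ConcurrentEdgeCheck {
  private ConcurrentEdgeCheck() {}

  // Same result as the loop in the diff: true as soon as any edge is CONCURRENT,
  // false for an empty or all-SEQUENTIAL edge list.
  static boolean hasConcurrentEdge(List<EdgePlanStub> edges) {
    return edges.stream()
        .anyMatch(e -> e.getSchedulingType() == EdgeSchedulingType.CONCURRENT);
  }
}
```

anyMatch short-circuits like the early return in the loop, so the behaviour is the same; it just removes the unused flag.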



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680624#comment-16680624
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232102507
  

Line 1993 is the if (recoveredDAGData != null) check. Inside that
branch startDAG is not called; it is only called in the
corresponding "else" branch.
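The branching described in the reply above can be modelled as follows. This is a hypothetical sketch, not DAGAppMaster code: DagStartFlow and startOrRecover are invented names, the recovered data and DAG plan are reduced to an Object and a String, and the model records only whether startDAG runs, not what the recovery path does instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the branch discussed above; it only tracks whether
// startDAG would be invoked, not what the recovery path does instead.
final class DagStartFlow {
  final List<String> calls = new ArrayList<>();

  void startOrRecover(Object recoveredDAGData, String dagPlan) {
    if (recoveredDAGData != null) {
      // Recovery branch: startDAG is not called here.
      calls.add("recovery");
    } else {
      // Fresh start: only this "else" branch calls startDAG.
      startDAG(dagPlan);
    }
  }

  private void startDAG(String dagPlan) {
    calls.add("startDAG:" + dagPlan);
  }
}
```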



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680625#comment-16680625
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232103086
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  private int managedTasks;
+  private boolean tasksScheduled = false;
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private boolean allSrcVerticesConfigured;
+
+  int completedUpstreamTasks;
+
+  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    if (getContext().getUserPayload() == null) {
+      throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput");
+    }
+    managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) {
+        throw new TezUncheckedException("All input edges to vertex " + vertexName +
+            "  must be CONCURRENT.");
+      }
+      String srcVertex = entry.getKey();
+      srcVerticesConfigured.put(srcVertex, false);
+      getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
+    }
+
+    try {
+      vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    edgeTriggerType = ConcurrentEdgeTriggerType.valueOf(
+        vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE,
+            TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
+if 
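
The edge check in `initialize()` above amounts to two things: reject any input edge that is not CONCURRENT, and start tracking every source vertex as "not yet configured". A minimal, Tez-free sketch of that step follows, assuming a hypothetical `SchedulingType` enum and `ConcurrentInputValidator` helper in place of the real Tez classes; the real plugin additionally calls `registerForVertexStateUpdates(..., CONFIGURED)` for each source. One deliberate difference: the sketch takes the vertex name as a parameter, since in the quoted diff `vertexName` does not appear to be assigned before it is used in the error message.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for EdgeProperty.SchedulingType; not the Tez class itself.
enum SchedulingType { SEQUENTIAL, CONCURRENT }

final class ConcurrentInputValidator {
  private ConcurrentInputValidator() {}

  /**
   * Checks that every input edge of the vertex is CONCURRENT and returns a
   * thread-safe map of source vertex name -> "configured" flag, all initially false.
   */
  static Map<String, Boolean> validateAndTrack(String vertexName,
      Map<String, SchedulingType> inputEdges) {
    Map<String, Boolean> srcVerticesConfigured = new ConcurrentHashMap<>();
    for (Map.Entry<String, SchedulingType> entry : inputEdges.entrySet()) {
      if (entry.getValue() != SchedulingType.CONCURRENT) {
        throw new IllegalStateException("All input edges to vertex " + vertexName
            + " must be CONCURRENT, but the edge from " + entry.getKey()
            + " is " + entry.getValue());
      }
      // In the real plugin, this is also where the CONFIGURED state update is registered.
      srcVerticesConfigured.put(entry.getKey(), false);
    }
    return srcVerticesConfigured;
  }
}
```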

[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680607#comment-16680607
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user beltran commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232100045
  
--- Diff: tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---
@@ -2440,23 +2453,30 @@ public void run() {
 }
   }
 
-  private void startDAG() throws IOException, TezException {
+  private boolean hasConcurrentEdge(DAGPlan dagPlan) {
+    boolean hasConcurrentEdge = false;
+    for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) {
+      if (DAGProtos.PlanEdgeSchedulingType.CONCURRENT.equals(edge.getSchedulingType())) {
+        return true;
+      }
+    }
+    return hasConcurrentEdge;
+  }
+
+  private DAGPlan readDAGPlanFile() throws IOException, TezException {
     FileInputStream dagPBBinaryStream = null;
+    DAGPlan dagPlan = null;
     try {
-      DAGPlan dagPlan = null;
-
       // Read the protobuf DAG
       dagPBBinaryStream = new FileInputStream(new File(workingDirectory,
           TezConstants.TEZ_PB_PLAN_BINARY_NAME));
       dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
-
-      startDAG(dagPlan, null);
--- End diff --

Oh yes, I just saw there are two lines like that. I was referring to the one 
on line 1993. So previously, when `startDAG` was called and reached line 1993, 
it had no effect?



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680606#comment-16680606
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user beltran commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232101282
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---

[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680532#comment-16680532
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on the issue:

https://github.com/apache/tez/pull/33
  
@beltran Thanks for your detailed review; I have addressed your comments in 
the new iteration.

@beltran @jteagles, could you take another look at your convenience?

Thanks,


> Allow CONCURRENT edge property in DAG construction and introduce 
> ConcurrentSchedulingType
> -
>
> Key: TEZ-3998
> URL: https://issues.apache.org/jira/browse/TEZ-3998
> Project: Apache Tez
>  Issue Type: Task
>Reporter: Yingda Chen
>Assignee: Yingda Chen
>Priority: Major
>
> This is the first task related to TEZ-3997
>  
> |Note: There is no API change in this proposed change. Most of this 
> change consists of lifting some existing constraints on the CONCURRENT edge 
> type and adding a VertexManagerPlugin implementation.|
>  
> This includes enabling the CONCURRENT SchedulingType as a valid edge 
> property by removing all the sanity checks against CONCURRENT during DAG 
> construction/execution. A new VertexManagerPlugin (namely 
> VertexManagerWithConcurrentInput) will be implemented for vertices with 
> incoming concurrent edge(s). 
> In addition, we will assume in this change that 
>  * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges 
>  * No shuffle or data movement is handled by the Tez framework when two vertices 
> are connected through a CONCURRENT edge. Instead, the runtime should be 
> responsible for handling all data-plane communication (as proposed in 
> [1]).
> Note that the above assumptions are common for scenarios such as whole-DAG or 
> sub-graph gang scheduling, but they may be relaxed in a later implementation, 
> which may allow a mixture of SEQUENTIAL and CONCURRENT edges on the same vertex.
>  
> Most of the (meaningful) scheduling decisions in Tez today are made based on 
> the notion of source task completion (or an extended version of it). This will 
> no longer hold in the presence of a CONCURRENT edge. Instead, events such as 
> a source vertex being configured or a source task running become more relevant 
> when making scheduling decisions for two vertices connected via a CONCURRENT 
> edge. We therefore introduce a new enum *ConcurrentSchedulingType* to 
> describe the “scheduling timing” for the downstream vertex in such scenarios. 
> |public enum ConcurrentSchedulingType{
>    /** * trigger downstream vertex tasks scheduling by "configured" event of 
> upstream vertices */
>   SOURCE_VERTEX_CONFIGURED,
>    /** * trigger downstream vertex tasks scheduling by "running" event of 
> upstream tasks */ 
>   SOURCE_TASK_STARTED 
> }|
>  
> Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the 
> scheduling type, which suffices for whole-DAG or sub-graph gang-scheduling 
> scenarios, where we want (all the tasks in) the downstream vertex to be 
> scheduled together with (all the tasks in) the upstream vertex. In this case, 
> we can leverage the existing onVertexStateUpdated() interface of 
> VertexManagerPlugin to collect relevant information to assist the scheduling 
> decision, and *there is no additional API change necessary*. However, in more 
> subtle cases such as the parameter-server example described in Fig. 1, other 
> scheduling types would be more relevant; therefore the placeholder 
> *ConcurrentSchedulingType* will be introduced in this change as part of the 
> infrastructure work.
>  
> Finally, since we assume that all communication between two vertices 
> connected via a CONCURRENT edge is handled by the application runtime, a 
> CONCURRENT edge will be assigned a DummyEdgeManager that basically mutes all 
> DME/VME handling.
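
The two proposed `ConcurrentSchedulingType` values imply different readiness checks for the downstream vertex. The sketch below is a hypothetical illustration only: `TriggerPolicy` and its parameters are not Tez API, and for SOURCE_TASK_STARTED it assumes an "all upstream tasks running" threshold purely as an example, since the description does not fix a policy for that type.

```java
import java.util.Map;

// Mirrors the two values proposed in the description; illustrative only.
enum ConcurrentSchedulingType { SOURCE_VERTEX_CONFIGURED, SOURCE_TASK_STARTED }

final class TriggerPolicy {
  private TriggerPolicy() {}

  /**
   * @param configured         source vertex name -> whether it has reached CONFIGURED
   * @param runningTasks       number of upstream tasks reported as running
   * @param totalUpstreamTasks total number of upstream tasks
   */
  static boolean readyToSchedule(ConcurrentSchedulingType type,
      Map<String, Boolean> configured, int runningTasks, int totalUpstreamTasks) {
    switch (type) {
      case SOURCE_VERTEX_CONFIGURED:
        // Gang scheduling: wait until every source vertex is configured.
        return !configured.containsValue(false);
      case SOURCE_TASK_STARTED:
        // Assumed threshold: every upstream task is running (one possible policy).
        return runningTasks >= totalUpstreamTasks;
      default:
        throw new IllegalArgumentException("Unknown scheduling type: " + type);
    }
  }
}
```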



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680526#comment-16680526
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232087761
  
--- Diff: tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---
@@ -2440,23 +2453,30 @@ public void run() {
 }
   }
 
-  private void startDAG() throws IOException, TezException {
+  private boolean hasConcurrentEdge(DAGPlan dagPlan) {
+    boolean hasConcurrentEdge = false;
+    for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) {
+      if (DAGProtos.PlanEdgeSchedulingType.CONCURRENT.equals(edge.getSchedulingType())) {
+        return true;
+      }
+    }
+    return hasConcurrentEdge;
+  }
+
+  private DAGPlan readDAGPlanFile() throws IOException, TezException {
     FileInputStream dagPBBinaryStream = null;
+    DAGPlan dagPlan = null;
     try {
-      DAGPlan dagPlan = null;
-
       // Read the protobuf DAG
       dagPBBinaryStream = new FileInputStream(new File(workingDirectory,
           TezConstants.TEZ_PB_PLAN_BINARY_NAME));
       dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
-
-      startDAG(dagPlan, null);
--- End diff --

I am not sure whether you are referring to the `recoveredDAGData != null` check 
on line 1993 or the one on line 1983.

The one on line 1993 would not call startDAG anyway. For the one on line 
1983, the added logic only forcibly invalidates recovery data that has already 
been read; it does not affect startDAG behavior either.

Overall, there are two modifications here:
1.  The DAGPlan is read a bit earlier (in non-session mode).
2.  Recovery content that has already been read may go unused when the DAG 
contains a concurrent edge. This is intentional, and I think it should be 
acceptable, since it does not modify the existing logic flow and it provides a 
safeguard for DAG execution with concurrent edges until we have a proper 
failover implementation for that (TEZ-4017).
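
The recovery safeguard described above (ignore already-read recovery data whenever the DAG plan contains a CONCURRENT edge, until TEZ-4017 provides proper failover) can be sketched roughly as below. `PlanEdgeSchedulingType`, `EdgePlan` and `RecoveryGuard` are simplified, hypothetical stand-ins for the protobuf-generated `DAGProtos` types and the surrounding `DAGAppMaster` logic; this is an illustration of the decision, not the patch itself.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical simplified stand-ins for DAGProtos.PlanEdgeSchedulingType / EdgePlan.
enum PlanEdgeSchedulingType { SEQUENTIAL, CONCURRENT }

final class EdgePlan {
  final PlanEdgeSchedulingType schedulingType;

  EdgePlan(PlanEdgeSchedulingType schedulingType) {
    this.schedulingType = schedulingType;
  }
}

final class RecoveryGuard {
  private RecoveryGuard() {}

  static boolean hasConcurrentEdge(List<EdgePlan> edges) {
    // Stops at the first CONCURRENT edge; no separate flag variable is needed.
    return edges.stream().anyMatch(e -> e.schedulingType == PlanEdgeSchedulingType.CONCURRENT);
  }

  /** Returns the recovery data to use, or empty if there is none or it must be discarded. */
  static <T> Optional<T> usableRecoveryData(List<EdgePlan> edges, T recoveredData) {
    if (recoveredData == null || hasConcurrentEdge(edges)) {
      return Optional.empty();
    }
    return Optional.of(recoveredData);
  }
}
```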



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680502#comment-16680502
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232084161
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---

[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680497#comment-16680497
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232083585
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  private int managedTasks;
+  private boolean tasksScheduled = false;
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private boolean allSrcVerticesConfigured;
--- End diff --

We intend to expand the logic that manages completed/total upstream tasks in 
TEZ-3999.

Added the volatile modifier to allSrcVerticesConfigured.
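
The `volatile` change above matters because vertex state updates and task scheduling may be observed from different threads. A minimal sketch of the pattern, assuming a hypothetical `ConfiguredSourceTracker` class: a concurrent map records which sources are CONFIGURED, a `volatile` flag publishes "all sources configured", and an `AtomicBoolean` with `compareAndSet` ensures the managed tasks are scheduled at most once. The real plugin also appears to wait for `onVertexStarted` (note the `onVertexStartedDone` field in the diff), which this sketch leaves out; the counter stands in for actually scheduling the vertex's tasks.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class ConfiguredSourceTracker {
  private final Map<String, Boolean> srcVerticesConfigured = new ConcurrentHashMap<>();
  // volatile so a write by the event-handling thread is visible to readers on other threads.
  private volatile boolean allSrcVerticesConfigured;
  // Guards against scheduling the managed tasks more than once.
  private final AtomicBoolean tasksScheduled = new AtomicBoolean(false);
  // Stand-in for the real scheduling call; counts how often scheduling happened.
  private final AtomicInteger scheduleCount = new AtomicInteger();

  ConfiguredSourceTracker(Iterable<String> sources) {
    for (String s : sources) {
      srcVerticesConfigured.put(s, false);
    }
  }

  /** Called when a source vertex reports CONFIGURED; unknown sources are ignored. */
  void onSourceConfigured(String source) {
    srcVerticesConfigured.replace(source, true);
    if (!srcVerticesConfigured.containsValue(false)) {
      allSrcVerticesConfigured = true;
      trySchedule();
    }
  }

  private void trySchedule() {
    // compareAndSet lets only the first caller proceed to scheduling.
    if (allSrcVerticesConfigured && tasksScheduled.compareAndSet(false, true)) {
      scheduleCount.incrementAndGet();
    }
  }

  boolean isAllSrcVerticesConfigured() {
    return allSrcVerticesConfigured;
  }

  int getScheduleCount() {
    return scheduleCount.get();
  }
}
```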



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680491#comment-16680491
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232082409
  
--- Diff: 
tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---
@@ -408,6 +409,18 @@ public TezConfiguration(boolean loadDefaults) {
   + "launch.env";
   public static final String TEZ_AM_LAUNCH_ENV_DEFAULT = "";
 
+  /**
+   * String value. Describes the timing of scheduling downstream vertex tasks
+   * when the vertex has a concurrent input edge.
--- End diff --

I have expanded the comment a bit to make it easier to understand. 

The event name is not used directly because of the subtle difference between 
an event and scheduling actually being kicked off. For example, in the case of 
"SOURCE_TASK_STARTED", scheduling is not triggered by any single TASK_STARTED 
event; it is only triggered after enough TASK_STARTED events have been 
collected (for example, once all the upstream tasks are running). Lacking a 
better choice, I think defining a TriggerType is also easier to extend in the 
long run.
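
The comment above argues for a dedicated trigger type read from a string configuration value. A hedged sketch of parsing such a value into an enum with a default follows, using a plain `Map<String, String>` instead of Hadoop's `Configuration`. The key `example.concurrent-edge.trigger-type`, the `TriggerTypeConfig` class, and the SOURCE_VERTEX_CONFIGURED default are assumptions for illustration; the actual string behind `TEZ_CONCURRENT_EDGE_TRIGGER_TYPE` is not shown in this thread. Unlike the plain `valueOf` call in the quoted diff, the sketch trims and upper-cases the value and names the key in the error message, which is a design choice of this example.

```java
import java.util.Locale;
import java.util.Map;

// Illustrative trigger type enum; the patch itself calls this ConcurrentEdgeTriggerType.
enum EdgeTriggerType { SOURCE_VERTEX_CONFIGURED, SOURCE_TASK_STARTED }

final class TriggerTypeConfig {
  private TriggerTypeConfig() {}

  // Hypothetical key name, not the real Tez configuration key.
  static final String TRIGGER_TYPE_KEY = "example.concurrent-edge.trigger-type";
  // Assumed default for this sketch.
  static final String TRIGGER_TYPE_DEFAULT = EdgeTriggerType.SOURCE_VERTEX_CONFIGURED.name();

  /** Reads the trigger type as a string and maps it to the enum, failing fast on bad values. */
  static EdgeTriggerType read(Map<String, String> conf) {
    String raw = conf.getOrDefault(TRIGGER_TYPE_KEY, TRIGGER_TYPE_DEFAULT);
    try {
      return EdgeTriggerType.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(
          "Unsupported value for " + TRIGGER_TYPE_KEY + ": " + raw, e);
    }
  }
}
```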


> Allow CONCURRENT edge property in DAG construction and introduce 
> ConcurrentSchedulingType
> -
>
> Key: TEZ-3998
> URL: https://issues.apache.org/jira/browse/TEZ-3998
> Project: Apache Tez
>  Issue Type: Task
>Reporter: Yingda Chen
>Assignee: Yingda Chen
>Priority: Major
>
> This is the first task related to TEZ-3997
>  
> |Note: There is no API change in this proposed change. The majority of this 
> change will be lifting some existing constraints against CONCURRENT edge 
> type, and addition of a VertexMangerPlugin implementation.|
>  
> This includes enabling the CONCURRENT SchedulingType as a valid edge 
> property, by removing all the sanity check against CONCURRENT during DAG 
> construction/execution. A new VertexManagerPlugin (namely 
> VertexManagerWithConcurrentInput) will be implemented for vertex with 
> incoming concurrent edge(s). 
> In addition, we will assume in this change that 
>  * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges 
>  * No shuffle or data movement is handled by Tez framework when two vertices 
> are connected through a CONCURRENT edge. Instead, runtime should be 
> responsible for handling all the data-plane communications (as proposed in 
> [1]).
> Note that the above assumptions are common for scenarios such as whole-DAG or 
> sub-graph gang scheduling, but they may be relaxed in later implementation, 
> which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex.
>  
> Most of the (meaningful) scheduling decisions in Tez today are based on the
> notion of source task completion (or an extended version of it). This no
> longer holds in the presence of a CONCURRENT edge. Instead, events such as
> "source vertex configured" or "source task running" become more relevant
> when making scheduling decisions for two vertices connected via a CONCURRENT
> edge. We therefore introduce a new enum *ConcurrentSchedulingType* to
> describe the “scheduling timing” for the downstream vertex in such scenarios.
> public enum ConcurrentSchedulingType {
>   /** Trigger downstream vertex task scheduling on the "configured" event of upstream vertices. */
>   SOURCE_VERTEX_CONFIGURED,
>   /** Trigger downstream vertex task scheduling on the "running" event of upstream tasks. */
>   SOURCE_TASK_STARTED
> }
>  
> Note that in this change we will only use SOURCE_VERTEX_CONFIGURED as the
> scheduling type, which suffices for whole-DAG or sub-graph gang scheduling,
> where we want (all the tasks in) the downstream vertex to be scheduled
> together with (all the tasks in) the upstream vertex. In this case, we can
> leverage the existing onVertexStateUpdated() interface of
> VertexManagerPlugin to collect the information needed for the scheduling
> decision, and *no additional API change is necessary*. However, in more
> subtle cases such as the parameter-server example described in Fig. 1, other
> scheduling types would be more relevant; therefore, the placeholder
> *ConcurrentSchedulingType* will be introduced in this change as part of the
> infrastructure work.
>  
> Finally, since we assume that all communication between two vertices
> connected via a CONCURRENT edge is handled by the application runtime, a
> CONCURRENT edge will be assigned a DummyEdgeManager that essentially mutes
> all DME/VME (DataMovementEvent/VertexManagerEvent) handling.
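The SOURCE_VERTEX_CONFIGURED behavior described above can be sketched without any Tez dependencies. It rejects non-CONCURRENT inputs and schedules all downstream tasks together once every upstream vertex is configured. This is a minimal illustration under stated assumptions. The class `ConcurrentInputSchedulerSketch`, the local `SchedulingType` enum and the method names are hypothetical stand-ins for `VertexManagerWithConcurrentInput`, `EdgeProperty.SchedulingType` and the `onVertexStateUpdated()` callback. Task indices are returned instead of calling the real scheduling API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical local stand-in for EdgeProperty.SchedulingType. */
enum SchedulingType { SEQUENTIAL, CONCURRENT }

/**
 * Hypothetical, Tez-free sketch: a vertex whose inputs are all CONCURRENT
 * schedules all of its tasks together, exactly once, after every source
 * vertex has reported CONFIGURED.
 */
final class ConcurrentInputSchedulerSketch {
  // Source vertex name -> has it reported CONFIGURED yet?
  private final Map<String, Boolean> srcVerticesConfigured = new ConcurrentHashMap<>();
  private final int managedTasks;
  private boolean tasksScheduled = false;

  ConcurrentInputSchedulerSketch(String vertexName, int managedTasks,
      Map<String, SchedulingType> inputEdges) {
    for (Map.Entry<String, SchedulingType> edge : inputEdges.entrySet()) {
      // Mixing SEQUENTIAL and CONCURRENT inputs is out of scope for this change.
      if (edge.getValue() != SchedulingType.CONCURRENT) {
        throw new IllegalStateException("All input edges to vertex " + vertexName
            + " must be CONCURRENT, but the edge from " + edge.getKey()
            + " is " + edge.getValue());
      }
      srcVerticesConfigured.put(edge.getKey(), false);
    }
    this.managedTasks = managedTasks;
  }

  /**
   * Called when a source vertex reaches CONFIGURED. Returns the indices of
   * all managed tasks on the call that completes the set. Returns an empty
   * list otherwise, including on any call after scheduling has happened.
   */
  synchronized List<Integer> onSourceVertexConfigured(String srcVertex) {
    if (!srcVerticesConfigured.containsKey(srcVertex)) {
      throw new IllegalArgumentException("Unknown source vertex: " + srcVertex);
    }
    srcVerticesConfigured.put(srcVertex, true);
    if (tasksScheduled || srcVerticesConfigured.containsValue(false)) {
      return Collections.emptyList();
    }
    tasksScheduled = true;
    List<Integer> tasks = new ArrayList<>(managedTasks);
    for (int i = 0; i < managedTasks; i++) {
      tasks.add(i);
    }
    return tasks;
  }
}
```

In a real plugin, the returned indices would be handed to the context's task-scheduling call. The vertex name is passed in before validation so that the error message can name the vertex.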



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680505#comment-16680505 ]

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232084381

--- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java ---
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  private int managedTasks;
+  private boolean tasksScheduled = false;
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private boolean allSrcVerticesConfigured;
+
+  int completedUpstreamTasks;
+
+  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    if (getContext().getUserPayload() == null) {
--- End diff --

Thanks for pointing this out. Done.



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680504#comment-16680504 ]

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232084316
  
--- Diff: tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java ---
@@ -82,31 +82,50 @@
 PERSISTED_RELIABLE,
 /**
  * Data produced by the source task is available only while the source task
- * is running. This requires the destination task to run concurrently with 
- * the source task. This is not supported yet.
+ * is running. This requires the destination task to run concurrently with
+ * the source task. Development in progress.
+ * the source task. Development in progress.
  */
 @Unstable
 EPHEMERAL
   }
-  
+
   /**
-   * Determines when the destination task is eligible to run, once the source  
+   * Determines when the destination task is eligible to run, once the source
* task is eligible to run.
*/
   public enum SchedulingType {
 /**
- * Destination task is eligible to run after one or more of its source tasks 
+ * Destination task is eligible to run after one or more of its source tasks
  * have started or completed.
  */
 SEQUENTIAL,
 /**
  * Destination task must run concurrently with the source task.
- *  This is not supported yet.
+ * Development in progress.
  */
 @Unstable
 CONCURRENT
   }
-  
+
+  /**
+   * Determines the relevant event(s) that will assist in scheduling downstream vertex
+   * connected via a edge with CONCURRENT {@link SchedulingType}.
+   */
+  public enum ConcurrentEdgeTriggerType {
+/**
+ * trigger downstream vertex tasks scheduling upon upstream vertex being configured
--- End diff --

done



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680275#comment-16680275 ]

ASF GitHub Bot commented on TEZ-3998:
-

Github user beltran commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232024580

--- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java ---
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  private int managedTasks;
+  private boolean tasksScheduled = false;
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private boolean allSrcVerticesConfigured;
+
+  int completedUpstreamTasks;
+
+  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    if (getContext().getUserPayload() == null) {
--- End diff --

Should the same check as [here](https://github.com/apache/tez/blob/master/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java#L235) be done? Otherwise a NullPointerException may happen at `vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());`.
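The requested guard follows a general validate-before-deserializing pattern. The sketch below is a self-contained illustration, not the Tez code. `PayloadConfigLoader` is hypothetical, a raw `byte[]` stands in for Tez's `UserPayload`, and `java.util.Properties` stands in for the Hadoop `Configuration` produced by `TezUtils.createConfFromUserPayload`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Properties;

/**
 * Hypothetical sketch: reject a missing payload with a descriptive error
 * before handing it to the deserializer, instead of letting a
 * NullPointerException surface from inside the parsing call.
 */
final class PayloadConfigLoader {
  private PayloadConfigLoader() {}

  static Properties loadConfig(byte[] userPayload, String pluginName) {
    // Fail fast, naming the plugin that was misconfigured.
    if (userPayload == null) {
      throw new IllegalStateException("user payload cannot be null for " + pluginName);
    }
    Properties conf = new Properties();
    try {
      conf.load(new ByteArrayInputStream(userPayload));
    } catch (IOException e) {
      // Wrap the checked exception in an unchecked one, similar in spirit to
      // the plugin's use of TezUncheckedException.
      throw new UncheckedIOException(e);
    }
    return conf;
  }
}
```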



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680277#comment-16680277 ]

ASF GitHub Bot commented on TEZ-3998:
-

Github user beltran commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232016999

--- Diff: tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---
@@ -408,6 +409,18 @@ public TezConfiguration(boolean loadDefaults) {
   + "launch.env";
   public static final String TEZ_AM_LAUNCH_ENV_DEFAULT = "";
 
+  /**
+   * String value. Describes the timing of scheduling downstream vertex tasks
+   * when the vertex has a concurrent input edge.
--- End diff --

This is a bit hard to understand, IMO. I think something like "It should be
set to the name of the event necessary to trigger the scheduling of a vertex
task" would be clearer.
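Following the suggested wording, the option holds the name of the event that triggers scheduling, and the plugin maps that name to an enum constant. Below is a sketch of that parsing with a clearer error for unknown names. Several details are assumptions for illustration:

- The key string and default value are invented. The real constants are `TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE` and `TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT`, whose values are not shown in this thread.
- The enum constants mirror those proposed in the issue description.
- A `Map<String, String>` stands in for the Hadoop `Configuration`.
- Accepting lower-case names is a design choice of the sketch.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;

/** Hypothetical mirror of the trigger types proposed in this issue. */
enum ConcurrentEdgeTriggerType { SOURCE_VERTEX_CONFIGURED, SOURCE_TASK_STARTED }

final class TriggerTypeConfig {
  // Assumed key and default, for illustration only.
  static final String KEY = "example.concurrent-edge.trigger-type";
  static final String DEFAULT = ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED.name();

  private TriggerTypeConfig() {}

  /** Returns the event that triggers downstream scheduling, or the default if unset. */
  static ConcurrentEdgeTriggerType read(Map<String, String> conf) {
    String raw = conf.get(KEY);
    String value = (raw == null ? DEFAULT : raw).trim();
    try {
      // Lenient about case: "source_task_started" is accepted too.
      return ConcurrentEdgeTriggerType.valueOf(value.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      // Enum.valueOf only reports the bad name; list the valid ones as well.
      throw new IllegalArgumentException("Invalid value '" + value + "' for " + KEY
          + "; expected one of " + Arrays.toString(ConcurrentEdgeTriggerType.values()), e);
    }
  }
}
```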




[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680279#comment-16680279 ]

ASF GitHub Bot commented on TEZ-3998:
-

Github user beltran commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232028676

--- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java ---
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  private int managedTasks;
+  private boolean tasksScheduled = false;
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private boolean allSrcVerticesConfigured;
+
+  int completedUpstreamTasks;
+
+  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    if (getContext().getUserPayload() == null) {
+      throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput");
+    }
+    managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) {
+        throw new TezUncheckedException("All input edges to vertex " + vertexName +
+            "  must be CONCURRENT.");
+      }
+      String srcVertex = entry.getKey();
+      srcVerticesConfigured.put(srcVertex, false);
+      getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
+    }
+
+    try {
+      vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    edgeTriggerType = ConcurrentEdgeTriggerType.valueOf(
+        vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE,
+            TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
+    if

[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680274#comment-16680274 ]

ASF GitHub Bot commented on TEZ-3998:
-

Github user beltran commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232026642

--- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java ---
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  private int managedTasks;
+  private boolean tasksScheduled = false;
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private boolean allSrcVerticesConfigured;
+
+  int completedUpstreamTasks;
+
+  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    if (getContext().getUserPayload() == null) {
+      throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput");
+    }
+    managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())){
+        throw new TezUncheckedException("All input edges to vertex " + vertexName +
+            "  must be CONCURRENT.");
+      }
+      String srcVertex = entry.getKey();
+      srcVerticesConfigured.put(srcVertex, false);
+      getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
+    }
+
+    try {
+      vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    edgeTriggerType = ConcurrentEdgeTriggerType.valueOf(
+        vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE,
+            TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
+    if

[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680276#comment-16680276 ]

ASF GitHub Bot commented on TEZ-3998:
-

Github user beltran commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232014782
  
--- Diff: tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java ---
@@ -82,31 +82,50 @@
 PERSISTED_RELIABLE,
 /**
  * Data produced by the source task is available only while the source task
- * is running. This requires the destination task to run concurrently with 
- * the source task. This is not supported yet.
+ * is running. This requires the destination task to run concurrently with
+ * the source task. Development in progress.
+ * the source task. Development in progress.
  */
 @Unstable
 EPHEMERAL
   }
-  
+
   /**
-   * Determines when the destination task is eligible to run, once the source  
+   * Determines when the destination task is eligible to run, once the source
* task is eligible to run.
*/
   public enum SchedulingType {
 /**
- * Destination task is eligible to run after one or more of its source tasks 
+ * Destination task is eligible to run after one or more of its source tasks
  * have started or completed.
  */
 SEQUENTIAL,
 /**
  * Destination task must run concurrently with the source task.
- *  This is not supported yet.
+ * Development in progress.
  */
 @Unstable
 CONCURRENT
   }
-  
+
+  /**
+   * Determines the relevant event(s) that will assist in scheduling downstream vertex
+   * connected via a edge with CONCURRENT {@link SchedulingType}.
+   */
+  public enum ConcurrentEdgeTriggerType {
+/**
+ * trigger downstream vertex tasks scheduling upon upstream vertex being configured
--- End diff --

Typo: vertex -> vertices?



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680278#comment-16680278
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user beltran commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232027395
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  private int managedTasks;
+  private boolean tasksScheduled = false;
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private boolean allSrcVerticesConfigured;
--- End diff --

This variable seems to be written and read in several callbacks. Maybe it's 
worth making it volatile to prevent a race condition?
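To make the point concrete, here is a minimal, Tez-free sketch that assumes the `onVertexStateUpdated` callbacks for different source vertices may run on different threads. The class and method names (`ConcurrentInputSchedulerSketch`, `onSourceConfigured`) are hypothetical. It shows that `volatile` on `allSrcVerticesConfigured` fixes visibility, while an `AtomicBoolean` with `compareAndSet` (the same type as the `onVertexStartedDone` field already in the diff) is what guarantees the managed tasks are scheduled only once.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, Tez-free sketch; names are illustrative only.
final class ConcurrentInputSchedulerSketch {
  // Source vertex name -> has it reported CONFIGURED yet?
  private final Map<String, Boolean> srcVerticesConfigured = new ConcurrentHashMap<>();
  // volatile: a write made in one callback thread is visible to readers in another.
  private volatile boolean allSrcVerticesConfigured = false;
  // volatile alone cannot make "check, then schedule" atomic; compareAndSet can.
  private final AtomicBoolean tasksScheduled = new AtomicBoolean(false);

  ConcurrentInputSchedulerSketch(Set<String> sourceVertices) {
    for (String src : sourceVertices) {
      srcVerticesConfigured.put(src, false);
    }
  }

  // Called when a source vertex reports CONFIGURED. Returns true only for the
  // single caller that should go on to schedule all managed tasks.
  boolean onSourceConfigured(String srcVertex) {
    srcVerticesConfigured.replace(srcVertex, true);
    if (!srcVerticesConfigured.containsValue(false)) {
      allSrcVerticesConfigured = true;
    }
    return allSrcVerticesConfigured && tasksScheduled.compareAndSet(false, true);
  }

  boolean isAllSrcVerticesConfigured() {
    return allSrcVerticesConfigured;
  }
}
```

With a plain `boolean tasksScheduled`, two threads could both observe `false` and both schedule the tasks; the compare-and-set closes that window.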


[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16680280#comment-16680280
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user beltran commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r232032540
  
--- Diff: tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---
@@ -2440,23 +2453,30 @@ public void run() {
 }
   }
 
-  private void startDAG() throws IOException, TezException {
+  private boolean hasConcurrentEdge(DAGPlan dagPlan) {
+    boolean hasConcurrentEdge = false;
+    for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) {
+      if (DAGProtos.PlanEdgeSchedulingType.CONCURRENT.equals(edge.getSchedulingType())) {
+        return true;
+      }
+    }
+    return hasConcurrentEdge;
+  }
+
+  private DAGPlan readDAGPlanFile() throws IOException, TezException {
     FileInputStream dagPBBinaryStream = null;
+    DAGPlan dagPlan = null;
     try {
-      DAGPlan dagPlan = null;
-
       // Read the protobuf DAG
       dagPBBinaryStream = new FileInputStream(new File(workingDirectory,
           TezConstants.TEZ_PB_PLAN_BINARY_NAME));
       dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
-
-      startDAG(dagPlan, null);
--- End diff --

I see this has been moved 
[here](https://github.com/apache/tez/pull/33/files#diff-755c0ec043a1800cd6cbf31823a59c8fR2069).
Could that have any consequences? The logic is not exactly the same: before, 
`startDAG(dagPlan, null);` was called whenever `(!isSession)` held, whereas now 
it would not be called if execution enters the `if (recoveredDAGData != null) {` 
block in `DAGAppMaster.serviceStart`.


[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-11-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677780#comment-16677780
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on the issue:

https://github.com/apache/tez/pull/33
  
@jteagles could you please take a look at the latest patch? We hope to work 
on TEZ-3999 once this is in.

Thanks,


> Allow CONCURRENT edge property in DAG construction and introduce 
> ConcurrentSchedulingType
> -
>
> Key: TEZ-3998
> URL: https://issues.apache.org/jira/browse/TEZ-3998
> Project: Apache Tez
>  Issue Type: Task
>Reporter: Yingda Chen
>Assignee: Yingda Chen
>Priority: Major
>
> This is the first task related to TEZ-3997
>  
> |Note: There is no API change in this proposed change. The majority of this 
> change will be lifting some existing constraints against CONCURRENT edge 
> type, and the addition of a VertexManagerPlugin implementation.|
>  
> This includes enabling the CONCURRENT SchedulingType as a valid edge 
> property, by removing all the sanity checks against CONCURRENT during DAG 
> construction/execution. A new VertexManagerPlugin (namely 
> VertexManagerWithConcurrentInput) will be implemented for vertices with 
> incoming concurrent edge(s). 
> In addition, we will assume in this change that 
>  * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges 
>  * No shuffle or data movement is handled by the Tez framework when two vertices 
> are connected through a CONCURRENT edge. Instead, the runtime should be 
> responsible for handling all the data-plane communications (as proposed in 
> [1]).
> Note that the above assumptions are common for scenarios such as whole-DAG or 
> sub-graph gang scheduling, but they may be relaxed in a later implementation, 
> which may allow a mixture of SEQUENTIAL and CONCURRENT edges on the same vertex.
>  
> Most of the (meaningful) scheduling decisions today in Tez are made based on 
> the notion of (or an extended version of) source task completion. This will 
> no longer be true in the presence of a CONCURRENT edge. Instead, events such as 
> source vertex configured, or source task running will become more relevant 
> when making scheduling decisions for two vertices connected via a CONCURRENT 
> edge.  We therefore introduce a new enum *ConcurrentSchedulingType* to 
> describe the “scheduling timing” for the downstream vertex in such scenarios. 
> |public enum ConcurrentSchedulingType{
>    /** * trigger downstream vertex tasks scheduling by "configured" event of 
> upstream vertices */
>   SOURCE_VERTEX_CONFIGURED,
>    /** * trigger downstream vertex tasks scheduling by "running" event of 
> upstream tasks */ 
>   SOURCE_TASK_STARTED 
> }|
>  
> Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the 
> scheduling type, which suffices for scenarios of whole-DAG or sub-graph 
> gang-scheduling, where we want (all the tasks in) the downstream vertex to be 
> scheduled together with (all the tasks) in the upstream vertex. In this case, 
> we can leverage the existing onVertexStateUpdated() interface of 
> VertexManagerPlugin to collect relevant information to assist the scheduling 
> decision, and *there is no additional API change necessary*. However, in more 
> subtle cases such as the parameter-server example described in Fig. 1, other 
> scheduling types would be more relevant; therefore the placeholder for 
> *ConcurrentSchedulingType* will be introduced in this change as part of the 
> infrastructure work.
>  
> Finally, since we assume that all communications between two vertices 
> connected via CONCURRENT edge are handled by application runtime, a 
> CONCURRENT edge will be assigned a DummyEdgeManager that basically mutes all 
> DME/VME handling.
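The first assumption in the description (a vertex never mixes SEQUENTIAL and CONCURRENT incoming edges) amounts to a simple check over a vertex's incoming edges. A minimal sketch, using a local stand-in enum rather than Tez's `EdgeProperty.SchedulingType`; `IncomingEdgeValidator` and `validate` are hypothetical names:

```java
import java.util.EnumSet;
import java.util.Map;

final class IncomingEdgeValidator {
  // Local stand-in for EdgeProperty.SchedulingType.
  enum SchedulingType { SEQUENTIAL, CONCURRENT }

  // incomingEdges maps source vertex name -> scheduling type of that edge.
  // Returns the scheduling type shared by all incoming edges, or null for a
  // vertex with no incoming edges; throws if SEQUENTIAL and CONCURRENT are mixed.
  static SchedulingType validate(String vertexName, Map<String, SchedulingType> incomingEdges) {
    EnumSet<SchedulingType> seen = EnumSet.noneOf(SchedulingType.class);
    seen.addAll(incomingEdges.values());
    if (seen.size() > 1) {
      throw new IllegalStateException("Vertex " + vertexName
          + " cannot have both SEQUENTIAL and CONCURRENT incoming edges: " + incomingEdges);
    }
    return seen.isEmpty() ? null : seen.iterator().next();
  }
}
```

Collecting the distinct types into an `EnumSet` keeps the check independent of how many incoming edges a vertex has.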



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670920#comment-16670920
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on the issue:

https://github.com/apache/tez/pull/33
  
For the latest patch:

1. Addressed previous code review comments.
2. Added a check in DAGAppMaster: for a DAG with a concurrent edge, recovery is 
disabled (the recovered DAG is forced to run from scratch in case of AM 
failover). We will tackle a proper failover strategy for DAGs with concurrent 
edges in TEZ-4017.
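The recovery rule in item 2 can be sketched as a small decision function. This is an illustration under stated assumptions, not DAGAppMaster's code: the nested enums stand in for `DAGProtos.PlanEdgeSchedulingType`, while `RecoveryPolicySketch`, `RestartAction` and `onAmRestart` are hypothetical names. `hasConcurrentEdge` is written with `anyMatch`, which is equivalent to the loop-with-early-return in the DAGAppMaster diff.

```java
import java.util.List;

final class RecoveryPolicySketch {
  // Stand-in for DAGProtos.PlanEdgeSchedulingType.
  enum PlanEdgeSchedulingType { SEQUENTIAL, CONCURRENT }

  // Hypothetical outcome of AM restart handling.
  enum RestartAction { RECOVER_PREVIOUS_STATE, RUN_FROM_SCRATCH }

  // True if any edge in the plan uses CONCURRENT scheduling.
  static boolean hasConcurrentEdge(List<PlanEdgeSchedulingType> edgeTypes) {
    return edgeTypes.stream().anyMatch(t -> t == PlanEdgeSchedulingType.CONCURRENT);
  }

  // Recovered state is only reused when the DAG has no concurrent edge;
  // otherwise (or when nothing was recovered) the DAG starts from scratch.
  static RestartAction onAmRestart(boolean recoveredDataAvailable,
                                   List<PlanEdgeSchedulingType> edgeTypes) {
    if (recoveredDataAvailable && !hasConcurrentEdge(edgeTypes)) {
      return RestartAction.RECOVER_PREVIOUS_STATE;
    }
    return RestartAction.RUN_FROM_SCRATCH;
  }
}
```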




[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670918#comment-16670918
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r229906256
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/edgemanager/SilentEdgeManager.java
 ---
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.edgemanager;
+
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A dummy edge manager used in scenarios where application will depend on
+ * the direct connection between containers/tasks to handle all data communications,
+ * including both routing and actual data transfers.
+ */
+
+public class SilentEdgeManager extends EdgeManagerPlugin{
--- End diff --

done
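Per its Javadoc, `SilentEdgeManager` targets applications that move data directly between tasks, so the framework should route nothing over the edge. The sketch below illustrates that idea against a deliberately simplified, hypothetical `EdgeRouting` interface; it is not Tez's `EdgeManagerPlugin` API, whose actual methods and signatures should be checked in the Tez Javadoc.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class SilentEdgeRoutingSketch {
  // Hypothetical, simplified routing contract; NOT the real Tez EdgeManagerPlugin API.
  interface EdgeRouting {
    int numDestinationTaskPhysicalInputs(int destinationTaskIndex);

    int numSourceTaskPhysicalOutputs(int sourceTaskIndex);

    // Destination task index -> physical input indices that should receive the event.
    Map<Integer, List<Integer>> routeDataMovementEvent(int sourceTaskIndex, int sourceOutputIndex);
  }

  // "Silent" routing: the framework sees no physical inputs or outputs on the
  // edge and routes no events; the application moves data between tasks itself.
  static final class SilentEdgeRouting implements EdgeRouting {
    @Override
    public int numDestinationTaskPhysicalInputs(int destinationTaskIndex) {
      return 0;
    }

    @Override
    public int numSourceTaskPhysicalOutputs(int sourceTaskIndex) {
      return 0;
    }

    @Override
    public Map<Integer, List<Integer>> routeDataMovementEvent(int sourceTaskIndex, int sourceOutputIndex) {
      return Collections.emptyMap();
    }
  }
}
```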


[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670914#comment-16670914
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r229906154
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  private int managedTasks;
+  private boolean tasksScheduled = false;
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private boolean allSrcVerticesConfigured;
+
+  int completedUpstreamTasks;
+
+  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    if (getContext().getUserPayload() == null) {
+      throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput");
+    }
+    managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())){
+        throw new TezUncheckedException("All input edges to vertex " + vertexName +
+            "  must be CONCURRENT.");
+      }
+      String srcVertex = entry.getKey();
+      srcVerticesConfigured.put(srcVertex, false);
+      getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
+    }
+
+    try {
+      vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    edgeTriggerType = ConcurrentEdgeTriggerType.valueOf(
+        vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE,
+            TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
+if 

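The `initialize()` excerpt above converts a configuration string into a `ConcurrentEdgeTriggerType` with `valueOf`, which throws `IllegalArgumentException` for any unrecognized name. A minimal sketch of that parse step, assuming a plain `Map<String, String>` in place of Hadoop's `Configuration`. The key, the default, and the class name are hypothetical (the default mirrors the description, which says only SOURCE_VERTEX_CONFIGURED is used in this change), and the two enum constants are taken from the issue description.

```java
import java.util.Map;

final class TriggerTypeConfigSketch {
  // Stand-in for EdgeProperty.ConcurrentEdgeTriggerType; constants from the issue description.
  enum ConcurrentEdgeTriggerType { SOURCE_VERTEX_CONFIGURED, SOURCE_TASK_STARTED }

  // Hypothetical key and default; the diff uses TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE(_DEFAULT).
  static final String TRIGGER_TYPE_KEY = "example.concurrent-edge-trigger-type";
  static final String TRIGGER_TYPE_DEFAULT = "SOURCE_VERTEX_CONFIGURED";

  static ConcurrentEdgeTriggerType parse(Map<String, String> conf) {
    String raw = conf.getOrDefault(TRIGGER_TYPE_KEY, TRIGGER_TYPE_DEFAULT);
    try {
      return ConcurrentEdgeTriggerType.valueOf(raw);
    } catch (IllegalArgumentException e) {
      // Enum.valueOf rejects unknown names; rethrow with the offending value for easier debugging.
      throw new IllegalArgumentException("Unsupported concurrent edge trigger type: " + raw, e);
    }
  }
}
```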
[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670916#comment-16670916
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r229906229
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{
+  private static final Logger LOG = 
LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map srcVerticesConfigured = 
Maps.newConcurrentMap();
+  private int managedTasks;
+  private boolean tasksScheduled = false;
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private boolean allSrcVerticesConfigured;
+
+  int completedUpstreamTasks;
+
+  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    if (getContext().getUserPayload() == null) {
+      throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput");
+    }
+    managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())){
--- End diff --

done
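The initialize() excerpt above walks every incoming edge and tests whether its scheduling type is CONCURRENT, which matches the stated assumption that a vertex cannot mix SEQUENTIAL and CONCURRENT inputs. The body of that check is cut off in the excerpt, so the sketch below assumes it rejects the vertex. It is a minimal, Tez-free illustration: the `EdgeSchedulingType` enum and the `ConcurrentInputValidator` class are hypothetical stand-ins, and `IllegalStateException` replaces `TezUncheckedException`.

```java
import java.util.Map;

// Hypothetical stand-in for EdgeProperty.SchedulingType, limited to the two values discussed here.
enum EdgeSchedulingType { SEQUENTIAL, CONCURRENT }

final class ConcurrentInputValidator {
  private ConcurrentInputValidator() {}

  // Rejects the vertex if any incoming edge is not CONCURRENT (assumed behavior of the
  // truncated check above). IllegalStateException stands in for TezUncheckedException.
  static void checkAllConcurrent(String vertexName, Map<String, EdgeSchedulingType> incomingEdges) {
    for (Map.Entry<String, EdgeSchedulingType> entry : incomingEdges.entrySet()) {
      if (entry.getValue() != EdgeSchedulingType.CONCURRENT) {
        throw new IllegalStateException("Vertex " + vertexName + " has a " + entry.getValue()
            + " input from " + entry.getKey() + "; only CONCURRENT inputs are supported");
      }
    }
  }
}
```

Failing fast at initialization keeps the unsupported mixed case from surfacing later as a scheduling stall.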



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670913#comment-16670913
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r229906138
  
--- Diff: 
tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestVertexManagerWithConcurrentInput.java
 ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.library.edgemanager.SilentEdgeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestVertexManagerWithConcurrentInput {
+
+  @Captor
+  ArgumentCaptor> requestCaptor;
+
+  @Before
+  public void init() {
+MockitoAnnotations.initMocks(this);
+  }
+
+  @Test//(timeout=5000)
--- End diff --

done



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670917#comment-16670917
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r229906241
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{
--- End diff --

done



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658029#comment-16658029
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on the issue:

https://github.com/apache/tez/pull/33
  
I will be addressing the review comments in the next few days.

In addition, I will provide more details on fault tolerance with respect to
AM recovery. To begin with, I think we should at least guarantee that
introducing the concurrent edge does not interfere with normal AM recovery.

A design for fully functional recovery (for example, treating the entire
sub-graph connected by concurrent edges as a single failure region) will
likely follow after that, since there are quite a few details to polish.


> Allow CONCURRENT edge property in DAG construction and introduce 
> ConcurrentSchedulingType
> -
>
> Key: TEZ-3998
> URL: https://issues.apache.org/jira/browse/TEZ-3998
> Project: Apache Tez
>  Issue Type: Task
>Reporter: Yingda Chen
>Assignee: Yingda Chen
>Priority: Major
>
> This is the first task related to TEZ-3997
>  
> |Note: There is no API change in this proposed change. The majority of this 
> change will be lifting some existing constraints against CONCURRENT edge 
> type, and the addition of a VertexManagerPlugin implementation.|
>  
> This includes enabling the CONCURRENT SchedulingType as a valid edge
> property by removing all the sanity checks against CONCURRENT during DAG
> construction/execution. A new VertexManagerPlugin (namely
> VertexManagerWithConcurrentInput) will be implemented for vertices with
> incoming concurrent edge(s).
> In addition, we will assume in this change that
>  * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges.
>  * No shuffle or data movement is handled by the Tez framework when two vertices
> are connected through a CONCURRENT edge. Instead, the runtime should be
> responsible for handling all data-plane communication (as proposed in
> [1]).
> Note that the above assumptions are common for scenarios such as whole-DAG or
> sub-graph gang scheduling, but they may be relaxed in a later implementation,
> which may allow a mixture of SEQUENTIAL and CONCURRENT edges on the same vertex.
>  
> Most of the (meaningful) scheduling decisions in Tez today are based on
> the notion of (or an extended version of) source task completion. This will
> no longer be true in the presence of a CONCURRENT edge. Instead, events such as
> "source vertex configured" or "source task running" will become more relevant
> when making scheduling decisions for two vertices connected via a CONCURRENT
> edge. We therefore introduce a new enum *ConcurrentSchedulingType* to
> describe the “scheduling timing” for the downstream vertex in such scenarios.
> |public enum ConcurrentSchedulingType {
>   /** Trigger downstream vertex task scheduling on the "configured" event of upstream vertices. */
>   SOURCE_VERTEX_CONFIGURED,
>   /** Trigger downstream vertex task scheduling on the "running" event of upstream tasks. */
>   SOURCE_TASK_STARTED
> }|
>  
> Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the
> scheduling type, which suffices for whole-DAG or sub-graph gang-scheduling
> scenarios, where we want (all the tasks in) the downstream vertex to be
> scheduled together with (all the tasks in) the upstream vertex. In this case,
> we can leverage the existing onVertexStateUpdated() interface of
> VertexManagerPlugin to collect the relevant information for the scheduling
> decision, and *no additional API change is necessary*. However, in more
> subtle cases, such as the parameter-server example described in Fig. 1, other
> scheduling types would be more relevant; therefore, the placeholder
> *ConcurrentSchedulingType* will be introduced in this change as part of the
> infrastructure work.
>  
> Finally, since we assume that all communication between two vertices
> connected via a CONCURRENT edge is handled by the application runtime, a
> CONCURRENT edge will be assigned a DummyEdgeManager that basically mutes all
> DME/VME handling.
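To illustrate the SOURCE_VERTEX_CONFIGURED trigger described above, here is a minimal sketch under stated assumptions. It does not use the Tez API: `ConfiguredTriggerSketch`, `onSourceConfigured`, and `scheduledTasks` are hypothetical names loosely modeled on fields in the patch (`srcVerticesConfigured`, `managedTasks`, `tasksScheduled`), and recording task indices stands in for handing schedule requests to the framework. The downstream vertex schedules all of its tasks in a single batch once it has started and every upstream vertex has reported CONFIGURED.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Tez-free sketch of SOURCE_VERTEX_CONFIGURED scheduling; not the patch's code.
final class ConfiguredTriggerSketch {
  // Source vertex name -> whether it has reported CONFIGURED.
  private final Map<String, Boolean> srcVerticesConfigured = new HashMap<>();
  private final int managedTasks;
  private boolean vertexStarted = false;
  private boolean tasksScheduled = false;
  // Records scheduled task indices; stands in for a batch of schedule requests.
  private final List<Integer> scheduled = new ArrayList<>();

  ConfiguredTriggerSketch(List<String> sourceVertices, int managedTasks) {
    for (String src : sourceVertices) {
      srcVerticesConfigured.put(src, false);
    }
    this.managedTasks = managedTasks;
  }

  // Analogous to onVertexStarted(): the downstream vertex may now run tasks.
  synchronized void onVertexStarted() {
    vertexStarted = true;
    trySchedule();
  }

  // Analogous to a CONFIGURED notification for one source vertex via onVertexStateUpdated().
  synchronized void onSourceConfigured(String srcVertex) {
    if (srcVerticesConfigured.replace(srcVertex, true) == null) {
      throw new IllegalArgumentException("Unknown source vertex: " + srcVertex);
    }
    trySchedule();
  }

  // Schedules every task in one batch, exactly once, when both conditions hold.
  private void trySchedule() {
    boolean allConfigured = !srcVerticesConfigured.containsValue(Boolean.FALSE);
    if (tasksScheduled || !vertexStarted || !allConfigured) {
      return;
    }
    for (int i = 0; i < managedTasks; i++) {
      scheduled.add(i);
    }
    tasksScheduled = true;
  }

  synchronized List<Integer> scheduledTasks() {
    return Collections.unmodifiableList(new ArrayList<>(scheduled));
  }
}
```

Because both callbacks funnel into the same idempotent check, the notifications may arrive in any order (vertex start before or after the sources are configured) and the vertex still produces exactly one gang-scheduled batch.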



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657408#comment-16657408
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user jteagles commented on the issue:

https://github.com/apache/tez/pull/33
  
@yingdachen, a few other considerations. 1) Normally, a feature like this
could be a candidate for a feature branch. Due to the low-risk interaction with
existing systems and the backwards-compatible API, I think we could allow
development in trunk. It may require some coordination with the ongoing 0.10
build to avoid delaying that release line. 2) Please provide some input on
fault tolerance of the AM and task attempts in the presence of concurrent
edges. Specifically, we will want to provide fault tolerance guarantees,
describe possible interactions with AM recovery, and assign component
responsibilities and interactions where needed.







[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657367#comment-16657367
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user jteagles commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r226708226
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/edgemanager/SilentEdgeManager.java
 ---
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.edgemanager;
+
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A dummy edge manager used in scenarios where application will depend on
+ * the direct connection between containers/tasks to handle all data communications,
+ * including both routing and actual data transfers.
+ */
+
+public class SilentEdgeManager extends EdgeManagerPlugin{
--- End diff --

Insert a space between "EdgeManagerPlugin" and "{"
```suggestion
public class SilentEdgeManager extends EdgeManagerPlugin {
```
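The SilentEdgeManager javadoc above says the application relies on direct connections between containers/tasks for all routing and data transfer. One way to express "mute all DME handling" is an edge that reports zero physical inputs/outputs and routes no events. The sketch below uses a hypothetical, simplified `EventRouter` interface, not Tez's `EdgeManagerPlugin` API, and it does not claim to match the patch's method bodies. A one-to-one router is included only for contrast.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified routing contract; this is NOT Tez's EdgeManagerPlugin API.
interface EventRouter {
  int numPhysicalInputs(int destTaskIndex);

  int numPhysicalOutputs(int srcTaskIndex);

  // Maps a destination task index to the physical input indices that receive the event.
  Map<Integer, List<Integer>> routeDataMovement(int srcTaskIndex, int srcOutputIndex);
}

// Silent edge: the framework moves no data, so there is nothing to count or route.
final class SilentRouter implements EventRouter {
  @Override
  public int numPhysicalInputs(int destTaskIndex) {
    return 0;
  }

  @Override
  public int numPhysicalOutputs(int srcTaskIndex) {
    return 0;
  }

  @Override
  public Map<Integer, List<Integer>> routeDataMovement(int srcTaskIndex, int srcOutputIndex) {
    return Collections.emptyMap();
  }
}

// For contrast only: a one-to-one edge delivers source task i's output to input 0 of task i.
final class OneToOneRouter implements EventRouter {
  @Override
  public int numPhysicalInputs(int destTaskIndex) {
    return 1;
  }

  @Override
  public int numPhysicalOutputs(int srcTaskIndex) {
    return 1;
  }

  @Override
  public Map<Integer, List<Integer>> routeDataMovement(int srcTaskIndex, int srcOutputIndex) {
    return Collections.singletonMap(srcTaskIndex, Collections.singletonList(0));
  }
}
```

Returning empty results keeps the silent edge harmless if an event does reach it. A stricter alternative would be to throw, on the grounds that no events should be produced on such an edge.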



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657361#comment-16657361
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user jteagles commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r226708831
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{
--- End diff --

```suggestion
public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
```



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657366#comment-16657366
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user jteagles commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r226769441
  
--- Diff: 
tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestVertexManagerWithConcurrentInput.java
 ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.library.edgemanager.SilentEdgeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestVertexManagerWithConcurrentInput {
+
+  @Captor
+  ArgumentCaptor> requestCaptor;
+
+  @Before
+  public void init() {
+MockitoAnnotations.initMocks(this);
+  }
+
+  @Test//(timeout=5000)
--- End diff --

Perhaps the timeout should be uncommented:
```suggestion
  @Test(timeout=5000)
```


> Allow CONCURRENT edge property in DAG construction and introduce 
> ConcurrentSchedulingType
> -
>
> Key: TEZ-3998
> URL: https://issues.apache.org/jira/browse/TEZ-3998
> Project: Apache Tez
>  Issue Type: Task
>Reporter: Yingda Chen
>Assignee: Yingda Chen
>Priority: Major
>
> This is the first task related to TEZ-3997
>  
> |Note: There is no API change in this proposed change. The majority of this 
> change will be lifting some existing constraints against CONCURRENT edge 
> type, and the addition of a VertexManagerPlugin implementation.|
>  
> This includes enabling the CONCURRENT SchedulingType as a valid edge 
> property by removing all the sanity checks against CONCURRENT during DAG 
> construction/execution. A new VertexManagerPlugin (namely 
> VertexManagerWithConcurrentInput) will be implemented for vertices with 
> incoming concurrent edge(s).
> In addition, we will assume in this change that 
>  * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges 
>  * No shuffle or data movement is handled by Tez framework when two vertices 
> are connected through a CONCURRENT edge. Instead, runtime should be 
> responsible for handling all the data-plane communications (as proposed in 
> [1]).
> Note that the above assumptions are common for scenarios such as whole-DAG or 
> sub-graph gang scheduling, but they may be relaxed in a later implementation, 
> which may allow a mixture of SEQUENTIAL and CONCURRENT edges on the same vertex.
>  
> Most of the (meaningful) scheduling decisions in Tez today are based on 
> the notion of (or an extended version of) source task completion. This will 
> no longer hold in the presence of a CONCURRENT edge. Instead, events such as 
> source vertex configured or source task running will become more relevant 
> when making scheduling decisions for two vertices connected via a CONCURRENT 
> edge.  We therefore introduce a new enum *ConcurrentSchedulingType* 

[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657362#comment-16657362
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user jteagles commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r226709122
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  private int managedTasks;
+  private boolean tasksScheduled = false;
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private boolean allSrcVerticesConfigured;
+
+  int completedUpstreamTasks;
+
+  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    if (getContext().getUserPayload() == null) {
+      throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput");
+    }
+    managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())){
--- End diff --

```suggestion
      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) {
```
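
The check under review rejects any incoming edge that is not CONCURRENT before registering for the source vertices' CONFIGURED updates. A minimal plain-Java sketch of that validation, assuming a stand-in `EdgeSchedulingType` enum and a map from source-vertex name to scheduling type in place of the real `EdgeProperty` objects; `ConcurrentInputValidator` and its method are hypothetical names, not Tez API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for EdgeProperty.SchedulingType.
enum EdgeSchedulingType { SEQUENTIAL, CONCURRENT }

// Hypothetical helper mirroring the shape of the initialize() check.
final class ConcurrentInputValidator {
  private ConcurrentInputValidator() {}

  /**
   * Returns the source vertices to watch for CONFIGURED, each initially
   * marked as not yet configured. Throws if any incoming edge is not CONCURRENT.
   */
  static Map<String, Boolean> validate(String vertexName,
      Map<String, EdgeSchedulingType> inputEdges) {
    Map<String, Boolean> srcVerticesConfigured = new LinkedHashMap<>();
    for (Map.Entry<String, EdgeSchedulingType> entry : inputEdges.entrySet()) {
      if (entry.getValue() != EdgeSchedulingType.CONCURRENT) {
        // Name both the managed vertex and the offending source in the error.
        throw new IllegalStateException("All input edges to vertex "
            + vertexName + " must be CONCURRENT, but the edge from "
            + entry.getKey() + " is " + entry.getValue());
      }
      srcVerticesConfigured.put(entry.getKey(), false);
    }
    return srcVerticesConfigured;
  }
}
```

Taking the vertex name as an explicit parameter guarantees that the error message can identify the vertex whose inputs were rejected.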



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657365#comment-16657365
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user jteagles commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r226768726
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  private int managedTasks;
+  private boolean tasksScheduled = false;
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private boolean allSrcVerticesConfigured;
+
+  int completedUpstreamTasks;
+
+  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    if (getContext().getUserPayload() == null) {
+      throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput");
+    }
+    managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())){
+        throw new TezUncheckedException("All input edges to vertex " + vertexName +
+            "  must be CONCURRENT.");
+      }
+      String srcVertex = entry.getKey();
+      srcVerticesConfigured.put(srcVertex, false);
+      getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
+    }
+
+    try {
+      vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    edgeTriggerType = ConcurrentEdgeTriggerType.valueOf(
+        vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE,
+            TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
+if 

[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657363#comment-16657363
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user jteagles commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r226768963
  
--- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---

[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657364#comment-16657364
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user jteagles commented on a diff in the pull request:

https://github.com/apache/tez/pull/33#discussion_r226706087
  
--- Diff: tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---
@@ -79,16 +79,16 @@
 import com.google.common.collect.Sets;
 
 /**
- * Top level entity that defines the DAG (Directed Acyclic Graph) representing 
--- End diff --

Usually we submit formatting changes as a separate change, keeping whitespace 
changes apart from logic changes to ease the review. Many editors automatically 
fix trailing whitespace on save by default, and Hadoop developers tend to 
disable this feature. GitHub lets me hide whitespace changes, so this is not a 
big deal.


> Allow CONCURRENT edge property in DAG construction and introduce 
> ConcurrentSchedulingType
> -
>
> Key: TEZ-3998
> URL: https://issues.apache.org/jira/browse/TEZ-3998
> Project: Apache Tez
>  Issue Type: Task
>Reporter: Yingda Chen
>Assignee: Yingda Chen
>Priority: Major
>
> This is the first task related to TEZ-3997
>  
> |Note: There is no API change in this proposed change. The majority of this 
> change will be lifting some existing constraints against CONCURRENT edge 
> type, and the addition of a VertexManagerPlugin implementation.|
>  
> This includes enabling the CONCURRENT SchedulingType as a valid edge 
> property by removing all the sanity checks against CONCURRENT during DAG 
> construction/execution. A new VertexManagerPlugin (namely 
> VertexManagerWithConcurrentInput) will be implemented for vertices with 
> incoming concurrent edge(s).
> In addition, we will assume in this change that 
>  * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges 
>  * No shuffle or data movement is handled by Tez framework when two vertices 
> are connected through a CONCURRENT edge. Instead, runtime should be 
> responsible for handling all the data-plane communications (as proposed in 
> [1]).
> Note that the above assumptions are common for scenarios such as whole-DAG or 
> sub-graph gang scheduling, but they may be relaxed in a later implementation, 
> which may allow a mixture of SEQUENTIAL and CONCURRENT edges on the same vertex.
>  
> Most of the (meaningful) scheduling decisions in Tez today are based on 
> the notion of (or an extended version of) source task completion. This will 
> no longer hold in the presence of a CONCURRENT edge. Instead, events such as 
> source vertex configured or source task running will become more relevant 
> when making scheduling decisions for two vertices connected via a CONCURRENT 
> edge. We therefore introduce a new enum *ConcurrentSchedulingType* to 
> describe the “scheduling timing” for the downstream vertex in such scenarios. 
> |public enum ConcurrentSchedulingType{
>    /** * trigger downstream vertex tasks scheduling by "configured" event of 
> upstream vertices */
>   SOURCE_VERTEX_CONFIGURED,
>    /** * trigger downstream vertex tasks scheduling by "running" event of 
> upstream tasks */ 
>   SOURCE_TASK_STARTED 
> }|
>  
> Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the 
> scheduling type, which suffices for scenarios of whole-DAG or sub-graph 
> gang-scheduling, where we want (all the tasks in) the downstream vertex to be 
> scheduled together with (all the tasks in) the upstream vertex. In this case, 
> we can leverage the existing onVertexStateUpdated() interface of 
> VertexManagerPlugin to collect relevant information to assist the scheduling 
> decision, and *there is no additional API change necessary*. However, in 
> subtler cases such as the parameter-server example described in Fig. 1, other 
> scheduling types would be more relevant; therefore the placeholder for 
> *ConcurrentSchedulingType* will be introduced in this change as part of the 
> infrastructure work.
>  
> Finally, since we assume that all communications between two vertices 
> connected via a CONCURRENT edge are handled by the application runtime, a 
> CONCURRENT edge will be assigned a DummyEdgeManager that effectively mutes all 
> DME/VME handling.
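
With SOURCE_VERTEX_CONFIGURED, the downstream vertex can schedule all of its tasks together once every upstream vertex has reported CONFIGURED through onVertexStateUpdated(). A minimal plain-Java sketch of that bookkeeping follows. `ConfiguredTrigger` and its methods are hypothetical; the `IntConsumer` callback stands in for the real task-scheduling call on `VertexManagerPluginContext`; and also waiting for the managed vertex itself to start is an assumption, suggested by the `onVertexStartedDone` field in the quoted diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.IntConsumer;

// Hypothetical model of the SOURCE_VERTEX_CONFIGURED trigger: request every
// downstream task exactly once, after all sources are CONFIGURED and the
// managed vertex has started.
final class ConfiguredTrigger {
  private final Map<String, Boolean> srcVerticesConfigured = new HashMap<>();
  private final int managedTasks;
  private final IntConsumer scheduleTask; // stand-in for the real scheduling call
  private boolean vertexStarted = false;
  private boolean tasksScheduled = false;

  ConfiguredTrigger(Set<String> sourceVertices, int managedTasks,
      IntConsumer scheduleTask) {
    for (String src : sourceVertices) {
      srcVerticesConfigured.put(src, false);
    }
    this.managedTasks = managedTasks;
    this.scheduleTask = scheduleTask;
  }

  void onVertexStarted() {
    vertexStarted = true;
    trySchedule();
  }

  void onSourceConfigured(String srcVertex) {
    if (!srcVerticesConfigured.containsKey(srcVertex)) {
      throw new IllegalArgumentException("Unknown source vertex " + srcVertex);
    }
    srcVerticesConfigured.put(srcVertex, true);
    trySchedule();
  }

  boolean tasksScheduled() {
    return tasksScheduled;
  }

  private void trySchedule() {
    if (tasksScheduled || !vertexStarted
        || srcVerticesConfigured.containsValue(false)) {
      return;
    }
    // Gang-style: all downstream tasks are requested in one round.
    for (int i = 0; i < managedTasks; i++) {
      scheduleTask.accept(i);
    }
    tasksScheduled = true;
  }
}
```

The start and CONFIGURED notifications may arrive in any order; the `tasksScheduled` flag ensures a repeated CONFIGURED notification does not trigger a second round of requests.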



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656728#comment-16656728
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on the issue:

https://github.com/apache/tez/pull/33
  
@jteagles thanks for your input.

Whether the RM (YARN) needs to support gang scheduling largely depends on how 
the application/runtime is implemented, and it may not be an absolute 
necessity. While it would be best if the RM could guarantee strong gang 
scheduling, even without it the AM can still assist in several ways:

1. The AM can schedule all tasks in the upstream and downstream vertices 
simultaneously, as is implemented in this change.
2. The AM can gather information about downstream/upstream tasks and tell 
dependent tasks to put things into action only when all dependencies are 
satisfied. This requires mechanisms such as waiting/timeout/retry to be 
implemented in the application runtime, but those would be needed anyway, 
even with RM gang scheduling. Some of the changes along this line would be 
covered by TEZ-4000.
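
Point 2 leaves waiting, timeout, and retry to the application runtime. A minimal sketch of such a runtime-side barrier, assuming each task knows how many dependencies it waits on and receives one readiness notification per dependency (for example, relayed by the AM); `PeerReadinessBarrier` and its methods are hypothetical and are not part of Tez or of the TEZ-4000 design. It uses only `java.util.concurrent.CountDownLatch`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical runtime-side barrier: a task proceeds only after all of its
// dependencies have reported ready, using a bounded number of timed waits.
final class PeerReadinessBarrier {
  private final CountDownLatch pending;

  PeerReadinessBarrier(int dependencyCount) {
    this.pending = new CountDownLatch(dependencyCount);
  }

  /** Called once per dependency when it reports ready (hypothetical hook). */
  void dependencyReady() {
    pending.countDown();
  }

  /**
   * Waits up to roughly maxAttempts * timeoutMillis in total. Returns true if
   * all dependencies became ready, false if the caller should give up or fail.
   */
  boolean awaitAll(int maxAttempts, long timeoutMillis) throws InterruptedException {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (pending.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
        return true;
      }
      // A real runtime might re-query the AM for dependency status here
      // before the next attempt (hypothetical; not modeled in this sketch).
    }
    return false;
  }
}
```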






[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-17 Thread Jonathan Eagles (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653996#comment-16653996
 ] 

Jonathan Eagles commented on TEZ-3998:
--

I'm starting to look at this design. I'll need a few more days to read the 
design and give feedback.



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16652766#comment-16652766
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user yingdachen commented on the issue:

https://github.com/apache/tez/pull/33
  
> @yingdachen since this is part of a larger end-to-end feature (Tez-3997) 
would it be better for you to own a feature branch, in which you commit for the 
subtasks, and later, when the feature is complete, merge into master?
> It is easier to provide meaningful feedback when seeing how it all comes 
together - right now, most of the changes are stubs.

@anicoara thanks for the feedback.

A couple of things:

1. The design doc in TEZ-3997 was meant to provide the big picture and to serve 
as the place to collect feedback on the overall design. We believe it provides 
a suitable level of detail about the proposed changes, and it would be great to 
hear from the community on the design.

2. The feature was carefully broken into 4 tasks, ranging from basic support and 
plugin addition (TEZ-3998), to API changes in the AM component (TEZ-3999), 
changes in the runtime (TEZ-4000), and more complete scenario support 
(TEZ-4001). We believe this breakdown suits us as new contributors, beginning 
with the simple change discussed here (TEZ-3998), which is standalone in itself. 
We hope our changes can take a gradual path to completion, which would not 
necessitate a separate feature branch.

3. In the change introduced here, the SilentEdgeManager is indeed a stub; that 
is by design and is not likely to change. However, I would not categorize the 
other changes (such as the new VertexManagerWithConcurrentInput) as stubs, 
since they are functional as they are and we have added unit-test coverage for 
them as well.




> Allow CONCURRENT edge property in DAG construction and introduce 
> ConcurrentSchedulingType
> -
>
> Key: TEZ-3998
> URL: https://issues.apache.org/jira/browse/TEZ-3998
> Project: Apache Tez
>  Issue Type: Task
>Reporter: Yingda Chen
>Assignee: Yingda Chen
>Priority: Major
>
> This is the first task related to TEZ-3997
>  
> |Note: There is no API change in this proposed change. The majority of this 
> change will be lifting some existing constraints against CONCURRENT edge 
> type, and addition of a VertexMangerPlugin implementation.|
>  
> This includes enabling the CONCURRENT SchedulingType as a valid edge 
> property, by removing all the sanity check against CONCURRENT during DAG 
> construction/execution. A new VertexManagerPlugin (namely 
> VertexManagerWithConcurrentInput) will be implemented for vertex with 
> incoming concurrent edge(s). 
> In addition, we will assume in this change that 
>  * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges 
>  * No shuffle or data movement is handled by Tez framework when two vertices 
> are connected through a CONCURRENT edge. Instead, runtime should be 
> responsible for handling all the data-plane communications (as proposed in 
> [1]).
> Note that the above assumptions are common for scenarios such as whole-DAG or 
> sub-graph gang scheduling, but they may be relaxed in later implementation, 
> which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex.
>  
> Most of the (meaningful) scheduling decisions today in Tez are made based on 
> the notion of (or an extended version of) source task completion. This will 
> no longer be true in the presence of a CONCURRENT edge. Instead, events such 
> as “source vertex configured” or “source task running” will become more 
> relevant when making scheduling decisions for two vertices connected via a 
> CONCURRENT edge. We therefore introduce a new enum *ConcurrentSchedulingType* 
> to describe the “scheduling timing” for the downstream vertex in such 
> scenarios.
> |public enum ConcurrentSchedulingType {
>   /** Trigger scheduling of downstream vertex tasks by the "configured" event 
>    * of upstream vertices. */
>   SOURCE_VERTEX_CONFIGURED,
>   /** Trigger scheduling of downstream vertex tasks by the "running" event of 
>    * upstream tasks. */
>   SOURCE_TASK_STARTED
> }|
>  
> Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the 
> scheduling type, which suffices for whole-DAG or sub-graph gang-scheduling 
> scenarios, where we want (all the tasks in) the downstream vertex to be 
> scheduled together with (all the tasks in) the upstream vertex. In this case, 
> we can leverage the existing onVertexStateUpdated() interface of 
> VertexManagerPlugin to collect the information needed for the scheduling 
> decision, and *there is no additional API change necessary*. However, in more 
> subtle cases such as the parameter-server example described in Fig. 1, 

[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16652201#comment-16652201
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

Github user anicoara commented on the issue:

https://github.com/apache/tez/pull/33
  
@yingdachen since this is part of a larger end-to-end feature (TEZ-3997), 
would it be better for you to own a feature branch to which you commit the 
subtasks, and then merge it into master once the feature is complete?
It is easier to provide meaningful feedback when we can see how it all comes 
together; right now, most of the changes are stubs.


> Allow CONCURRENT edge property in DAG construction and introduce 
> ConcurrentSchedulingType
> -
>
> Key: TEZ-3998
> URL: https://issues.apache.org/jira/browse/TEZ-3998
> Project: Apache Tez
>  Issue Type: Task
>Reporter: Yingda Chen
>Assignee: Yingda Chen
>Priority: Major
>
> This is the first task related to TEZ-3997
>  
> |Note: There is no API change in this proposed change. The majority of this 
> change will be lifting some existing constraints against the CONCURRENT edge 
> type and adding a VertexManagerPlugin implementation.|
>  
> This includes enabling the CONCURRENT SchedulingType as a valid edge 
> property by removing all the sanity checks against CONCURRENT during DAG 
> construction/execution. A new VertexManagerPlugin (namely 
> VertexManagerWithConcurrentInput) will be implemented for vertices with 
> incoming concurrent edge(s).
> In addition, we will assume in this change that 
>  * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges 
>  * No shuffle or data movement is handled by the Tez framework when two 
> vertices are connected through a CONCURRENT edge. Instead, the runtime should 
> be responsible for handling all the data-plane communications (as proposed 
> in [1]).
> Note that the above assumptions are common for scenarios such as whole-DAG or 
> sub-graph gang scheduling, but they may be relaxed in a later implementation, 
> which may allow a mixture of SEQUENTIAL and CONCURRENT edges on the same 
> vertex.
>  
> Most of the (meaningful) scheduling decisions today in Tez are made based on 
> the notion of (or an extended version of) source task completion. This will 
> no longer be true in the presence of a CONCURRENT edge. Instead, events such 
> as “source vertex configured” or “source task running” will become more 
> relevant when making scheduling decisions for two vertices connected via a 
> CONCURRENT edge. We therefore introduce a new enum *ConcurrentSchedulingType* 
> to describe the “scheduling timing” for the downstream vertex in such 
> scenarios.
> |public enum ConcurrentSchedulingType {
>   /** Trigger scheduling of downstream vertex tasks by the "configured" event 
>    * of upstream vertices. */
>   SOURCE_VERTEX_CONFIGURED,
>   /** Trigger scheduling of downstream vertex tasks by the "running" event of 
>    * upstream tasks. */
>   SOURCE_TASK_STARTED
> }|
>  
> Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the 
> scheduling type, which suffices for whole-DAG or sub-graph gang-scheduling 
> scenarios, where we want (all the tasks in) the downstream vertex to be 
> scheduled together with (all the tasks in) the upstream vertex. In this case, 
> we can leverage the existing onVertexStateUpdated() interface of 
> VertexManagerPlugin to collect the information needed for the scheduling 
> decision, and *there is no additional API change necessary*. However, in more 
> subtle cases such as the parameter-server example described in Fig. 1, other 
> scheduling types would be more relevant; therefore, the placeholder for 
> *ConcurrentSchedulingType* will be introduced in this change as part of the 
> infrastructure work.
>  
> Finally, since we assume that all communications between two vertices 
> connected via a CONCURRENT edge are handled by the application runtime, a 
> CONCURRENT edge will be assigned a DummyEdgeManager that essentially mutes 
> all DME/VME handling.
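To make the SOURCE_VERTEX_CONFIGURED timing in the quoted description more concrete, here is a minimal, framework-free sketch of the gang-scheduling gate it implies: no downstream task is scheduled until every upstream vertex on a CONCURRENT edge has reported that it is configured, and then all downstream tasks are scheduled together. `ConcurrentGate`, `onSourceVertexConfigured` and the `Consumer` used as a scheduling callback are hypothetical names chosen for illustration. In Tez the "configured" signal would reach a VertexManagerPlugin through onVertexStateUpdated(), and the actual VertexManagerWithConcurrentInput in the patch may be structured quite differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Scheduling timing for a vertex with CONCURRENT inputs (mirrors the proposed enum). */
enum ConcurrentSchedulingType {
  /** Trigger downstream task scheduling on the "configured" event of upstream vertices. */
  SOURCE_VERTEX_CONFIGURED,
  /** Trigger downstream task scheduling on the "running" event of upstream tasks. */
  SOURCE_TASK_STARTED
}

/**
 * Hypothetical gate (not Tez code): schedules every task of the downstream
 * vertex at once, after all CONCURRENT source vertices are configured.
 */
final class ConcurrentGate {
  private final Set<String> pendingSources;         // sources not yet configured
  private final int numTasks;                       // parallelism of the downstream vertex
  private final Consumer<List<Integer>> scheduler;  // receives task indices to launch
  private boolean scheduled = false;

  ConcurrentGate(Set<String> concurrentSources, int numTasks,
                 ConcurrentSchedulingType type, Consumer<List<Integer>> scheduler) {
    if (type != ConcurrentSchedulingType.SOURCE_VERTEX_CONFIGURED) {
      // This sketch only covers the vertex-configured trigger.
      throw new UnsupportedOperationException("Unsupported trigger: " + type);
    }
    this.pendingSources = new HashSet<>(concurrentSources);
    this.numTasks = numTasks;
    this.scheduler = scheduler;
  }

  /** Called when a source vertex reaches the CONFIGURED state. */
  void onSourceVertexConfigured(String vertexName) {
    pendingSources.remove(vertexName);
    if (pendingSources.isEmpty() && !scheduled) {
      scheduled = true;
      List<Integer> tasks = new ArrayList<>();
      for (int i = 0; i < numTasks; i++) {
        tasks.add(i);
      }
      // All downstream tasks are handed over in a single batch (gang-style).
      scheduler.accept(Collections.unmodifiableList(tasks));
    }
  }

  boolean isScheduled() {
    return scheduled;
  }
}
```

Because the gate fires only once, a repeated or late CONFIGURED notification does not schedule the downstream tasks a second time.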



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-16 Thread Kuhu Shukla (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651706#comment-16651706
 ] 

Kuhu Shukla commented on TEZ-3998:
--

I plan to review this in the next few days. It would be nice to get some 
comments from [~gopalv], [~jlowe] and [~jeagles], among others.



[jira] [Commented] (TEZ-3998) Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType

2018-10-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16649687#comment-16649687
 ] 

ASF GitHub Bot commented on TEZ-3998:
-

GitHub user yingdachen opened a pull request:

https://github.com/apache/tez/pull/33

TEZ-3998: support constructing DAG with concurrent edge

1. unblock the construction of DAG with concurrent edge
2. introduce VertexManagerWithConcurrentInput with SilentEdgeManager
3. introduce the concept of ConcurrentEdgeTriggerType
4. UT
Change-Id: I4202e2946b99c440d7d7923151588632d74121cc
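Item 1 (unblocking DAG construction with CONCURRENT edges) goes together with the assumption in the issue description that a vertex cannot have both SEQUENTIAL and CONCURRENT incoming edges. A minimal sketch of the kind of construction-time check that assumption calls for, using a simplified edge model: `DagEdge`, `EdgeSchedulingType` and `IncomingEdgeCheck.findMixedVertices` are hypothetical stand-ins, not the Tez DAG API, and the patch may enforce the rule differently (or elsewhere).

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for an edge's scheduling type. */
enum EdgeSchedulingType { SEQUENTIAL, CONCURRENT }

/** Hypothetical edge: source vertex -> destination vertex with a scheduling type. */
final class DagEdge {
  final String source;
  final String destination;
  final EdgeSchedulingType type;

  DagEdge(String source, String destination, EdgeSchedulingType type) {
    this.source = source;
    this.destination = destination;
    this.type = type;
  }
}

final class IncomingEdgeCheck {
  private IncomingEdgeCheck() {}

  /**
   * Returns the vertices whose incoming edges mix SEQUENTIAL and CONCURRENT
   * scheduling types; an empty list means the DAG satisfies the assumption.
   */
  static List<String> findMixedVertices(List<DagEdge> edges) {
    // Collect the set of incoming scheduling types per destination vertex.
    Map<String, Set<EdgeSchedulingType>> incomingTypes = new HashMap<>();
    for (DagEdge e : edges) {
      incomingTypes
          .computeIfAbsent(e.destination, k -> EnumSet.noneOf(EdgeSchedulingType.class))
          .add(e.type);
    }
    // With only two types, a set of size > 1 means both are present.
    List<String> mixed = new ArrayList<>();
    for (Map.Entry<String, Set<EdgeSchedulingType>> entry : incomingTypes.entrySet()) {
      if (entry.getValue().size() > 1) {
        mixed.add(entry.getKey());
      }
    }
    return mixed;
  }
}
```

A real validator would presumably reject the DAG (for example by throwing) when the returned list is non-empty; returning the offending vertex names keeps the sketch easy to test.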

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yingdachen/tez 3998

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/tez/pull/33.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #33


commit 5c7557140d816d5c1ca3ae89484c62c62391f856
Author: Yingda CHEN 
Date:   2018-10-15T03:28:37Z

TEZ-3998: support constructing DAG with concurrent edge
1. unblock the construction of DAG with concurrent edge
2. introduce VertexManagerWithConcurrentInput with SilentEdgeManager
3. introduce the concept of ConcurrentEdgeTriggerType
4. UT
Change-Id: I4202e2946b99c440d7d7923151588632d74121cc
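Item 2 pairs the new vertex manager with a SilentEdgeManager (the DummyEdgeManager of the issue description), whose role is to mute DME/VME handling because the application runtime moves the data itself. The sketch below illustrates only that idea under a deliberately simplified routing contract: `EventRouter`, `BroadcastRouter` and `SilentRouter` are hypothetical names, and Tez's actual edge-manager plugin interface differs from this.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical contract: which destination tasks should receive an event from a source task. */
interface EventRouter {
  List<Integer> destinationsFor(int sourceTaskIndex, int numDestinationTasks);
}

/** Conventional behaviour, for comparison: deliver each event to every destination task. */
final class BroadcastRouter implements EventRouter {
  @Override
  public List<Integer> destinationsFor(int sourceTaskIndex, int numDestinationTasks) {
    List<Integer> all = new ArrayList<>();
    for (int i = 0; i < numDestinationTasks; i++) {
      all.add(i);
    }
    return all;
  }
}

/** CONCURRENT-edge behaviour: the framework routes nothing; the runtime moves the data. */
final class SilentRouter implements EventRouter {
  @Override
  public List<Integer> destinationsFor(int sourceTaskIndex, int numDestinationTasks) {
    return Collections.emptyList();
  }
}
```

Swapping the router per edge type keeps the rest of the event-dispatch path unchanged, which matches the description's intent that no shuffle or data movement is handled by the framework on CONCURRENT edges.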



