[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14723758#comment-14723758 ]

Bikas Saha commented on TEZ-2726:
---------------------------------

The logs from the output and input would be a good start. The question that needs an answer is why an UnorderedKVOutput produced a composite data movement event. It should produce a simple data movement event, and in that case the job would hang because the scatter-gather edge would not have any events to route to the other partitions.

> Handle invalid number of partitions for SCATTER-GATHER edge
> -----------------------------------------------------------
>
>                 Key: TEZ-2726
>                 URL: https://issues.apache.org/jira/browse/TEZ-2726
>             Project: Apache Tez
>          Issue Type: Improvement
>    Affects Versions: 0.7.0
>            Reporter: Saikat
>            Assignee: Saikat
>
> Encountered an issue where the source vertex has M tasks and the sink vertex has N
> tasks (N > M) [e.g. M = 1, N = 3] and the edge is of type SCATTER-GATHER.
> This resulted in the sink vertex receiving DMEs with non-existent targetIds.
> The fetchers for the sink vertex tasks then try to retrieve the map outputs
> and retrieve invalid headers due to an exception in the ShuffleHandler.
> Possible fixes:
> 1. Raise a proper Tez exception to indicate this invalid scenario.
> 2. Or write appropriate empty partition bits for the missing partitions
> before sending out the DMEs to the sink vertex.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14723474#comment-14723474 ]

Saikat commented on TEZ-2726:
-----------------------------

[~bikassaha] If you are ok with this approach I can submit an initial patch for review.
[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14701705#comment-14701705 ]

Saikat commented on TEZ-2726:
-----------------------------

[~bikassaha] Yes. So is this a proper place to raise an exception? We could raise an AMUserCodeException by checking this condition in Edge.java sendTezEventToDestinationTasks(), before sending out the CDMEs for a scatter-gather edge.
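The check proposed in this comment could look roughly like the sketch below. It is only an illustration: the class, method, and exception names are hypothetical stand-ins (the thread proposes AMUserCodeException, which is not reproduced here), and the two counts are assumed to be available as plain integers at the point where the edge routes events. The comparison uses "fewer partitions than destination tasks" because that is the case reported in this issue.

```java
// Hypothetical sketch; none of these names come from the Tez code base.
final class ScatterGatherCheck {

    /** Stand-in for the exception the AM would raise (AMUserCodeException in the thread). */
    static final class InvalidPartitionCountException extends RuntimeException {
        InvalidPartitionCountException(String message) {
            super(message);
        }
    }

    /**
     * Fails fast when a source task wrote fewer partitions than there are
     * destination tasks, instead of routing events for partitions that do not exist.
     */
    static void validate(int numSourcePartitions, int numDestinationTasks) {
        if (numSourcePartitions < numDestinationTasks) {
            throw new InvalidPartitionCountException(
                "SCATTER_GATHER edge: source produced " + numSourcePartitions
                    + " partition(s) but there are " + numDestinationTasks
                    + " destination task(s)");
        }
    }

    public static void main(String[] args) {
        validate(3, 3); // consistent DAG: passes silently
        try {
            validate(1, 3); // the M = 1, N = 3 case from the issue description
        } catch (InvalidPartitionCountException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing at routing time surfaces the DAG misconfiguration directly, rather than as a shuffle error several layers further down.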
[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14701684#comment-14701684 ]

Bikas Saha commented on TEZ-2726:
---------------------------------

Ah. So the producer task wrote data and that generated a composite event. The edge was scatter-gather, so it expanded that event based on the number of downstream tasks (where num tasks == num partitions). Each downstream task therefore got an input with a different partition index, and the ones that got indices 1 and 2 hit the exception.
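The expansion described in this comment can be modelled in a few lines. This is a simplified sketch, not Tez code: the RoutedEvent type and both methods are hypothetical, and the only behaviour assumed is the one stated above, namely that destination task i is handed partition index i.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the expansion described above; not Tez code.
final class CompositeEventExpansion {

    /** Simplified stand-in for one routed data movement event. */
    static final class RoutedEvent {
        final int destinationTask;
        final int partitionIndex;

        RoutedEvent(int destinationTask, int partitionIndex) {
            this.destinationTask = destinationTask;
            this.partitionIndex = partitionIndex;
        }
    }

    /** Expands one composite event into one event per destination task. */
    static List<RoutedEvent> expand(int numDestinationTasks) {
        List<RoutedEvent> routed = new ArrayList<>();
        for (int task = 0; task < numDestinationTasks; task++) {
            // Scatter-gather assumes task i reads partition i.
            routed.add(new RoutedEvent(task, task));
        }
        return routed;
    }

    /** Returns the partition indices that were routed but never written by the source. */
    static List<Integer> missingPartitions(List<RoutedEvent> routed, int partitionsWritten) {
        List<Integer> missing = new ArrayList<>();
        for (RoutedEvent event : routed) {
            if (event.partitionIndex >= partitionsWritten) {
                missing.add(event.partitionIndex);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // One partition written, three destination tasks, as in the report.
        System.out.println(missingPartitions(expand(3), 1)); // prints [1, 2]
    }
}
```

With one partition written and three destination tasks, the tasks that receive indices 1 and 2 are the ones asking for data that was never produced.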
[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14701505#comment-14701505 ]

Saikat commented on TEZ-2726:
-----------------------------

[~rajesh.balamohan] [~bikassaha] There are no empty partitions in the example I mentioned. The source vertex has 1 task (it used an UnorderedKVOutput, so it produced only 1 partition) and the sink vertex has 3 tasks. The edge is of type SCATTER-GATHER. When the HTTP fetchers sent a request to fetch the map outputs, the code in ShuffleHandler caught an IOException thrown by getIndexInformation() in IndexCache.java for the condition [info.mapSpillRecord.size() <= reduce].

2015-08-10 12:36:42,314 [New I/O worker #32] ERROR mapred.ShuffleHandler: Shuffle error in populating headers :
java.io.IOException: Invalid request Map Id = attempt_1437478617943_17839_1_05_00_0_10003 Reducer = 1 Index Info Length = 1
        at org.apache.hadoop.mapred.IndexCache.getIndexInformation(IndexCache.java:84)
        at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.getMapOutputInfo(ShuffleHandler.java:855)
        at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.populateHeaders(ShuffleHandler.java:875)
        at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.messageReceived(ShuffleHandler.java:793)

I'll try to get an excerpt of the Fetcher logs for DMEs and post it here.
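The condition quoted in this comment is a bounds check on the spill index. The sketch below mirrors only that condition and the wording of the logged message; the class and method names are hypothetical and this is not the Hadoop IndexCache source.

```java
import java.io.IOException;

// Hypothetical sketch of the bounds check quoted in the comment above.
final class SpillIndexCheck {

    /**
     * Rejects a request for a reducer (partition) index that the map output
     * does not contain. indexInfoLength plays the role of mapSpillRecord.size().
     */
    static void checkReducer(String mapId, int reduce, int indexInfoLength) throws IOException {
        if (indexInfoLength <= reduce) {
            throw new IOException("Invalid request Map Id = " + mapId
                + " Reducer = " + reduce
                + " Index Info Length = " + indexInfoLength);
        }
    }

    public static void main(String[] args) throws IOException {
        checkReducer("attempt_example", 0, 1); // partition 0 exists: passes
        try {
            checkReducer("attempt_example", 1, 1); // partition 1 was never written
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With one partition written (index length 1), a request for reducer 1 fails, which matches the "Reducer = 1 Index Info Length = 1" line in the log.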
[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14700555#comment-14700555 ]

Bikas Saha commented on TEZ-2726:
---------------------------------

Still not sure what the exact sequence of events was for the error. Did a planning bug cause empty partitions, which Tez then somehow handled erroneously? It would really help if we had logs or some sequence of events that produced the error. Tez does have some handling for empty partitions, but that's an optimization to not fetch them (since they are empty).
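To make the distinction in this comment concrete, here is a minimal sketch of a "skip empty partitions" optimization, assuming the producer reports its empty partitions as a bit set. The class and method are hypothetical and are not taken from Tez.

```java
import java.util.BitSet;

// Hypothetical sketch of skipping fetches for partitions reported as empty.
final class EmptyPartitionFilter {

    /** A partition is fetched unless the producer flagged it as empty. */
    static boolean shouldFetch(BitSet emptyPartitions, int partition) {
        return !emptyPartitions.get(partition);
    }

    public static void main(String[] args) {
        BitSet empty = new BitSet();
        empty.set(2); // producer wrote partitions 0..2 and reported 2 as empty
        System.out.println(shouldFetch(empty, 0)); // prints true
        System.out.println(shouldFetch(empty, 2)); // prints false
        // An index the producer never wrote is not flagged, so it is still fetched.
        System.out.println(shouldFetch(empty, 5)); // prints true
    }
}
```

Under this model, a partition that was never written is different from one that was written empty: nothing marks it, so a fetch is still attempted.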
[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14700410#comment-14700410 ]

Rohini Palaniswamy commented on TEZ-2726:
-----------------------------------------

There was some bug in Pig planning (yet to debug and create jira) which was setting incorrect edge types.
[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14700369#comment-14700369 ]

Rajesh Balamohan commented on TEZ-2726:
---------------------------------------

[~saikatr] - Is there any repro for this? When you say invalid headers, is it something like the following? Can you please provide more info?

{noformat}
org.apache.tez.runtime.library.common.shuffle.impl.Fetcher: Invalid map id java.lang.IllegalArgumentException: Invalid header received: W^s??.attempt_1399351577718_4169_1_ partition: 95
{noformat}

If so, are you using "tez.runtime.intermediate-output.compress.codec = org.apache.hadoop.io.compress.DefaultCodec"?
[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14700262#comment-14700262 ]

Bikas Saha commented on TEZ-2726:
---------------------------------

Are there any details as to what exactly happened? I am not clear about that. It seems to be some issue where user misconfiguration caused empty partitions that were not handled correctly?

//cc [~rajesh.balamohan]
[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14700237#comment-14700237 ]

Saikat commented on TEZ-2726:
-----------------------------

One possible place to raise a proper exception is in sendTezEventToDestinationTasks() in Edge.java, before sending out the DME. We can raise an AMUserCodeException with the edge manager as the source and an appropriate message.
[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1475#comment-1475 ]

Hitesh Shah commented on TEZ-2726:
----------------------------------

\cc [~bikassaha]
[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14699963#comment-14699963 ]

Jason Lowe commented on TEZ-2726:
---------------------------------

+1 for throwing an exception. I think it could be dangerous to assume that putting in empty bits for missing partitions is the correct action to take. If that approach is mistaken we could end up with missing or corrupted outputs for a "successful" job.
[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14699945#comment-14699945 ]

Rohini Palaniswamy commented on TEZ-2726:
-----------------------------------------

We should raise a proper exception in Tez rather than write empty partition bits and mask the issue, which is most likely due to some DAG misconfiguration.
[jira] [Commented] (TEZ-2726) Handle invalid number of partitions for SCATTER-GATHER edge
[ https://issues.apache.org/jira/browse/TEZ-2726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14699942#comment-14699942 ]

Saikat commented on TEZ-2726:
-----------------------------

Adding [~jlowe] [~rohini] [~jeagles] for watch and comments.