[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15055295#comment-15055295 ] Rajesh Balamohan commented on TEZ-2496:
---
Committed to branch-0.7
commit 62e4b6ea1cd4021791ae3b133e8e80c9a82456d6

> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source
> --
>
> Key: TEZ-2496
> URL: https://issues.apache.org/jira/browse/TEZ-2496
> Project: Apache Tez
> Issue Type: Improvement
> Reporter: Rajesh Balamohan
> Assignee: Rajesh Balamohan
> Fix For: 0.8.0-alpha
>
> Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch, TEZ-2496.7.patch, TEZ-2496.8.patch, TEZ-2496.8.patch, TEZ-2496.9.addendum.patch, TEZ-2496.9.branch-0.7.patch, TEZ-2496.9.patch
>
>
> Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source. This would be helpful in scenarios where there are limited resources (or concurrent jobs running, or multiple waves) with data skew, and the task that gets a large amount of data gets scheduled much later.
> e.g. Consider the following Hive query running in a queue with limited capacity (42 slots in total) @ 200 GB scale:
> {noformat}
> CREATE TEMPORARY TABLE sampleData AS
> SELECT CASE
>          WHEN ss_sold_time_sk IS NULL THEN 70429
>          ELSE ss_sold_time_sk
>        END AS ss_sold_time_sk,
>        ss_item_sk,
>        ss_customer_sk,
>        ss_cdemo_sk,
>        ss_hdemo_sk,
>        ss_addr_sk,
>        ss_store_sk,
>        ss_promo_sk,
>        ss_ticket_number,
>        ss_quantity,
>        ss_wholesale_cost,
>        ss_list_price,
>        ss_sales_price,
>        ss_ext_discount_amt,
>        ss_ext_sales_price,
>        ss_ext_wholesale_cost,
>        ss_ext_list_price,
>        ss_ext_tax,
>        ss_coupon_amt,
>        ss_net_paid,
>        ss_net_paid_inc_tax,
>        ss_net_profit,
>        ss_sold_date_sk
> FROM store_sales distribute by ss_sold_time_sk;
> {noformat}
> This generated 39 maps and 134 reduce slots (3 reduce waves). When there are lots of nulls for ss_sold_time_sk, the data tends to skew towards 70429. If the reducer that gets this data is scheduled much earlier (i.e. in the first wave itself), the entire job finishes faster.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633726#comment-14633726 ] TezQA commented on TEZ-2496:

{color:green}+1 overall{color}. Here are the results of testing the latest attachment http://issues.apache.org/jira/secure/attachment/12746117/TEZ-2496.9.patch against master revision be5b191.

{color:green}+1 @author{color}. The patch does not contain any @author tags.
{color:green}+1 tests included{color}. The patch appears to include 6 new or modified test files.
{color:green}+1 javac{color}. The applied patch does not increase the total number of javac compiler warnings.
{color:green}+1 javadoc{color}. There were no new javadoc warning messages.
{color:green}+1 findbugs{color}. The patch does not introduce any new Findbugs (version 3.0.1) warnings.
{color:green}+1 release audit{color}. The applied patch does not increase the total number of release audit warnings.
{color:green}+1 core tests{color}. The patch passed unit tests.

Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/906//testReport/
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/906//console

This message is automatically generated.
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14619670#comment-14619670 ] Rajesh Balamohan commented on TEZ-2496:
---
Thanks [~bikassaha]. Will implement both changes and run the experiment before committing (did that for the .7 version of the patch, which has the initial code base for this approach). Have to wait a couple more days, as the cluster I experiment with is down for maintenance.
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14619659#comment-14619659 ] Bikas Saha commented on TEZ-2496:

bq. Used just for short circuiting. In case it returns false, sorting of tasks can be avoided

Can that be done by simply tracking whether we have received a new vertex manager event? Then executing the rest of that method can be avoided. In any case, statsAvailable probably needs a rename to statsUpdated.
{code}
+    boolean statsAvailable = computePartitionSizes();
{code}

I would change the order to 2, 0, 1 just to be sure that we are not getting fooled by direct or reverse iteration.
{code}
+    //Tasks should be scheduled in task 2, 1, 0 order
+    long[] sizes = new long[]{(0l), (100 * 1000l * 1000l), (5000 * 1000l * 1000l)};
{code}

lgtm. +1. Please double check with your previous real life experiment before committing.
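The 2, 1, 0 expectation in the test snippet above boils down to scheduling pending reducers largest-first. A minimal, self-contained sketch of that ordering follows; the class and method names are illustrative, not the actual ShuffleVertexManager code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: order pending reduce tasks by descending aggregated
// partition size, so a skewed partition lands in the first scheduling wave.
public class LargestFirstScheduling {

  static int[] scheduleOrder(long[] aggregatedSizes) {
    List<Integer> pending = new ArrayList<>();
    for (int i = 0; i < aggregatedSizes.length; i++) {
      pending.add(i);
    }
    // Descending by size: the task with the most shuffle data goes first.
    pending.sort(Comparator.comparingLong((Integer i) -> aggregatedSizes[i]).reversed());
    int[] order = new int[pending.size()];
    for (int i = 0; i < order.length; i++) {
      order[i] = pending.get(i);
    }
    return order;
  }

  public static void main(String[] args) {
    // Mirrors the test data in the review: sizes 0, 100 MB, 5000 MB.
    long[] sizes = new long[]{0L, 100 * 1000L * 1000L, 5000 * 1000L * 1000L};
    int[] order = scheduleOrder(sizes);
    System.out.println(order[0] + " " + order[1] + " " + order[2]); // prints 2 1 0
  }
}
```

With this ordering, changing the test data to 2, 0, 1 (as suggested) would still catch a scheduler that merely iterates tasks forwards or backwards.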
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14618460#comment-14618460 ] TezQA commented on TEZ-2496:

{color:green}+1 overall{color}. Here are the results of testing the latest attachment http://issues.apache.org/jira/secure/attachment/12744190/TEZ-2496.8.patch against master revision cb59851.

{color:green}+1 @author{color}. The patch does not contain any @author tags.
{color:green}+1 tests included{color}. The patch appears to include 6 new or modified test files.
{color:green}+1 javac{color}. The applied patch does not increase the total number of javac compiler warnings.
{color:green}+1 javadoc{color}. There were no new javadoc warning messages.
{color:green}+1 findbugs{color}. The patch does not introduce any new Findbugs (version 3.0.1) warnings.
{color:green}+1 release audit{color}. The applied patch does not increase the total number of release audit warnings.
{color:green}+1 core tests{color}. The patch passed unit tests.

Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/890//testReport/
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/890//console

This message is automatically generated.
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617708#comment-14617708 ] Bikas Saha commented on TEZ-2496:

Looks good overall.

This does not have to be in the API package.
{code}
diff --git tez-api/src/main/java/org/apache/tez/runtime/api/DATA_RANGE_IN_MB.java tez-api/src/main/java/org/apache/tez/runtime/api/DATA_RANGE_IN_MB.java
{code}

TEN? Also, looks like the constructor and member var are dead code?
{code}
+public enum DATA_RANGE_IN_MB {
+  THOUSAND(1000), HUNDRED(100), TEZ(10), ONE(1), ZERO(0);
{code}

Do we really need to do Math.ceil()? There is probably a bit manipulation method to do this cheaper.
{code}
+  public static final DATA_RANGE_IN_MB getRange(long sizeInBytes) {
+    int sizeInMB = (int) Math.ceil(sizeInBytes / (1024 * 1024 * 1.0));
{code}

Does runtime-internals need roaring bitmaps?
{code}
diff --git tez-runtime-internals/pom.xml tez-runtime-internals/pom.xml
...
+    org.roaringbitmap
+    RoaringBitmap
{code}

Unnecessary diff
{code}
diff --git tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
{code}

Why do the +1 here instead of in getBucket()? Spreading the bucket logic across 3 places (here + getBucket() + DATA_RANGE_MB) is error prone. Perhaps replace all 3 with getBucket()?
{code}
+    for (int i = 0; i < sizes.length; i++) {
+      int bucket = getBucket(sizes[i]) + 1;
{code}

No point having 2 vars that can be tracked as one? reportPartitionStats === reportPartitionStats() { return partitions != null }, right?
{code}
+  protected OutputStatisticsReporter statsReporter;
+  protected final long[] partitionStats;
{code}

Still needed?
{code}
     VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getVertexStatistics(any(String.class))).thenReturn(mock(VertexStatistics.class));
{code}

Are there existing OrderedPartitionedOutput/PipeLinedSorter/ExternalSorter tests that can be enhanced to verify that partition stats are being recorded?

Assuming the ShuffleVertexManager code is the same as when I looked at it the last time: not sure why the second part of each of the if checks is useful? Any issues in simply over-writing with the new value?
{code}
+    if ((totalStats > 0) && (taskInfo.outputStats != totalStats)) {
+      computedPartitionSizes = true;
+      taskInfo.outputStats = totalStats;
+    }
+  } else {
+    if ((stats[index] > 0) && (stats[index] != taskInfo.outputStats)) {
+      computedPartitionSizes = true;
+      taskInfo.outputStats = stats[index];
{code}

If I understand this right, the code is trying to avoid sorting based on this check. But could this be done simply by tracking whether we have received a new stats update event? And move the code from computePartitionSizes() + sortPendingTasksBasedOnDataSize into parsePartitionStats()? Nothing should change unless we have received new stats, right? So all stats-dependent updates can be made when we receive new stats.

Spurious change?
{code}
-  @Test(timeout = 5000)
+  @Test(timeout = 500)
   public void testShuffleVertexManagerAutoParallelism() throws Exception {
{code}

Why did this change?
{code}
     Assert.assertTrue(manager.pendingTasks.size() == 0);
     // all tasks scheduled
-    Assert.assertTrue(scheduledTasks.size() == 3);
+    Assert.assertTrue(scheduledTasks.size() == 1);
{code}

Can the shuffle vertex manager bucket calculation test be further enhanced to verify that the first task to be scheduled is the largest task? That is the final intent of the jira, right :)

In a separate jira we need to track the bug that the vertex manager event is not resilient to task retries because it does not provide that info. So the same task rerun would cause double counting. It's an existing bug, not introduced in this patch.
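The bucketing under discussion can be made concrete with a compilable sketch. The ranges and the Math.ceil() variant follow the snippet in the review (the 10 MB constant is spelled TEN here, which the "TEN?" remark suggests was the intent); the shift-based variant illustrates the suggested bit-manipulation alternative. None of this is the committed Tez code.

```java
// Hedged sketch of the DATA_RANGE_IN_MB bucketing reviewed above.
public class DataRangeSketch {

  enum DataRangeInMB {
    THOUSAND(1000), HUNDRED(100), TEN(10), ONE(1), ZERO(0);

    final int sizeInMB;

    DataRangeInMB(int sizeInMB) { this.sizeInMB = sizeInMB; }
  }

  // Variant from the patch: round bytes up to whole MB with Math.ceil(),
  // then return the largest range that fits (values() is declaration order).
  static DataRangeInMB getRangeWithCeil(long sizeInBytes) {
    int sizeInMB = (int) Math.ceil(sizeInBytes / (1024 * 1024 * 1.0));
    for (DataRangeInMB r : DataRangeInMB.values()) {
      if (sizeInMB >= r.sizeInMB) {
        return r;
      }
    }
    return DataRangeInMB.ZERO;
  }

  // Shift-based variant: adding 2^20 - 1 before shifting right by 20
  // emulates the ceil without floating point.
  static DataRangeInMB getRangeWithShift(long sizeInBytes) {
    long sizeInMB = (sizeInBytes + (1L << 20) - 1) >> 20;
    for (DataRangeInMB r : DataRangeInMB.values()) {
      if (sizeInMB >= r.sizeInMB) {
        return r;
      }
    }
    return DataRangeInMB.ZERO;
  }

  public static void main(String[] args) {
    long[] samples = {0L, 1L, 100L * 1024 * 1024, 5000L * 1000 * 1000};
    for (long s : samples) {
      System.out.println(s + " bytes -> " + getRangeWithCeil(s)
          + " / " + getRangeWithShift(s));
    }
  }
}
```

Both variants compute the same ceil(bytes / 2^20), so they bucket identically; the shift form just avoids the double division and cast.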
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617362#comment-14617362 ] Bikas Saha commented on TEZ-2496:

Summarizing an offline discussion on the pros and cons of this approach:
1) Lives in user land - easier to iterate.
2) Memory efficient - the shuffle vertex manager can apply policy specific to the partition stats use case. Deterministic sizes mean it can aggregate upon event receipt and discard the raw data.
3) CPU efficient - because it's not calling getStatistics() repeatedly.
4) Works with pipelining and getting early stats from running tasks. getStatistics() would get expensive for this.
5) Allows for other use cases like sending partition stats to inputs for runtime optimizations. Only the shuffle vertex manager can correctly do this, since it merges partitions during auto reduce.
6) Once this has stabilized and been optimized, we can transfer the logic to a partition stats API that would be generally available as part of the system.
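Point 2 above (aggregate upon event receipt and discard the raw data) can be sketched as follows; the class and method names are hypothetical, not Tez APIs.

```java
// Minimal sketch: the vertex manager folds each source task's per-partition
// sizes into a running total as the event arrives, then drops the payload,
// keeping memory proportional to partition count rather than task count.
public class PartitionStatsAggregator {

  private final long[] aggregated; // running total per partition

  public PartitionStatsAggregator(int numPartitions) {
    this.aggregated = new long[numPartitions];
  }

  // Called once per received event; the payload array is not retained.
  public void onStatsEvent(long[] perPartitionSizes) {
    for (int p = 0; p < aggregated.length; p++) {
      aggregated[p] += perPartitionSizes[p];
    }
  }

  public long sizeOfPartition(int p) {
    return aggregated[p];
  }

  public static void main(String[] args) {
    PartitionStatsAggregator agg = new PartitionStatsAggregator(3);
    agg.onStatsEvent(new long[]{10, 0, 500});  // stats from source task 0
    agg.onStatsEvent(new long[]{20, 5, 700});  // stats from source task 1
    System.out.println(agg.sizeOfPartition(2)); // prints 1200
  }
}
```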
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616879#comment-14616879 ] TezQA commented on TEZ-2496:

{color:red}-1 overall{color}. Here are the results of testing the latest attachment http://issues.apache.org/jira/secure/attachment/12743967/TEZ-2496.8.patch against master revision cb59851.

{color:green}+1 @author{color}. The patch does not contain any @author tags.
{color:green}+1 tests included{color}. The patch appears to include 5 new or modified test files.
{color:green}+1 javac{color}. The applied patch does not increase the total number of javac compiler warnings.
{color:green}+1 javadoc{color}. There were no new javadoc warning messages.
{color:red}-1 findbugs{color}. The patch appears to introduce 1 new Findbugs (version 3.0.1) warnings.
{color:green}+1 release audit{color}. The applied patch does not increase the total number of release audit warnings.
{color:green}+1 core tests{color}. The patch passed unit tests.

Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/888//testReport/
Findbugs warnings: https://builds.apache.org/job/PreCommit-TEZ-Build/888//artifact/patchprocess/newPatchFindbugsWarningstez-runtime-library.html
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/888//console

This message is automatically generated.
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616679#comment-14616679 ] TezQA commented on TEZ-2496:

{color:red}-1 overall{color}. Here are the results of testing the latest attachment http://issues.apache.org/jira/secure/attachment/12743948/TEZ-2496.7.patch against master revision cb59851.

{color:green}+1 @author{color}. The patch does not contain any @author tags.
{color:green}+1 tests included{color}. The patch appears to include 3 new or modified test files.
{color:green}+1 javac{color}. The applied patch does not increase the total number of javac compiler warnings.
{color:green}+1 javadoc{color}. There were no new javadoc warning messages.
{color:red}-1 findbugs{color}. The patch appears to introduce 1 new Findbugs (version 3.0.1) warnings.
{color:green}+1 release audit{color}. The applied patch does not increase the total number of release audit warnings.
{color:red}-1 core tests{color}. The patch failed these unit tests:
  org.apache.tez.runtime.library.common.sort.impl.TestPipelinedSorter
  org.apache.tez.runtime.library.common.sort.impl.dflt.TestDefaultSorter

Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/887//testReport/
Findbugs warnings: https://builds.apache.org/job/PreCommit-TEZ-Build/887//artifact/patchprocess/newPatchFindbugsWarningstez-runtime-library.html
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/887//console

This message is automatically generated.
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14615866#comment-14615866 ] Bikas Saha commented on TEZ-2496:

Yes. That is what I am suggesting, because I think going the full-fledged partition route is risky in its current form. Not sure if you disagree with that risk assessment. The other features mentioned would need new information to be passed to the reducer inputs, right? That could be done via new events sent from the ShuffleVertexManager to the tasks (e.g. the input data information event being used to send splits today).
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14615847#comment-14615847 ] Rajesh Balamohan commented on TEZ-2496: --- - For 50K x 20K, it would end up with ~760MB when all 50K are in running phase. If number of concurrent tasks being executed are in the order of 10K, the pressure would reduced as the aggregation happens in vertex level. - The proposed approach is piggybacking on bundling the partition stats with VertexManagerEvent in ShuffleUtils, so that ShuffleVertexManager can receive then in onVertexManagerEventReceived(). And the aggregation happens in ShuffleVertexManager itself by unpacking the bitmap/roaringbitmap?. That would help this usecase to move forward, but might affect other improvements related to this jira (like better memory allocation, better thread handling in fetchers based on stats). > Consider scheduling tasks in ShuffleVertexManager based on the partition > sizes from the source > -- > > Key: TEZ-2496 > URL: https://issues.apache.org/jira/browse/TEZ-2496 > Project: Apache Tez > Issue Type: Improvement >Reporter: Rajesh Balamohan >Assignee: Rajesh Balamohan > Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, > TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch > > > Consider scheduling tasks in ShuffleVertexManager based on the partition > sizes from the source. This would be helpful in scenarios, where there is > limited resources (or concurrent jobs running or multiple waves) with > dataskew and the task which gets large amount of data gets sceheduled much > later. 
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14615484#comment-14615484 ] Bikas Saha commented on TEZ-2496:
---
IMO, the solution is risky and I am wary of adding it into the API for everyone to use, because it's not clear that we have a viable long-term solution. A single job with 50Kx20K vertices would end up with 1GB or more in the bitmap, and multiple large vertices running in parallel would hold that memory. Secondly, clearing the partition stats makes this susceptible to even one task failure. Clearing some stats but keeping others may lead to inconsistencies down the road, or affect other stats improvements. Approximating large histograms is not a new problem, and we may be able to find some known work (like https://metamarkets.com/2013/histograms/) to create a more viable algorithmic solution. However, we do need to start experimenting with what we can do with stats, so making progress is also important.
So here is my suggestion. How about we don't make this part of the API or the internal stats flow for now? That keeps the main engine scalable and we don't expose a WIP API. We can send the same bitmap information in VertexManagerEvents from the output to the ShuffleVertexManager. We already do that to send the total output size per task; we can enhance the payload to send the partition info instead. This way we can send the necessary info and solve the scenario this jira is trying to address. Here are the pros I see with this:
1) Uses the existing event mechanism by enhancing its payload. We know this works e2e, so there is less scope for bugs.
2) Does not change the internal flow or APIs.
3) Is localized to the ShuffleVertexManager (which needs this), and the feature can be turned on/off via config for large jobs if needed; APIs cannot be turned on/off based on config. Essentially, this lives in "user" land instead of "tez" land.
Once we have experimented with and refined this, we can consider moving it from user land into tez land by adding it to the API infra. Thoughts?
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14614873#comment-14614873 ] TezQA commented on TEZ-2496:
{color:red}-1 overall{color}. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12743701/TEZ-2496.6.patch
against master revision cb59851.
{color:green}+1 @author{color}. The patch does not contain any @author tags.
{color:green}+1 tests included{color}. The patch appears to include 6 new or modified test files.
{color:green}+1 javac{color}. The applied patch does not increase the total number of javac compiler warnings.
{color:red}-1 javadoc{color}. The javadoc tool appears to have generated 1 warning message. See https://builds.apache.org/job/PreCommit-TEZ-Build/884//artifact/patchprocess/diffJavadocWarnings.txt for details.
{color:green}+1 findbugs{color}. The patch does not introduce any new Findbugs (version 3.0.1) warnings.
{color:green}+1 release audit{color}. The applied patch does not increase the total number of release audit warnings.
{color:green}+1 core tests{color}. The patch passed unit tests.
Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/884//testReport/
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/884//console
This message is automatically generated.
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14614587#comment-14614587 ] TezQA commented on TEZ-2496:
{color:red}-1 overall{color}. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12743666/TEZ-2496.5.patch
against master revision cb59851.
{color:green}+1 @author{color}. The patch does not contain any @author tags.
{color:green}+1 tests included{color}. The patch appears to include 6 new or modified test files.
{color:green}+1 javac{color}. The applied patch does not increase the total number of javac compiler warnings.
{color:red}-1 javadoc{color}. The javadoc tool appears to have generated 1 warning message. See https://builds.apache.org/job/PreCommit-TEZ-Build/883//artifact/patchprocess/diffJavadocWarnings.txt for details.
{color:green}+1 findbugs{color}. The patch does not introduce any new Findbugs (version 3.0.1) warnings.
{color:green}+1 release audit{color}. The applied patch does not increase the total number of release audit warnings.
{color:red}-1 core tests{color}. The patch failed these unit tests: org.apache.tez.test.TestTezJobs, org.apache.tez.test.TestFaultTolerance
Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/883//testReport/
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/883//console
This message is automatically generated.
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14589311#comment-14589311 ] TezQA commented on TEZ-2496:
{color:red}-1 overall{color}. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12740034/TEZ-2496.4.patch
against master revision d160781.
{color:green}+1 @author{color}. The patch does not contain any @author tags.
{color:green}+1 tests included{color}. The patch appears to include 6 new or modified test files.
{color:red}-1 javac{color}. The patch appears to cause the build to fail.
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/832//console
This message is automatically generated.
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14589272#comment-14589272 ] Rajesh Balamohan commented on TEZ-2496:
---
Review board link: https://reviews.apache.org/r/35548/diff/
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14585373#comment-14585373 ] Bikas Saha commented on TEZ-2496:
---
Will thread safety be an issue if different partition writers are writing their partition sizes?
{code}
+  public void setDataSizeForDest(int destIndex, long size) {
+    if (partitionStatistics == null) {
+      partitionStatistics = new BitSet();
+    }
{code}
Since sizeInMB is used in getBucket(), can the whole size calculation be moved into getBucket() to simplify the code?
{code}
+  public void setDataSizeForDest(int destIndex, long size) {
+    if (partitionStatistics == null) {
+      partitionStatistics = new BitSet();
+    }
+    int sizeInMB = (int) (size / (1024l * 1024l));
+    int index = destIndex * (BUCKET_SIZE_IN_MB.length + 1);
+    int bucket = getBucket(sizeInMB) + 1;
{code}
The actual int value is not being used anywhere other than as an existence check, right? Then why not simply writeBoolean?
{code}
+    int cardinality = in.readInt();
+    if (cardinality > -1) {
+      BytesWritable pStats = new BytesWritable();
{code}
Duplication of existing code?
{code}
+  private int[] createIndices(int partitionRange, int taskIndex, int offSetPerTask) {
+    int startIndex = taskIndex * offSetPerTask;
+    int[] indices = new int[partitionRange];
+    for (int currentIndex = 0; currentIndex < partitionRange; ++currentIndex) {
+      indices[currentIndex] = (startIndex + currentIndex);
+    }
+    return indices;
+  }
{code}
The code in ShuffleVertexManager can be tricky to get right :) It needs some targeted test coverage, ideally with odd and even numbers of partitions, and with basePartitionRange == and != remainderRangeForLastShuffle.
Does computing partition size and sorting have to be done every time a task is scheduled?
From the API point of view, this introduces redundancy with the existing setDataSize() that sets the size globally. Do both need to be called now?
Perhaps we can create setDataSizeWithPartitions(long[] sizes) such that, if partition sizes are reported, this can be used; else the existing global API can be used. Internally, setDataSizeWithPartitions() could 1) check all partitions have been specified and then also set the global size by summing the partitions.
Do we have some before/after numbers that show memory usage in the AM, say for a 10Kx10K job? Ideally, a new case could be added in TestMemoryWithEvents that covers this case of sending per-partition stats, so that we can repro easily.
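For context, the scheduling change this jira is after boils down to ordering pending reduce tasks by their reported input size, so that a skewed partition lands in the first wave rather than the last. A minimal sketch under that assumption — the names are hypothetical, not the ShuffleVertexManager internals:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch of size-aware scheduling: order pending reduce tasks
// so the ones with the largest reported input launch first. With limited
// slots, this puts a skewed partition into the first wave instead of a
// later one.
public class SizeAwareScheduler {

    public static final class PendingTask {
        final int taskIndex;
        final long aggregatedInputBytes; // summed over all source tasks

        public PendingTask(int taskIndex, long aggregatedInputBytes) {
            this.taskIndex = taskIndex;
            this.aggregatedInputBytes = aggregatedInputBytes;
        }
    }

    // Returns task indices in scheduling order: largest input first.
    public static List<Integer> scheduleOrder(List<PendingTask> pending) {
        List<PendingTask> sorted = new ArrayList<>(pending);
        sorted.sort(Comparator
            .comparingLong((PendingTask t) -> t.aggregatedInputBytes)
            .reversed());
        List<Integer> order = new ArrayList<>();
        for (PendingTask t : sorted) {
            order.add(t.taskIndex);
        }
        return order;
    }
}
```

This also illustrates Bikas's performance question above: if the sort is recomputed every time a task is scheduled, the cost is paid repeatedly, so caching the order until new stats arrive would be the obvious refinement.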
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14578109#comment-14578109 ] Bikas Saha commented on TEZ-2496:
---
Will look at it this week.
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14570714#comment-14570714 ] Rajesh Balamohan commented on TEZ-2496:
---
Thanks [~sseth]
>> For large jobs (large number of partitions), does this information also become too expensive?
- Since it is in compressed format and not materialized unless needed, it wasn't adding much to memory pressure. IIRC, it was adding a couple of extra bytes (well under 10-50 bytes in compressed form), but we can definitely add the option of enabling it as needed.
>> The way the data is aggregated is very lossy. 100 tasks generating <1MB per partition will show up as 1MB per partition. While that's ok to determine the sort order - it can get in the way of other optimizations which may be possible with such statistics. Rather than providing aggregated stats - it may be better to provide task level statistics, and let the consumer aggregate the information as they see fit.
- Yes, this was done purposefully to reduce the memory/CPU pressure. I completely agree that it can be an issue for other optimizations. We might have to change from "getDataSize(int destIndex)" to "getTaskDataSizes()" and let the consumer decide on aggregation. Will change it.
>> The APIs get interesting as well, especially when adding a flag to enable/disable these stats. Returning 0 is loss of information. May need an API to check whether such stats are being published by the specific Output. Alternately a return code which indicates that stats are not available. Beyond this - are there any semantics on whether all tasks must publish these stats if one task in the vertex is publishing them?
- No, currently all tasks publish the info. If it is not available, it is simply not accounted for in the sort order in ShuffleVertexManager.
>> Depending on how the API is structured - invocations to get these statistics would need to indicate that these stats are disabled.
>> On the API - what's reported is the exact size, but what's available is a range. The reporting/get API could be an ENUM indicating the range / absence of stats, to be consistent.
- Agreed, will change it.
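The switch from an aggregated getDataSize(int destIndex) to task-level stats, which Rajesh agrees to above, means the consumer picks its own aggregation. A hedged sketch of what that might look like, with illustrative names (not the Tez API):

```java
// Illustrative: given per-source-task partition sizes, indexed as
// taskPartitionSizes[task][partition], the consumer aggregates as it sees
// fit. Summing per partition suits scheduling order; taking the max per
// partition suits skew detection. Both are lossless here, unlike the
// bucketized aggregation done at the producer side.
public class StatsAggregation {

    public static long[] sumPerPartition(long[][] taskPartitionSizes) {
        int numPartitions = taskPartitionSizes[0].length;
        long[] totals = new long[numPartitions];
        for (long[] perTask : taskPartitionSizes) {
            for (int p = 0; p < numPartitions; p++) {
                totals[p] += perTask[p];
            }
        }
        return totals;
    }

    public static long[] maxPerPartition(long[][] taskPartitionSizes) {
        int numPartitions = taskPartitionSizes[0].length;
        long[] maxes = new long[numPartitions];
        for (long[] perTask : taskPartitionSizes) {
            for (int p = 0; p < numPartitions; p++) {
                maxes[p] = Math.max(maxes[p], perTask[p]);
            }
        }
        return maxes;
    }
}
```

The trade-off Siddharth raises is visible here: task-level stats keep both aggregations available, at the cost of holding per-task data until the consumer aggregates it.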
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14569937#comment-14569937 ] Siddharth Seth commented on TEZ-2496:
---
bq. Storing absolute values of partition sizes would cost lots of memory. Instead, the patch tries to bucketize the partition sizes into (0, 1, 10, 100, 1000 MB buckets) and packs all details into a single BitSet per task. This information is stored in IOStatisticsImpl and aggregated at the vertex level (unpacked only when needed). ShuffleVertexManager schedules the tasks based on this information available in VertexImpl.
Storing exact sizes per partition will be a large overhead. Right now, 6 bits are being used to represent 5 ranges plus the presence of the size. There are several bits to look at here:
- For large jobs (large number of partitions), does this information also become too expensive? I think the option to disable publishing this information would be useful. [~hitesh] suggested enabling this dynamically, probably in a follow-up though. Another thing to consider here is partially enabling it for a DAG: smaller vertices publish it, larger vertices don't.
- The way the data is aggregated is very lossy. 100 tasks generating <1MB per partition will show up as 1MB per partition. While that's ok to determine the sort order, it can get in the way of other optimizations which may be possible with such statistics. Rather than providing aggregated stats, it may be better to provide task-level statistics and let the consumer aggregate the information as they see fit.
- The APIs get interesting as well, especially when adding a flag to enable/disable these stats. Returning 0 is a loss of information. There may need to be an API to check whether such stats are being published by the specific Output, or alternately a return code which indicates that stats are not available.
Beyond this, are there any semantics on whether all tasks must publish these stats if one task in the vertex is publishing them?
- Depending on how the API is structured, invocations to get these statistics would need to indicate that these stats are disabled.
- On the API: what's reported is the exact size, but what's available is a range. The reporting/get API could be an ENUM indicating the range / absence of stats, to be consistent.
On the patch itself:
- In JavaDocs, replace destination with 'physical output'.
- Handling of values when not set - this currently returns 0.
- Figuring out the bucket (getBucket) would be easier with an Enumeration; maybe a helper to convert a size to an enum.
- Use 4 bits instead of 6. I'm guessing this is 6 to optimize for the merge operation.
- ShuffleVertexManager: PendingTaskInfo.stats - rename to what the parameter represents.
- Does configureTargetMapping() need to be invoked inside the conditional which updates parallelism?
- Sorter stats reporting: if I'm reading this correctly, stats are reported for each spill? Shouldn't this be done once at the end only, especially since there's no increment API for the stats being reported? Alternately, report combined stats for all spills up to the last report time.
Haven't looked at the ShuffleVertexManager changes in detail. Will take a look based on the discussion.
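The ENUM-based reporting suggested in the review above — return a coarse range (or an explicit absence marker) so callers cannot mistake a bucketized value for a precise byte count — might look like the following. The constant names and helper are hypothetical, not the committed Tez API:

```java
// Illustrative sketch: the stats API returns a coarse range, or
// NOT_AVAILABLE when a task did not publish stats, so what is reported
// matches what is actually stored (a range, not an exact size).
public enum SizeRange {
    NOT_AVAILABLE,
    ZERO,
    UP_TO_1MB,
    UP_TO_10MB,
    UP_TO_100MB,
    UP_TO_1GB,
    OVER_1GB;

    // Hypothetical helper converting an exact byte count into its range,
    // used on the reporting side so both ends speak the same units.
    public static SizeRange fromSize(long sizeInBytes) {
        long sizeInMB = sizeInBytes / (1024L * 1024L);
        if (sizeInBytes == 0) return ZERO;
        if (sizeInMB < 1) return UP_TO_1MB;
        if (sizeInMB < 10) return UP_TO_10MB;
        if (sizeInMB < 100) return UP_TO_100MB;
        if (sizeInMB < 1000) return UP_TO_1GB;
        return OVER_1GB;
    }
}
```

An enum also answers the "returning 0 is loss of information" concern: a partition with no stats yields NOT_AVAILABLE rather than a value indistinguishable from an empty partition.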
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14568193#comment-14568193 ] Rajesh Balamohan commented on TEZ-2496:
---
[~sseth], [~bikassaha] - Can you please review when you find time?
[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14565558#comment-14565558 ] TezQA commented on TEZ-2496: {color:green}+1 overall{color}. Here are the results of testing the latest attachment http://issues.apache.org/jira/secure/attachment/12736054/TEZ-2496.3.patch against master revision 87f26a2. {color:green}+1 @author{color}. The patch does not contain any @author tags. {color:green}+1 tests included{color}. The patch appears to include 4 new or modified test files. {color:green}+1 javac{color}. The applied patch does not increase the total number of javac compiler warnings. {color:green}+1 javadoc{color}. There were no new javadoc warning messages. {color:green}+1 findbugs{color}. The patch does not introduce any new Findbugs (version 3.0.1) warnings. {color:green}+1 release audit{color}. The applied patch does not increase the total number of release audit warnings. {color:green}+1 core tests{color}. The patch passed unit tests in . Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/768//testReport/ Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/768//console This message is automatically generated.
[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14564312#comment-14564312 ] TezQA commented on TEZ-2496: {color:green}+1 overall{color}. Here are the results of testing the latest attachment http://issues.apache.org/jira/secure/attachment/12736054/TEZ-2496.3.patch against master revision 42b7756. {color:green}+1 @author{color}. The patch does not contain any @author tags. {color:green}+1 tests included{color}. The patch appears to include 4 new or modified test files. {color:green}+1 javac{color}. The applied patch does not increase the total number of javac compiler warnings. {color:green}+1 javadoc{color}. There were no new javadoc warning messages. {color:green}+1 findbugs{color}. The patch does not introduce any new Findbugs (version ) warnings. {color:green}+1 release audit{color}. The applied patch does not increase the total number of release audit warnings. {color:green}+1 core tests{color}. The patch passed unit tests in . Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/763//testReport/ Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/763//console This message is automatically generated.
[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14564233#comment-14564233 ] TezQA commented on TEZ-2496: {color:red}-1 overall{color}. Here are the results of testing the latest attachment http://issues.apache.org/jira/secure/attachment/12736050/TEZ-2496.2.patch against master revision 42b7756. {color:green}+1 @author{color}. The patch does not contain any @author tags. {color:green}+1 tests included{color}. The patch appears to include 2 new or modified test files. {color:green}+1 javac{color}. The applied patch does not increase the total number of javac compiler warnings. {color:green}+1 javadoc{color}. There were no new javadoc warning messages. {color:green}+1 findbugs{color}. The patch does not introduce any new Findbugs (version ) warnings. {color:green}+1 release audit{color}. The applied patch does not increase the total number of release audit warnings. {color:red}-1 core tests{color}. The patch failed these unit tests in : org.apache.tez.runtime.library.common.sort.impl.dflt.TestDefaultSorter org.apache.tez.runtime.library.common.sort.impl.TestPipelinedSorter Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/762//testReport/ Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/762//console This message is automatically generated.
[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source
[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14563701#comment-14563701 ] Siddharth Seth commented on TEZ-2496: - +1 for providing per partition information. This can also be used to re-distribute partitions to tasks so that all of them get the same amount of data. Providing this information to consumer tasks is also helpful - in terms of sizing buffers, and potentially prioritizing fetches.
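The redistribution idea in the comment above — use per-partition size information to assign partitions to tasks so that all of them receive roughly the same amount of data — can be sketched as a greedy longest-first assignment. This is an illustrative sketch only, with hypothetical names; it is not how Tez implements auto-parallelism.

```python
import heapq

def assign_partitions(partition_sizes, num_tasks):
    """Greedily assign partitions to tasks, balancing total bytes.

    Each partition goes to the task with the least data so far
    (longest-processing-time heuristic). Returns one list of
    partition indices per task.
    """
    # Min-heap of (bytes_assigned_so_far, task_index).
    heap = [(0, t) for t in range(num_tasks)]
    heapq.heapify(heap)
    assignment = [[] for _ in range(num_tasks)]
    # Place the biggest partitions first so the small ones can
    # even out the remaining imbalance.
    for p in sorted(range(len(partition_sizes)),
                    key=lambda i: partition_sizes[i], reverse=True):
        load, t = heapq.heappop(heap)
        assignment[t].append(p)
        heapq.heappush(heap, (load + partition_sizes[p], t))
    return assignment
```

For example, partition sizes `[7, 5, 3, 2, 2, 1]` split across 2 tasks yield two groups of 10 bytes each, instead of the arbitrary split a fixed hash range would give.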