[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-12-13 Thread Rajesh Balamohan (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15055295#comment-15055295
 ] 

Rajesh Balamohan commented on TEZ-2496:
---

Committed to branch-0.7:
commit 62e4b6ea1cd4021791ae3b133e8e80c9a82456d6

> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source
> --
>
> Key: TEZ-2496
> URL: https://issues.apache.org/jira/browse/TEZ-2496
> Project: Apache Tez
>  Issue Type: Improvement
>Reporter: Rajesh Balamohan
>Assignee: Rajesh Balamohan
> Fix For: 0.8.0-alpha
>
> Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, 
> TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch, TEZ-2496.7.patch, 
> TEZ-2496.8.patch, TEZ-2496.8.patch, TEZ-2496.9.addendum.patch, 
> TEZ-2496.9.branch-0.7.patch, TEZ-2496.9.patch
>
>
> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source.  This would be helpful in scenarios where there are 
> limited resources (or concurrent jobs running, or multiple waves) with 
> data skew, and the task that receives a large amount of data gets scheduled 
> much later.
> e.g. Consider the following Hive query running in a queue with limited 
> capacity (42 slots in total) at 200 GB scale:
> {noformat}
> CREATE TEMPORARY TABLE sampleData AS
>   SELECT CASE
>WHEN ss_sold_time_sk IS NULL THEN 70429
>ELSE ss_sold_time_sk
>END AS ss_sold_time_sk,
>ss_item_sk,
>ss_customer_sk,
>ss_cdemo_sk,
>ss_hdemo_sk,
>ss_addr_sk,
>ss_store_sk,
>ss_promo_sk,
>ss_ticket_number,
>ss_quantity,
>ss_wholesale_cost,
>ss_list_price,
>ss_sales_price,
>ss_ext_discount_amt,
>ss_ext_sales_price,
>ss_ext_wholesale_cost,
>ss_ext_list_price,
>ss_ext_tax,
>ss_coupon_amt,
>ss_net_paid,
>ss_net_paid_inc_tax,
>ss_net_profit,
>ss_sold_date_sk
>   FROM store_sales distribute by ss_sold_time_sk;
> {noformat}
> This generated 39 maps and 134 reduce slots (3 reduce waves). When there are 
> lots of nulls for ss_sold_time_sk, the data tends to skew towards 70429.  If 
> the reducer that receives this data is scheduled much earlier (i.e. in the 
> first wave itself), the entire job would finish faster.
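The scheduling idea described above can be sketched as follows. This is a hypothetical minimal sketch, not the actual ShuffleVertexManager code: the class and method names are illustrative, and it assumes per-task aggregate partition sizes are already known.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: order pending reduce tasks largest-first so a
// skewed partition (e.g. the null bucket mapped to 70429) lands in the
// first wave instead of a later one.
public class LargestFirstScheduling {

    // Returns task indices sorted by descending reported partition size.
    static List<Integer> scheduleOrder(long[] partitionSizes) {
        List<Integer> tasks = new ArrayList<>();
        for (int i = 0; i < partitionSizes.length; i++) {
            tasks.add(i);
        }
        tasks.sort(Comparator
            .comparingLong((Integer i) -> partitionSizes[i])
            .reversed());
        return tasks;
    }

    public static void main(String[] args) {
        // task 1 holds the skewed (largest) partition, so it goes first
        long[] sizes = {100L, 5_000_000_000L, 0L};
        System.out.println(scheduleOrder(sizes)); // [1, 0, 2]
    }
}
```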



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-07-20 Thread TezQA (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633726#comment-14633726
 ] 

TezQA commented on TEZ-2496:


{color:green}+1 overall{color}.  Here are the results of testing the latest 
attachment
  http://issues.apache.org/jira/secure/attachment/12746117/TEZ-2496.9.patch
  against master revision be5b191.

{color:green}+1 @author{color}.  The patch does not contain any @author 
tags.

{color:green}+1 tests included{color}.  The patch appears to include 6 new 
or modified test files.

{color:green}+1 javac{color}.  The applied patch does not increase the 
total number of javac compiler warnings.

{color:green}+1 javadoc{color}.  There were no new javadoc warning messages.

{color:green}+1 findbugs{color}.  The patch does not introduce any new 
Findbugs (version 3.0.1) warnings.

{color:green}+1 release audit{color}.  The applied patch does not increase 
the total number of release audit warnings.

{color:green}+1 core tests{color}.  The patch passed unit tests in .

Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/906//testReport/
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/906//console

This message is automatically generated.

> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source
> --
>
> Key: TEZ-2496
> URL: https://issues.apache.org/jira/browse/TEZ-2496
> Project: Apache Tez
>  Issue Type: Improvement
>Reporter: Rajesh Balamohan
>Assignee: Rajesh Balamohan
> Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, 
> TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch, TEZ-2496.7.patch, 
> TEZ-2496.8.patch, TEZ-2496.8.patch, TEZ-2496.9.patch
>
>


[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-07-08 Thread Rajesh Balamohan (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14619670#comment-14619670
 ] 

Rajesh Balamohan commented on TEZ-2496:
---

Thanks [~bikassaha]. Will implement both changes and run the experiment before 
committing (did that for the .7 version of the patch, which has the initial 
code base for this approach). Have to wait a couple more days, as the cluster 
I experiment with is down for maintenance. 

> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source
> --
>
> Key: TEZ-2496
> URL: https://issues.apache.org/jira/browse/TEZ-2496
> Project: Apache Tez
>  Issue Type: Improvement
>Reporter: Rajesh Balamohan
>Assignee: Rajesh Balamohan
> Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, 
> TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch, TEZ-2496.7.patch, 
> TEZ-2496.8.patch, TEZ-2496.8.patch
>
>


[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-07-08 Thread Bikas Saha (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14619659#comment-14619659
 ] 

Bikas Saha commented on TEZ-2496:
-

bq. Used just for short circuiting. In case it returns false, sorting of tasks 
can be avoided
Can that be done by simply tracking whether we have received a new vertex 
manager event? Then executing the rest of that method can be avoided.
In any case, statsAvailable probably needs a rename to statsUpdated
{code}
+boolean statsAvailable = computePartitionSizes();{code}

I would change the order to 2, 0, 1 just to be sure that we are not getting 
fooled by direct or reverse iteration.
{code}+//Tasks should be scheduled in task 2, 1, 0 order
+long[] sizes = new long[]{(0l), (100 * 1000l * 1000l), (5000 * 1000l * 
1000l)};{code}

lgtm. +1. Please double check with your previous real life experiment before 
committing.


> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source
> --
>
> Key: TEZ-2496
> URL: https://issues.apache.org/jira/browse/TEZ-2496
> Project: Apache Tez
>  Issue Type: Improvement
>Reporter: Rajesh Balamohan
>Assignee: Rajesh Balamohan
> Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, 
> TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch, TEZ-2496.7.patch, 
> TEZ-2496.8.patch, TEZ-2496.8.patch
>
>


[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-07-08 Thread TezQA (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14618460#comment-14618460
 ] 

TezQA commented on TEZ-2496:


{color:green}+1 overall{color}.  Here are the results of testing the latest 
attachment
  http://issues.apache.org/jira/secure/attachment/12744190/TEZ-2496.8.patch
  against master revision cb59851.

{color:green}+1 @author{color}.  The patch does not contain any @author 
tags.

{color:green}+1 tests included{color}.  The patch appears to include 6 new 
or modified test files.

{color:green}+1 javac{color}.  The applied patch does not increase the 
total number of javac compiler warnings.

{color:green}+1 javadoc{color}.  There were no new javadoc warning messages.

{color:green}+1 findbugs{color}.  The patch does not introduce any new 
Findbugs (version 3.0.1) warnings.

{color:green}+1 release audit{color}.  The applied patch does not increase 
the total number of release audit warnings.

{color:green}+1 core tests{color}.  The patch passed unit tests in .

Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/890//testReport/
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/890//console

This message is automatically generated.

> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source
> --
>
> Key: TEZ-2496
> URL: https://issues.apache.org/jira/browse/TEZ-2496
> Project: Apache Tez
>  Issue Type: Improvement
>Reporter: Rajesh Balamohan
>Assignee: Rajesh Balamohan
> Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, 
> TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch, TEZ-2496.7.patch, 
> TEZ-2496.8.patch, TEZ-2496.8.patch
>
>


[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-07-07 Thread Bikas Saha (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617708#comment-14617708
 ] 

Bikas Saha commented on TEZ-2496:
-

Looks good overall.

This does not have to be in the API package.
{code}diff --git 
tez-api/src/main/java/org/apache/tez/runtime/api/DATA_RANGE_IN_MB.java 
tez-api/src/main/java/org/apache/tez/runtime/api/DATA_RANGE_IN_MB.java
{code}

TEN? Also, looks like the constructor and member var are dead code?
{code}+public enum DATA_RANGE_IN_MB {
+  THOUSAND(1000), HUNDRED(100), TEZ(10), ONE(1), ZERO(0);{code}

Do we really need to do Math.ceil()? There is probably a bit-manipulation 
method that does this more cheaply.
{code}+  public static final DATA_RANGE_IN_MB getRange(long sizeInBytes) {
+int sizeInMB = (int) Math.ceil(sizeInBytes / (1024 * 1024 * 1.0));{code}
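The bit-manipulation alternative hinted at here could look like the following. This is an illustrative sketch, not code from the patch: it ceil-divides by 1 MiB using an add and an unsigned shift, which is valid for non-negative sizes.

```java
// Hypothetical sketch: compute ceil(sizeInBytes / 1 MiB) without
// floating point. Adding (2^20 - 1) before shifting right by 20 bits
// rounds any partial megabyte up. Assumes sizeInBytes >= 0.
public class CeilMb {

    static int toMbCeil(long sizeInBytes) {
        return (int) ((sizeInBytes + (1L << 20) - 1) >>> 20);
    }

    public static void main(String[] args) {
        System.out.println(toMbCeil(0));             // 0
        System.out.println(toMbCeil(1));             // 1 (rounds up)
        System.out.println(toMbCeil(1L << 20));      // 1 (exactly 1 MiB)
        System.out.println(toMbCeil((1L << 20) + 1)); // 2
    }
}
```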

Does runtime-internals need roaring bitmaps?
{code}diff --git tez-runtime-internals/pom.xml tez-runtime-internals/pom.xml
... +  org.roaringbitmap
+  RoaringBitmap
{code}

Unnecessary diff
{code}diff --git 
tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
 
tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
{code}

Why do the +1 here instead of in getBucket()? Spreading the bucket logic 
across 3 places (here, getBucket(), and DATA_RANGE_IN_MB) is error prone. 
Perhaps replace all 3 with getBucket()?
{code}+for (int i = 0; i < sizes.length; i++) {
+  int bucket = getBucket(sizes[i]) + 1;
{code}
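One way to keep the bucket logic in a single place, as suggested, is sketched below. This is hypothetical: the range boundaries follow the THOUSAND/HUNDRED/TEN/ONE/ZERO enum quoted earlier, and the class and method names are illustrative, not from the patch.

```java
// Illustrative sketch: a single getBucket() owns the whole mapping from
// byte size to bucket index, so callers never need a "+ 1" adjustment.
public class BucketUtil {

    static final long MB = 1L << 20;

    // bucket 0: ZERO, 1: <= 1 MB (ONE), 2: <= 10 MB (TEN),
    // 3: <= 100 MB (HUNDRED), 4: larger (THOUSAND)
    static int getBucket(long sizeInBytes) {
        long sizeInMb = (sizeInBytes + MB - 1) / MB; // ceil to MB
        if (sizeInMb == 0) return 0;
        if (sizeInMb <= 1) return 1;
        if (sizeInMb <= 10) return 2;
        if (sizeInMb <= 100) return 3;
        return 4;
    }
}
```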

No point having 2 vars that can be tracked as one? reportPartitionStats === 
reportPartitionStats() { return partitions != null}, right?
{code}+  protected OutputStatisticsReporter statsReporter;
+  protected final long[] partitionStats;{code}

Still needed?
{code}
 VertexManagerPluginContext mockContext = 
mock(VertexManagerPluginContext.class);
+
when(mockContext.getVertexStatistics(any(String.class))).thenReturn(mock(VertexStatistics.class));{code}

Are there existing OrderedPartitionedOutput/PipeLinedSorter/ExternalSorter 
tests that can be enhanced to verify that partition stats are being recorded? 

Assuming the ShuffleVertexManager code is the same as when I looked at it the 
last time. 

Not sure why the second part of each of the if checks is useful. Any issue 
with simply overwriting with the new value?
{code}+if ((totalStats > 0) && (taskInfo.outputStats != totalStats)) {
+  computedPartitionSizes = true;
+  taskInfo.outputStats = totalStats;
+}
+  } else {
+if ((stats[index] > 0) && (stats[index] != taskInfo.outputStats)) {
+  computedPartitionSizes = true;
+  taskInfo.outputStats = stats[index];{code}
If I understand this right, the code is trying to avoid sorting based on this 
check. But could this be done simply by tracking whether we have received a 
new stats update event? And could the code from 
computePartitionSizes()+sortPendingTasksBasedOnDataSize be moved into 
parsePartitionStats()? Nothing should change unless we have received new 
stats, right? So all stats-dependent updates can be made when we receive new 
stats.
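The event-driven flow proposed here can be sketched roughly as below. This is a hypothetical illustration, not the patch's code: the class, field, and method names are invented, and it compresses the real event handling into a dirty flag plus a lazy re-sort.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: set a dirty flag when a new stats event arrives,
// and re-sort pending tasks only when the flag is set. All
// stats-dependent work happens on receipt of new stats, as suggested.
public class StatsDrivenSort {

    private final long[] stats;             // aggregated bytes per task
    private boolean statsUpdated = false;   // set on each new event
    private final List<Integer> pendingTasks = new ArrayList<>();

    StatsDrivenSort(int numTasks) {
        stats = new long[numTasks];
        for (int i = 0; i < numTasks; i++) {
            pendingTasks.add(i);
        }
    }

    void onVertexManagerEventReceived(int task, long reportedBytes) {
        stats[task] += reportedBytes;
        statsUpdated = true;                // nothing changes otherwise
    }

    List<Integer> pendingTasksInScheduleOrder() {
        if (statsUpdated) {                 // sort only after new stats
            pendingTasks.sort(Comparator
                .comparingLong((Integer i) -> stats[i])
                .reversed());
            statsUpdated = false;
        }
        return pendingTasks;
    }
}
```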

Spurious change?
{code}-  @Test(timeout = 5000)
+  @Test(timeout = 500)
   public void testShuffleVertexManagerAutoParallelism() throws Exception 
{{code}

Why did this change?
{code}
 Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
-Assert.assertTrue(scheduledTasks.size() == 3);
+Assert.assertTrue(scheduledTasks.size() == 1);{code}

Can the shuffle vertex manager bucket-calculation test be further enhanced to 
verify that the first task to be scheduled is the largest task? That is the 
final intent of the jira, right? :)

In a separate jira we need to track the bug that the vertex manager event is 
not resilient to task retries, because it does not provide that info. So a 
rerun of the same task would cause double counting. It's an existing bug, not 
one introduced by this patch.

> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source
> --
>
> Key: TEZ-2496
> URL: https://issues.apache.org/jira/browse/TEZ-2496
> Project: Apache Tez
>  Issue Type: Improvement
>Reporter: Rajesh Balamohan
>Assignee: Rajesh Balamohan
> Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, 
> TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch, TEZ-2496.7.patch, 
> TEZ-2496.8.patch
>
>

[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-07-07 Thread Bikas Saha (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617362#comment-14617362
 ] 

Bikas Saha commented on TEZ-2496:
-

Summarizing an offline discussion on the pros and cons of this approach:
1) Lives in user land - easier to iterate.
2) Memory efficient - the shuffle vertex manager can apply a policy specific 
to the partition-stats use case. Deterministic sizes mean it can aggregate 
upon event receipt and discard the raw data.
3) CPU efficient - because it is not calling getStatistics() repeatedly.
4) Works with pipelining and getting early stats from running tasks; 
getStatistics() would get expensive for this.
5) Allows for other use cases, like sending partition stats to inputs for 
runtime optimizations. Only the shuffle vertex manager can correctly do this, 
since it merges partitions during auto-reduce.
6) Once this has stabilized and been optimized, we can transfer the logic to 
a partition-stats API that would be generally available as part of the system.

> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source
> --
>
> Key: TEZ-2496
> URL: https://issues.apache.org/jira/browse/TEZ-2496
> Project: Apache Tez
>  Issue Type: Improvement
>Reporter: Rajesh Balamohan
>Assignee: Rajesh Balamohan
> Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, 
> TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch, TEZ-2496.7.patch, 
> TEZ-2496.8.patch
>
>


[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-07-07 Thread TezQA (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616879#comment-14616879
 ] 

TezQA commented on TEZ-2496:


{color:red}-1 overall{color}.  Here are the results of testing the latest 
attachment
  http://issues.apache.org/jira/secure/attachment/12743967/TEZ-2496.8.patch
  against master revision cb59851.

{color:green}+1 @author{color}.  The patch does not contain any @author 
tags.

{color:green}+1 tests included{color}.  The patch appears to include 5 new 
or modified test files.

{color:green}+1 javac{color}.  The applied patch does not increase the 
total number of javac compiler warnings.

{color:green}+1 javadoc{color}.  There were no new javadoc warning messages.

{color:red}-1 findbugs{color}.  The patch appears to introduce 1 new 
Findbugs (version 3.0.1) warnings.

{color:green}+1 release audit{color}.  The applied patch does not increase 
the total number of release audit warnings.

{color:green}+1 core tests{color}.  The patch passed unit tests in .

Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/888//testReport/
Findbugs warnings: 
https://builds.apache.org/job/PreCommit-TEZ-Build/888//artifact/patchprocess/newPatchFindbugsWarningstez-runtime-library.html
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/888//console

This message is automatically generated.

> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source
> --
>
> Key: TEZ-2496
> URL: https://issues.apache.org/jira/browse/TEZ-2496
> Project: Apache Tez
>  Issue Type: Improvement
>Reporter: Rajesh Balamohan
>Assignee: Rajesh Balamohan
> Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, 
> TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch, TEZ-2496.7.patch, 
> TEZ-2496.8.patch
>
>


[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-07-07 Thread TezQA (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616679#comment-14616679
 ] 

TezQA commented on TEZ-2496:


{color:red}-1 overall{color}.  Here are the results of testing the latest 
attachment
  http://issues.apache.org/jira/secure/attachment/12743948/TEZ-2496.7.patch
  against master revision cb59851.

{color:green}+1 @author{color}.  The patch does not contain any @author 
tags.

{color:green}+1 tests included{color}.  The patch appears to include 3 new 
or modified test files.

{color:green}+1 javac{color}.  The applied patch does not increase the 
total number of javac compiler warnings.

{color:green}+1 javadoc{color}.  There were no new javadoc warning messages.

{color:red}-1 findbugs{color}.  The patch appears to introduce 1 new 
Findbugs (version 3.0.1) warnings.

{color:green}+1 release audit{color}.  The applied patch does not increase 
the total number of release audit warnings.

{color:red}-1 core tests{color}.  The patch failed these unit tests in :
   
org.apache.tez.runtime.library.common.sort.impl.TestPipelinedSorter
  
org.apache.tez.runtime.library.common.sort.impl.dflt.TestDefaultSorter

Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/887//testReport/
Findbugs warnings: 
https://builds.apache.org/job/PreCommit-TEZ-Build/887//artifact/patchprocess/newPatchFindbugsWarningstez-runtime-library.html
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/887//console

This message is automatically generated.

> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source
> --
>
> Key: TEZ-2496
> URL: https://issues.apache.org/jira/browse/TEZ-2496
> Project: Apache Tez
>  Issue Type: Improvement
>Reporter: Rajesh Balamohan
>Assignee: Rajesh Balamohan
> Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, 
> TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch, TEZ-2496.7.patch
>
>


[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-07-06 Thread Bikas Saha (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14615866#comment-14615866
 ] 

Bikas Saha commented on TEZ-2496:
-

Yes. That is what I am suggesting, because I think going the full-fledged 
partition route is risky in its current form. Not sure if you disagree with 
that risk assessment. The other features mentioned would need new information 
to be passed to the reducer inputs, right? That could be done via new events 
sent from the ShuffleVertexManager to the tasks (e.g. the input data 
information event being used to send splits today).

> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source
> --
>
> Key: TEZ-2496
> URL: https://issues.apache.org/jira/browse/TEZ-2496
> Project: Apache Tez
>  Issue Type: Improvement
>Reporter: Rajesh Balamohan
>Assignee: Rajesh Balamohan
> Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, 
> TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch
>
>


[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-07-06 Thread Rajesh Balamohan (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14615847#comment-14615847
 ] 

Rajesh Balamohan commented on TEZ-2496:
---

- For 50K x 20K, it would end up with ~760 MB when all 50K tasks are in the 
running phase. If the number of concurrently executing tasks is on the order of 
10K, the pressure would be reduced, as the aggregation happens at the vertex 
level.
- The proposed approach piggybacks on bundling the partition stats with the 
VertexManagerEvent in ShuffleUtils, so that ShuffleVertexManager can receive 
them in onVertexManagerEventReceived(). The aggregation then happens in 
ShuffleVertexManager itself by unpacking the bitmap/RoaringBitmap. That would 
help this use case move forward, but might affect other improvements related to 
this JIRA (like better memory allocation, or better thread handling in fetchers 
based on stats).
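
The bucketed-BitSet encoding being discussed can be sketched roughly as follows. This is a hedged illustration: `PartitionStatsSketch`, its fields, and the exact bit layout are made-up for this sketch and may differ from the actual patch.

```java
import java.util.BitSet;

// Hedged sketch (not the actual patch code) of the idea above: collapse each
// partition's output size into one of a few coarse size buckets and pack the
// result into a BitSet, so per-partition stats stay tiny when bundled into a
// VertexManagerEvent.
public class PartitionStatsSketch {
  // Bucket boundaries in MB, mirroring the 0/1/10/100/1000 MB ranges
  // discussed in this thread.
  public static final int[] BUCKET_SIZE_IN_MB = {1, 10, 100, 1000};

  // Map a size in MB to a bucket index in 0..BUCKET_SIZE_IN_MB.length.
  public static int getBucket(int sizeInMB) {
    int bucket = 0;
    for (int boundary : BUCKET_SIZE_IN_MB) {
      if (sizeInMB < boundary) {
        break;
      }
      bucket++;
    }
    return bucket;
  }

  // One contiguous group of bits per partition; exactly one bit set per
  // reported partition, identifying its size bucket.
  public final BitSet partitionStats = new BitSet();

  public void setDataSizeForDest(int destIndex, long sizeInBytes) {
    int sizeInMB = (int) (sizeInBytes / (1024L * 1024L));
    int base = destIndex * (BUCKET_SIZE_IN_MB.length + 1);
    partitionStats.set(base + getBucket(sizeInMB));
  }
}
```

With five buckets per partition, the whole structure costs a handful of bits per partition rather than a `long` each, which is what keeps the aggregated stats cheap at the vertex level.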



[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-07-06 Thread Bikas Saha (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14615484#comment-14615484
 ] 

Bikas Saha commented on TEZ-2496:
-

IMO, the solution is risky and I am wary of adding it into the API for everyone 
to use, because it's not clear that we have a viable long-term solution. A 
single job with 50K x 20K vertices would end up with 1 GB or more in the 
bitmap, and multiple large vertices running in parallel would hold that memory. 
Secondly, clearing the partition stats makes this susceptible to even one task 
failure; clearing some stats but keeping others may lead to inconsistencies 
down the road or affect other stats improvements.
Approximating large histograms is not a new problem, and we may be able to find 
some known work (like https://metamarkets.com/2013/histograms/) to create a 
more viable algorithmic solution.
However, we do need to start experimenting with what we can do with stats, so 
making progress is also important. Here is my suggestion.
How about we don't make this part of the API or the internal stats flow for 
now? That keeps the main engine scalable, and we don't expose a WIP API. We can 
send the same bitmap information in VertexManagerEvents from the output to the 
ShuffleVertexManager. We already do that to send the total output size per 
task; we can enhance the payload to send the partition info instead. This way 
we can send the necessary info and solve the scenario this JIRA is trying to 
address. Here are the pros I see with this:
1) Uses the existing event mechanism by enhancing its payload. We know this 
works e2e, so there is less scope for bugs.
2) Does not change the internal flow or APIs.
3) Is localized to the ShuffleVertexManager (which needs this), and the feature 
can be turned on/off via config for large jobs if needed; APIs cannot be turned 
on/off based on config.

Essentially, this lives in "user" land instead of "tez" land. Once we have 
experimented with this and refined it, we can consider moving it from user land 
into tez land by adding it to the API infra. Thoughts?
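
A minimal sketch of this event-payload route, assuming a `byte[]` payload that carries the existing total output size plus the serialized partition BitSet. `VmEventPayloadSketch` and its methods are illustrative names, not the real Tez event API.

```java
import java.nio.ByteBuffer;
import java.util.BitSet;

// Hedged sketch of the "user land" route: the output serializes its
// per-partition bucket BitSet into the VertexManagerEvent payload, and
// ShuffleVertexManager unpacks it on receipt.
public class VmEventPayloadSketch {
  // Producer side: pack the total output size plus the partition bits.
  public static byte[] pack(long totalOutputSize, BitSet partitionStats) {
    byte[] bits = partitionStats.toByteArray(); // compact little-endian bytes
    ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + bits.length);
    buf.putLong(totalOutputSize);
    buf.put(bits);
    return buf.array();
  }

  // Consumer side (e.g., in onVertexManagerEventReceived): recover the
  // total size that is already being sent today...
  public static long unpackTotalSize(byte[] payload) {
    return ByteBuffer.wrap(payload).getLong();
  }

  // ...and the new per-partition stats, without touching the engine's
  // internal stats flow or APIs.
  public static BitSet unpackPartitionStats(byte[] payload) {
    ByteBuffer buf = ByteBuffer.wrap(payload);
    buf.getLong(); // skip the total size
    return BitSet.valueOf(buf); // reads the remaining bytes
  }
}
```

Because the payload is opaque to the engine, a config flag can simply stop the producer from appending the partition bytes for very large jobs, which is the on/off property argued for above.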



[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-07-06 Thread TezQA (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14614873#comment-14614873
 ] 

TezQA commented on TEZ-2496:


{color:red}-1 overall{color}.  Here are the results of testing the latest 
attachment
  http://issues.apache.org/jira/secure/attachment/12743701/TEZ-2496.6.patch
  against master revision cb59851.

{color:green}+1 @author{color}.  The patch does not contain any @author 
tags.

{color:green}+1 tests included{color}.  The patch appears to include 6 new 
or modified test files.

{color:green}+1 javac{color}.  The applied patch does not increase the 
total number of javac compiler warnings.

{color:red}-1 javadoc{color}.  The javadoc tool appears to have generated 1 
warning messages.
See 
https://builds.apache.org/job/PreCommit-TEZ-Build/884//artifact/patchprocess/diffJavadocWarnings.txt
 for details.

{color:green}+1 findbugs{color}.  The patch does not introduce any new 
Findbugs (version 3.0.1) warnings.

{color:green}+1 release audit{color}.  The applied patch does not increase 
the total number of release audit warnings.

{color:green}+1 core tests{color}.  The patch passed unit tests in .

Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/884//testReport/
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/884//console

This message is automatically generated.



[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-07-05 Thread TezQA (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14614587#comment-14614587
 ] 

TezQA commented on TEZ-2496:


{color:red}-1 overall{color}.  Here are the results of testing the latest 
attachment
  http://issues.apache.org/jira/secure/attachment/12743666/TEZ-2496.5.patch
  against master revision cb59851.

{color:green}+1 @author{color}.  The patch does not contain any @author 
tags.

{color:green}+1 tests included{color}.  The patch appears to include 6 new 
or modified test files.

{color:green}+1 javac{color}.  The applied patch does not increase the 
total number of javac compiler warnings.

{color:red}-1 javadoc{color}.  The javadoc tool appears to have generated 1 
warning messages.
See 
https://builds.apache.org/job/PreCommit-TEZ-Build/883//artifact/patchprocess/diffJavadocWarnings.txt
 for details.

{color:green}+1 findbugs{color}.  The patch does not introduce any new 
Findbugs (version 3.0.1) warnings.

{color:green}+1 release audit{color}.  The applied patch does not increase 
the total number of release audit warnings.

{color:red}-1 core tests{color}.  The patch failed these unit tests in :
   org.apache.tez.test.TestTezJobs
  org.apache.tez.test.TestFaultTolerance

Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/883//testReport/
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/883//console

This message is automatically generated.



[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-06-16 Thread TezQA (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14589311#comment-14589311
 ] 

TezQA commented on TEZ-2496:


{color:red}-1 overall{color}.  Here are the results of testing the latest 
attachment
  http://issues.apache.org/jira/secure/attachment/12740034/TEZ-2496.4.patch
  against master revision d160781.

{color:green}+1 @author{color}.  The patch does not contain any @author 
tags.

{color:green}+1 tests included{color}.  The patch appears to include 6 new 
or modified test files.

{color:red}-1 javac{color}.  The patch appears to cause the build to 
fail.

Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/832//console

This message is automatically generated.



[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-06-16 Thread Rajesh Balamohan (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14589272#comment-14589272
 ] 

Rajesh Balamohan commented on TEZ-2496:
---

Review board link: https://reviews.apache.org/r/35548/diff/



[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-06-14 Thread Bikas Saha (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14585373#comment-14585373
 ] 

Bikas Saha commented on TEZ-2496:
-

Will thread safety be an issue if different partition writers are writing 
their partition sizes?
{code}
+  public void setDataSizeForDest(int destIndex, long size) {
+    if (partitionStatistics == null) {
+      partitionStatistics = new BitSet();
+    }
{code}

Since sizeInMb is used in getBucket() can the whole calculation wrt size be 
moved to getBucket() to simplify the code.
{code}
+  public void setDataSizeForDest(int destIndex, long size) {
+    if (partitionStatistics == null) {
+      partitionStatistics = new BitSet();
+    }
+    int sizeInMB = (int) (size / (1024l * 1024l));
+    int index = destIndex * (BUCKET_SIZE_IN_MB.length + 1);
+    int bucket = getBucket(sizeInMB) + 1;
{code}

The actual int value is not being used anywhere other than as an exists check, 
right? Then why not simply writeBoolean?
{code}
+    int cardinality = in.readInt();
+    if (cardinality > -1) {
+      BytesWritable pStats = new BytesWritable();
{code}

Duplication of existing code?
{code}
+  private int[] createIndices(int partitionRange, int taskIndex, int offSetPerTask) {
+    int startIndex = taskIndex * offSetPerTask;
+    int[] indices = new int[partitionRange];
+    for (int currentIndex = 0; currentIndex < partitionRange; ++currentIndex) {
+      indices[currentIndex] = (startIndex + currentIndex);
+    }
+    return indices;
+  }
{code}

The code in ShuffleVertexManager can be tricky to get right :) It needs some 
targeted test coverage, ideally with odd and even numbers of partitions, and 
with basePartitionRange both == and != remainderRangeForLastShuffle.

Does computing partition size and sorting have to be done every time a task is 
scheduled?

From the API POV, this introduces redundancy with the existing setDataSize() 
that sets the size globally. Do both need to be called now? Perhaps we can 
create setDataSizeWithPartitions(long[] sizes), such that if partition sizes 
are reported then this can be used, else the existing global API can be used. 
Internally, setDataSizeWithPartitions() could check that all partitions have 
been specified and then also set the global size by summing the partitions.

Do we have some before/after numbers that show memory usage in the AM, say for 
a 10K x 10K job? Ideally, a new case could be added in TestMemoryWithEvents 
that covers this case of sending per-partition stats, so that we can repro 
easily.
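
The suggested API shape could look roughly like the following. This is a sketch of the hypothetical setDataSizeWithPartitions() signature proposed in the comment above, not existing Tez code.

```java
// Hedged sketch of the suggested consolidated API: one call records the
// per-partition sizes and derives the global data size by summing them,
// so callers don't have to invoke setDataSize() separately.
public class OutputStatsSketch {
  private long dataSize;          // global size, as setDataSize() reports today
  private long[] partitionSizes;  // optional per-partition breakdown

  public void setDataSizeWithPartitions(long[] sizes) {
    long total = 0;
    for (long s : sizes) {
      total += s;
    }
    this.partitionSizes = sizes.clone(); // defensive copy of caller's array
    this.dataSize = total;               // global size implied by the partitions
  }

  public long getDataSize() {
    return dataSize;
  }

  public long[] getPartitionSizes() {
    return partitionSizes;
  }
}
```

Summing inside the setter is what removes the redundancy: a producer that knows partition sizes reports only those, while producers without per-partition detail keep using the existing global API.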




[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-06-08 Thread Bikas Saha (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14578109#comment-14578109
 ] 

Bikas Saha commented on TEZ-2496:
-

Will look at it this week.



[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-06-03 Thread Rajesh Balamohan (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14570714#comment-14570714
 ] 

Rajesh Balamohan commented on TEZ-2496:
---

Thanks [~sseth]
>> For large jobs (large number of partitions), does this information also 
>> become too expensive ? 
- Since it is in compressed form and not materialized unless needed, it wasn't 
adding much memory pressure. IIRC, it was adding only a couple more bytes (well 
under 10-50 bytes in compressed form), but we can definitely add the option of 
enabling it as needed.


>> The way the data is aggregated is very lossy. 100 tasks generating <1MB per 
>> partition will show up as 1MB per partition. While that's ok to determine 
>> the sort order - it can get in the way of other optimizations which may be 
>> possible with such statistics. Rather than providing aggregated stats - it 
>> may be better to provide task level statistics, and let the consumer 
>> aggregate the information as they see fit.
- Yes, this was done purposefully to reduce the memory/CPU pressure. I 
completely agree that it can be an issue for other optimizations. We might have 
to change from "getDataSize(int destIndex)" --> "getTaskDataSizes()" and let 
the consumer decide on aggregation. Will change it.

>> The APIs get interesting as well, especially when adding a flag to 
>> enable/disable these stats. Returning 0 is loss of information. May need an 
>> API to check whether such stats are being published by the specific Output.  
>> Alternately a return code which indicates that stats are not available. 
>> Beyond this - are there any semantics on whether all tasks must publish 
>> these stats if one task in the vertex is publishing them ?
No, currently all tasks publish the info. If it is not available, it would not 
be accounted for in the sort order in ShuffleVertexManager.

>> Depending on how the API is structures - invocations to get these statistics 
>> would need to indicate that these stats are disabled. On the API - what's 
>> reported is the exact size, but what's available is a range. The reporting / 
>> get API could be an ENUM indicating the range / absence of stats to be 
>> consistent.
- Agreed, will change it.



[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-06-02 Thread Siddharth Seth (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14569937#comment-14569937
 ] 

Siddharth Seth commented on TEZ-2496:
-

bq. Storing absolute values of partition sizes would cost lots of memory. 
Instead, patch tries to bucketize the partition sizes into (0,1,10,100,1000 MB 
buckets) and packs all details into a single BitSet per task. This information 
is stored in IOStatisticsImpl & aggregated at vertex level (unpacked only when 
needed). ShuffleVertexManager schedules the tasks based on this information 
available in VertexImpl.
Storing exact sizes per partition would be a large overhead. Right now, 6 bits 
are being used to represent 5 ranges plus the presence of the size. There are 
several things to look at here:
- For large jobs (large number of partitions), does this information also 
become too expensive ? I think the option to disable publishing this 
information would be useful. [~hitesh] - suggested enabling this dynamically. 
Probably in a follow up though. Another thing to be considered here is to 
partially enable it for a DAG. Smaller vertices can publish it - larger 
vertices don't.
- The way the data is aggregated is very lossy. 100 tasks generating <1MB per 
partition will show up as 1MB per partition. While that's ok to determine the 
sort order - it can get in the way of other optimizations which may be possible 
with such statistics. Rather than providing aggregated stats - it may be better 
to provide task level statistics, and let the consumer aggregate the 
information as they see fit.
- The APIs get interesting as well, especially when adding a flag to 
enable/disable these stats. Returning 0 is loss of information. May need an API 
to check whether such stats are being published by the specific Output. 
Alternately a return code which indicates that stats are not available. Beyond 
this - are there any semantics on whether all tasks must publish these stats if 
one task in the vertex is publishing them ?
- Depending on how the API is structured, invocations to get these statistics 
would need to indicate that the stats are disabled.
- On the API - what's reported is the exact size, but what's available is a 
range. The reporting / get API could be an ENUM indicating the range / absence 
of stats to be consistent.

On the patch itself.
- In JavaDocs - replace destination with 'physical output'
- Handling of values when not set - this currently returns 0. 
- Figuring out the bucket (getBucket) would be easier with an enumeration; 
maybe add a helper to convert a size to an enum.
- Use 4 bits instead of 6. I'm guessing this is 6 to optimize for the merge 
operation.
- ShuffleVertexManager: PendingTaskInfo.stats - rename to what the parameter 
represents.
- Does configureTargetMapping(); need to be invoked inside the conditional 
which updates parallelism ?
- Sorter stats reporting: If I'm reading this correctly - stats are reported 
for each spill ? Shouldn't this be once at the end only. Especially since 
there's no increment API for the stats being reported. Alternately, report 
combined stats for all spills up to the last report time.

Haven't looked at the ShuffleVertexManager changes in detail. Will take a look 
based on the discussion.


[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-06-01 Thread Rajesh Balamohan (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14568193#comment-14568193
 ] 

Rajesh Balamohan commented on TEZ-2496:
---

[~sseth], [~bikassaha] - Can you please review when you find time?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-05-29 Thread TezQA (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14565558#comment-14565558
 ] 

TezQA commented on TEZ-2496:


{color:green}+1 overall{color}.  Here are the results of testing the latest 
attachment
  http://issues.apache.org/jira/secure/attachment/12736054/TEZ-2496.3.patch
  against master revision 87f26a2.

{color:green}+1 @author{color}.  The patch does not contain any @author 
tags.

{color:green}+1 tests included{color}.  The patch appears to include 4 new 
or modified test files.

{color:green}+1 javac{color}.  The applied patch does not increase the 
total number of javac compiler warnings.

{color:green}+1 javadoc{color}.  There were no new javadoc warning messages.

{color:green}+1 findbugs{color}.  The patch does not introduce any new 
Findbugs (version 3.0.1) warnings.

{color:green}+1 release audit{color}.  The applied patch does not increase 
the total number of release audit warnings.

{color:green}+1 core tests{color}.  The patch passed unit tests in .

Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/768//testReport/
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/768//console

This message is automatically generated.



[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-05-29 Thread TezQA (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14564312#comment-14564312
 ] 

TezQA commented on TEZ-2496:


{color:green}+1 overall{color}.  Here are the results of testing the latest 
attachment
  http://issues.apache.org/jira/secure/attachment/12736054/TEZ-2496.3.patch
  against master revision 42b7756.

{color:green}+1 @author{color}.  The patch does not contain any @author 
tags.

{color:green}+1 tests included{color}.  The patch appears to include 4 new 
or modified test files.

{color:green}+1 javac{color}.  The applied patch does not increase the 
total number of javac compiler warnings.

{color:green}+1 javadoc{color}.  There were no new javadoc warning messages.

{color:green}+1 findbugs{color}.  The patch does not introduce any new 
Findbugs (version ) warnings.

{color:green}+1 release audit{color}.  The applied patch does not increase 
the total number of release audit warnings.

{color:green}+1 core tests{color}.  The patch passed unit tests in .

Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/763//testReport/
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/763//console

This message is automatically generated.



[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-05-28 Thread TezQA (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14564233#comment-14564233
 ] 

TezQA commented on TEZ-2496:


{color:red}-1 overall{color}.  Here are the results of testing the latest 
attachment
  http://issues.apache.org/jira/secure/attachment/12736050/TEZ-2496.2.patch
  against master revision 42b7756.

{color:green}+1 @author{color}.  The patch does not contain any @author 
tags.

{color:green}+1 tests included{color}.  The patch appears to include 2 new 
or modified test files.

{color:green}+1 javac{color}.  The applied patch does not increase the 
total number of javac compiler warnings.

{color:green}+1 javadoc{color}.  There were no new javadoc warning messages.

{color:green}+1 findbugs{color}.  The patch does not introduce any new 
Findbugs (version ) warnings.

{color:green}+1 release audit{color}.  The applied patch does not increase 
the total number of release audit warnings.

{color:red}-1 core tests{color}.  The patch failed these unit tests in :
  org.apache.tez.runtime.library.common.sort.impl.dflt.TestDefaultSorter
  org.apache.tez.runtime.library.common.sort.impl.TestPipelinedSorter

Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/762//testReport/
Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/762//console

This message is automatically generated.



[jira] [Commented] (TEZ-2496) Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source

2015-05-28 Thread Siddharth Seth (JIRA)

[ 
https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14563701#comment-14563701
 ] 

Siddharth Seth commented on TEZ-2496:
-

+1 for providing per-partition information. This can also be used to 
re-distribute partitions to tasks so that all of them get the same amount of 
data. Providing this information to consumer tasks is also helpful, in terms 
of sizing buffers and potentially prioritizing fetches.
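The re-distribution idea mentioned here can be sketched as a greedy largest-partition-first assignment to the currently least-loaded task. This is a generic balancing heuristic with hypothetical names, not code from the patch.

```java
import java.util.Arrays;
import java.util.Comparator;

// Greedy "longest first" balancing: assign each partition (largest first)
// to whichever task currently has the least total data. Illustrative only.
public class PartitionBalancer {

  /** Returns assignment[p] = task index for partition p. */
  public static int[] assign(long[] partitionSizes, int numTasks) {
    Integer[] bySizeDesc = new Integer[partitionSizes.length];
    for (int p = 0; p < bySizeDesc.length; p++) {
      bySizeDesc[p] = p;
    }
    Arrays.sort(bySizeDesc,
        Comparator.comparingLong((Integer p) -> partitionSizes[p]).reversed());

    long[] load = new long[numTasks];
    int[] assignment = new int[partitionSizes.length];
    for (int p : bySizeDesc) {
      // Pick the task with the smallest accumulated load so far.
      int lightest = 0;
      for (int t = 1; t < numTasks; t++) {
        if (load[t] < load[lightest]) {
          lightest = t;
        }
      }
      assignment[p] = lightest;
      load[lightest] += partitionSizes[p];
    }
    return assignment;
  }
}
```

The same coarse range buckets used for scheduling would serve as the size estimates here, at the cost of the lossiness noted earlier in the thread.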
