[jira] [Comment Edited] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks

2017-12-06 Thread Darren Govoni (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16280271#comment-16280271
 ] 

Darren Govoni edited comment on SPARK-17788 at 12/6/17 2:57 PM:


I'm also running into this error on Spark 2.1.0:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in 
stage 11.0 failed 4 times, most recent failure: Lost task 42.3 in stage 11.0 
(TID 7544, xxx.xxx.xxx.xxx.xx, executor 2): java.lang.IllegalArgumentException: 
Cannot allocate a page with more than 17179869176 bytes
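For context: 17179869176 is not an arbitrary number. It equals (2^31 - 1) * 8, 
i.e. 2^31 - 1 eight-byte words, which (if I recall the Spark internals 
correctly) is the maximum memory page size enforced in TaskMemoryManager; a 
single skewed partition that needs a larger page triggers this exception. A 
quick sanity check of the arithmetic:

```scala
// Illustrative only: verify that the error's byte limit is (2^31 - 1) * 8.
object PageLimitCheck extends App {
  val maxPageBytes = ((1L << 31) - 1) * 8L
  println(maxPageBytes) // prints 17179869176
  assert(maxPageBytes == 17179869176L)
}
```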





> RangePartitioner results in few very large tasks and many small to empty 
> tasks 
> ---
>
> Key: SPARK-17788
> URL: https://issues.apache.org/jira/browse/SPARK-17788
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.0.0
> Environment: Ubuntu 14.04 64bit
> Java 1.8.0_101
>Reporter: Babak Alipour
>Assignee: Wenchen Fan
> Fix For: 2.3.0
>
>
> Greetings everyone,
> I was trying to read a single field of a Hive table stored as Parquet in 
> Spark (~140GB for the entire table, this single field is a Double, ~1.4B 
> records) and look at the sorted output using the following:
> sql("SELECT " + field + " FROM MY_TABLE ORDER BY " + field + " DESC") 
> But this simple line of code gives:
> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with 
> more than 17179869176 bytes
> Same error for:
> sql("SELECT " + field + " FROM MY_TABLE").sort(field)
> and:
> sql("SELECT " + field + " FROM MY_TABLE").orderBy(field)
> After doing some searching, the issue seems to lie in the RangePartitioner 
> trying to create equal ranges. [1]
> [1] 
> https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/RangePartitioner.html
>  
>  The Double values I'm trying to sort are mostly in the range [0,1] (~70% of 
> the data, which roughly equates to 1 billion records); other numbers in the 
> dataset are as high as 2000. With the RangePartitioner trying to create equal 
> ranges, some tasks become almost empty while others are extremely large, due 
> to the heavily skewed distribution. 
> This is either a bug in Apache Spark or a major limitation of the framework. 
> I hope one of the devs can help solve this issue.
> P.S. Email thread on Spark user mailing list:
> http://mail-archives.apache.org/mod_mbox/spark-user/201610.mbox/%3CCA%2B_of14hTVYTUHXC%3DmS9Kqd6qegVvkoF-ry3Yj2%2BRT%2BWSBNzhg%40mail.gmail.com%3E
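
To see why equal-frequency range boundaries cannot fix this kind of skew, here 
is a small Spark-free sketch (plain Scala, illustrative only; this is not 
RangePartitioner's actual code). Boundaries are picked from a sample of the 
sort key, and because every row with the same key must land in the same 
partition, a heavily duplicated key drags ~70% of the rows into a single task:

```scala
// Illustrative sketch of range partitioning under skew (not Spark's code).
object RangeSkewSketch extends App {
  val rnd = new scala.util.Random(42)
  // 70% of rows share one exact key (0.5); the rest spread over (0, 2000].
  val data =
    Array.fill(100000)(if (rnd.nextDouble() < 0.7) 0.5 else rnd.nextDouble() * 2000)

  val numPartitions = 10
  // Stand-in for the reservoir sample: a uniform sample of the data, sorted.
  val sample = Array.fill(1000)(data(rnd.nextInt(data.length))).sorted
  // Equal-frequency boundaries from the sample; duplicate boundaries collapse.
  val bounds = (1 until numPartitions)
    .map(i => sample(i * sample.length / numPartitions)).distinct

  // Every row with the same key maps to the same partition index.
  def partition(v: Double): Int = bounds.count(_ <= v)

  val sizes = data.groupBy(partition).values.map(_.length)
  println(sizes.toSeq.sorted)
  // The duplicated key cannot be split: one partition holds ~70% of the rows.
  assert(sizes.max > 60000)
}
```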



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks

2016-11-25 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15696321#comment-15696321
 ] 

Herman van Hovell edited comment on SPARK-17788 at 11/25/16 5:09 PM:
-

That is fair. The solution is not that straightforward, TBH:
- Always add some kind of tie-breaking value to the range key. This could be 
random, but I'd rather add something like monotonically_increasing_id(). This 
always incurs some cost.
- Only add a tie-breaker when you have (or suspect) skew. Here we need to add 
some heavy-hitter algorithm, which is potentially much more resource-intensive 
than reservoir sampling. The other thing is that when we suspect skew, we would 
need to scan the data again (which would bring the total number of scans to 
three).

So I would be slightly in favor of option 1 and a flag to disable it.
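
Option 1 can be sketched outside Spark (plain Scala, illustrative only; this is 
not the eventual patch): extend the sort key from value to (value, id). The id 
makes tied rows distinct, so equal-frequency range boundaries can cut through a 
heavy key instead of piling it into one partition:

```scala
// Illustrative sketch of a tie-breaking sort key (not the actual Spark fix).
object TieBreakSketch extends App {
  val n = 10000
  // Worst case: every sort key is identical, plus a monotonically increasing id.
  val keyed = Array.tabulate(n)(id => (0.5, id.toLong))

  val numPartitions = 4
  // Equal-frequency boundaries over the composite (value, id) key.
  val sorted = keyed.sortBy(identity)
  val bounds = (1 until numPartitions).map(i => sorted(i * n / numPartitions))

  def partition(k: (Double, Long)): Int =
    bounds.count(b => Ordering[(Double, Long)].lteq(b, k))

  val sizes = keyed.groupBy(partition).values.map(_.length)
  // Sorting on value alone would put all n rows in one partition;
  // the id tie-breaker splits them evenly.
  assert(sizes.forall(_ == n / numPartitions))
  println(sizes.toSeq)
}
```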









[jira] [Comment Edited] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks

2016-11-25 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15696134#comment-15696134
 ] 

Herman van Hovell edited comment on SPARK-17788 at 11/25/16 4:56 PM:
-

Spark makes a sketch of your data as soon as you want to order the entire 
dataset. Based on that sketch, Spark tries to create equally sized partitions. 
As [~holdenk] said, your problem is caused by skew (a lot of rows with the same 
key), and none of the current partitioning schemes can help you with this. In 
the short run, you could follow her suggestion and add noise to the order (this 
only works for global ordering, not for joins/aggregations with skewed 
values). In the long run, there is an ongoing effort to reduce skew for 
joins; see SPARK-9862 for more information.

I have created the following little Spark program to illustrate how range 
partitioning works:
{noformat}
import org.apache.spark.sql.Row

// Set the partitions and parallelism to a relatively low value so we can read
// the results.
spark.conf.set("spark.default.parallelism", "20")
spark.conf.set("spark.sql.shuffle.partitions", "20")

// Create a skewed data frame.
val df = spark
  .range(1000)
  .select(
    $"id",
    (rand(34) * when($"id" % 10 <= 7, lit(1.0)).otherwise(lit(10.0))).as("value"))

// Make a summary per partition. The partition intervals should not overlap,
// and the number of elements in a partition should be roughly the same for
// all partitions.
case class PartitionSummary(count: Long, min: Double, max: Double, range: Double)
val res = df.orderBy($"value").mapPartitions { iterator =>
  val (count, min, max) =
    iterator.foldLeft((0L, Double.PositiveInfinity, Double.NegativeInfinity)) {
      case ((count, min, max), Row(_, value: Double)) =>
        (count + 1L, Math.min(min, value), Math.max(max, value))
    }
  Iterator.single(PartitionSummary(count, min, max, max - min))
}

// Get the results and make them look nice.
res.orderBy($"min")
  .select(
    $"count",
    $"min".cast("decimal(5,3)"),
    $"max".cast("decimal(5,3)"),
    $"range".cast("decimal(5,3)"))
  .show(30)
{noformat}

This yields the following results (notice how the partition ranges vary while 
the row counts stay relatively similar):
{noformat}
+--+-+--+-+ 
| count|  min|   max|range|
+--+-+--+-+
|484005|0.000| 0.059|0.059|
|426212|0.059| 0.111|0.052|
|381796|0.111| 0.157|0.047|
|519954|0.157| 0.221|0.063|
|496842|0.221| 0.281|0.061|
|539082|0.281| 0.347|0.066|
|516798|0.347| 0.410|0.063|
|558487|0.410| 0.478|0.068|
|419825|0.478| 0.529|0.051|
|402257|0.529| 0.578|0.049|
|557225|0.578| 0.646|0.068|
|518626|0.646| 0.710|0.063|
|611478|0.710| 0.784|0.075|
|544556|0.784| 0.851|0.066|
|454356|0.851| 0.906|0.055|
|450535|0.906| 0.961|0.055|
|575996|0.961| 2.290|1.329|
|525915|2.290| 4.920|2.630|
|518757|4.920| 7.510|2.590|
|497298|7.510|10.000|2.490|
+--+-+--+-+
{noformat}
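
The same effect can be reproduced without a cluster (plain Scala, illustrative 
only). Sort a sample of this skewed distribution, cut it into equal-count 
chunks, and the chunk ranges widen sharply past 1.0 while the counts stay 
equal, which is exactly the pattern in the table above:

```scala
// Illustrative, cluster-free re-run of the equal-frequency idea.
object EqualFrequencySketch extends App {
  val rnd = new scala.util.Random(34)
  // Same shape as the Spark example: ~80% of values in [0,1), the rest in [0,10).
  val values = Array.tabulate(100000) { id =>
    rnd.nextDouble() * (if (id % 10 <= 7) 1.0 else 10.0)
  }.sorted

  val numPartitions = 20
  val per = values.length / numPartitions
  // Equal-count chunks of the globally sorted data.
  val parts = values.grouped(per).toSeq
  val ranges = parts.map(p => p.last - p.head)
  println(ranges.map(r => f"$r%.3f").mkString(", "))

  // Counts are equal by construction; the ranges balloon once we pass 1.0.
  assert(parts.forall(_.length == per))
  assert(ranges.take(10).max < ranges.last)
}
```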


