[jira] [Comment Edited] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-09 Thread Adam Roberts (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15048636#comment-15048636
 ] 

Adam Roberts edited comment on SPARK-9858 at 12/9/15 1:07 PM:
--

Thanks for the prompt reply. rowBuffer is a variable in 
org.apache.spark.sql.execution.UnsafeRowSerializer, within the 
asKeyValueIterator method. I experimented with the Exchange class and the same 
problems are observed when using the SparkSqlSerializer, which suggests the 
UnsafeRowSerializer is probably fine.

I agree with your second comment; I think the code within 
org.apache.spark.unsafe.Platform is OK, or we'd be hitting problems elsewhere.

It would be useful to work out how the values in the assertions can be derived 
programmatically. I think the partitioning algorithm itself is working as 
expected, but for some reason stages require more bytes on the platforms I'm 
using.

spark.sql.shuffle.partitions is unchanged, and I'm working off the latest 
master code.

Is there something special about the aggregate, join, and complex query 2 tests?

Can we print exactly what the bytes are for each stage? I know rdd.count is 
always correct and the DataFrames are the same (I printed each row and wrote 
them to JSON and Parquet with no concerns).

Potential clue: if we set SQLConf.SHUFFLE_PARTITIONS.key to 4, the aggregate 
test passes.
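
A minimal sketch of that workaround (the test-local SQLContext name sqlContext 
is an assumption; SQLConf.SHUFFLE_PARTITIONS.key resolves to 
"spark.sql.shuffle.partitions"):

  import org.apache.spark.sql.SQLConf

  // Use four shuffle partitions for this SQLContext, i.e. the same as setting
  // "spark.sql.shuffle.partitions" to 4 directly.
  sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, "4")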

I'm wondering if there's an extra factor we should take into account when 
determining the indices, regardless of platform.


was (Author: aroberts):
Thanks for the prompt reply. rowBuffer is a variable in 
org.apache.spark.sql.execution.UnsafeRowSerializer, within the 
asKeyValueIterator method. I experimented with the Exchange class and the same 
problems are observed when using the SparkSqlSerializer, which suggests the 
UnsafeRowSerializer is probably fine.

I agree with your second comment; I think the code within 
org.apache.spark.unsafe.Platform is OK, or we'd be hitting problems elsewhere.

It would be useful to work out how the values in the assertions can be derived 
programmatically. I think the partitioning algorithm itself is working as 
expected, but for some reason stages require more bytes on the platforms I'm 
using.

spark.sql.shuffle.partitions is unchanged, and I'm working off the latest 
master code.

Is there something special about the aggregate, join, and complex query 2 tests?

Can we print exactly what the bytes are for each stage? I know rdd.count is 
always correct and the DataFrames are the same (I printed each row and wrote 
them to JSON and Parquet with no concerns).

Potential clue: if we set SQLConf.SHUFFLE_PARTITIONS.key to 4, the aggregate 
test passes.

> Introduce an ExchangeCoordinator to estimate the number of post-shuffle 
> partitions.
> ---
>
> Key: SPARK-9858
> URL: https://issues.apache.org/jira/browse/SPARK-9858
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Yin Huai
>Assignee: Yin Huai
> Fix For: 1.6.0
>
>







[jira] [Comment Edited] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-09 Thread Adam Roberts (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15048636#comment-15048636
 ] 

Adam Roberts edited comment on SPARK-9858 at 12/9/15 2:35 PM:
--

Thanks for the prompt reply. rowBuffer is a variable in 
org.apache.spark.sql.execution.UnsafeRowSerializer, within the 
asKeyValueIterator method. I experimented with the Exchange class and the same 
problems are observed when using the SparkSqlSerializer, which suggests the 
UnsafeRowSerializer is probably fine.

I agree with your second comment; I think the code within 
org.apache.spark.unsafe.Platform is OK, or we'd be hitting problems elsewhere.

It would be useful to work out how the values in the assertions can be derived 
programmatically. I think the partitioning algorithm itself is working as 
expected, but for some reason stages require more bytes on the platforms I'm 
using.

spark.sql.shuffle.partitions is unchanged, and I'm working off the latest 
master code.

Is there something special about the aggregate, join, and complex query 2 tests?

Can we print exactly what the bytes are for each stage? I know rdd.count is 
always correct and the DataFrames are the same (I printed each row and wrote 
them to JSON and Parquet with no concerns).

Potential clue: if we set SQLConf.SHUFFLE_PARTITIONS.key to 4, the aggregate 
test passes.

I'm wondering if there's an extra factor we should be taking into account.


was (Author: aroberts):
Thanks for the prompt reply. rowBuffer is a variable in 
org.apache.spark.sql.execution.UnsafeRowSerializer, within the 
asKeyValueIterator method. I experimented with the Exchange class and the same 
problems are observed when using the SparkSqlSerializer, which suggests the 
UnsafeRowSerializer is probably fine.

I agree with your second comment; I think the code within 
org.apache.spark.unsafe.Platform is OK, or we'd be hitting problems elsewhere.

It would be useful to work out how the values in the assertions can be derived 
programmatically. I think the partitioning algorithm itself is working as 
expected, but for some reason stages require more bytes on the platforms I'm 
using.

spark.sql.shuffle.partitions is unchanged, and I'm working off the latest 
master code.

Is there something special about the aggregate, join, and complex query 2 tests?

Can we print exactly what the bytes are for each stage? I know rdd.count is 
always correct and the DataFrames are the same (I printed each row and wrote 
them to JSON and Parquet with no concerns).

Potential clue: if we set SQLConf.SHUFFLE_PARTITIONS.key to 4, the aggregate 
test passes.

I'm wondering if there's an extra factor we should take into account when 
determining the indices, regardless of platform.








[jira] [Comment Edited] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-09 Thread Adam Roberts (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15049284#comment-15049284
 ] 

Adam Roberts edited comment on SPARK-9858 at 12/9/15 10:24 PM:
---

Yep, I added System.identityHashCode(serializer) prints both in the creation 
method and where the serializer is used (both in the Exchange class):


Creating new unsafe row serializer
myUnsafeRowSerializer identity hash: -555078685
Creating new unsafe row serializer
myUnsafeRowSerializer identity hash: 1088823803
Preparing shuffle dependency
In needToCopy function and serializer hash is: 1088823803


New development: on Intel (the LE platform), if we take the first 200 elements 
and print them, we get 20 consecutive rows containing (3,[0,13,5,ff00]). 
On our BE platforms this isn't the case; everything is 
(3,[0,13,5,0]), the same as the rest of the RDD on Intel. I added this print 
in DAGScheduler's submitMapStage method:

  val rdd = dependency.rdd
  rdd.take(200).foreach(println)
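
As an aside on the ff00-versus-0 difference, here is a purely illustrative 
sketch (nothing Spark-specific, and the byte pattern is made up) showing how 
the same eight bytes yield different long values depending on the byte order 
they are read with - the kind of mismatch the endianness/serialization theory 
would predict:

  import java.nio.{ByteBuffer, ByteOrder}

  // The byte pattern 00 00 00 00 00 00 ff 00
  val bytes = Array[Byte](0, 0, 0, 0, 0, 0, 0xff.toByte, 0)
  // Read as big-endian: 0x000000000000ff00 (65280)
  val asBigEndian = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getLong
  // Read as little-endian: 0x00ff000000000000
  val asLittleEndian = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong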


was (Author: aroberts):
Yep, I added System.identityHashCode(serializer) prints both in the creation 
method and where the serializer is used (both in the Exchange class):


Creating new unsafe row serializer
myUnsafeRowSerializer identity hash: -555078685
Creating new unsafe row serializer
myUnsafeRowSerializer identity hash: 1088823803
Preparing shuffle dependency
In needToCopy function and serializer hash is: 1088823803


New development: on Intel (the LE platform), if we take the first 200 elements 
and print them, we get 20 consecutive rows containing (3,[0,13,5,ff00]). On our 
BE platforms this isn't the case; everything is 
(3,[0,13,5,0]), the same as the rest of the RDD on Intel. I added this print 
in DAGScheduler's submitMapStage method:

  val rdd = dependency.rdd
  rdd.take(200).foreach(println)








[jira] [Comment Edited] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-09 Thread Adam Roberts (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15049284#comment-15049284
 ] 

Adam Roberts edited comment on SPARK-9858 at 12/9/15 7:53 PM:
--

Yep, I added System.identityHashCode(serializer) prints both in the creation 
method and where the serializer is used (both in the Exchange class):


Creating new unsafe row serializer
ADAMTEST. myUnsafeRowSerializer identity hash: -555078685
Creating new unsafe row serializer
ADAMTEST. myUnsafeRowSerializer identity hash: 1088823803
preparing shuffle dependency
ADAMTEST. In needToCopy function and serializer hash is: 1088823803


New development: on Intel (the LE platform), if we take the first 200 elements 
and print them, we get 20 consecutive rows containing (3,[0,13,5,ff00]). On our 
BE platforms this isn't the case; everything is 
(3,[0,13,5,0]), the same as the rest of the RDD on Intel. I added this print 
in DAGScheduler's submitMapStage method:

  val rdd = dependency.rdd
  rdd.take(200).foreach(println)


was (Author: aroberts):
Yep, I added System.identityHashCode(serializer) prints both in the creation 
method and where the serializer is used (both in the Exchange class):


Creating new unsafe row serializer
ADAMTEST. myUnsafeRowSerializer identity hash: -555078685
Creating new unsafe row serializer
ADAMTEST. myUnsafeRowSerializer identity hash: 1088823803
preparing shuffle dependency
ADAMTEST. In needToCopy function and serializer hash is: 1088823803


New development: on Intel (the LE platform), if we take the first 200 elements 
and print them, we get 20 consecutive rows containing (3,[0,13,5,ff00]). On our 
BE platforms this isn't the case; everything is 
(3,[0,13,5,0]), the same as the rest of the file on Intel. This print is in 
DAGScheduler's submitMapStage method:

  val rdd = dependency.rdd
  rdd.take(200).foreach(println)








[jira] [Comment Edited] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-09 Thread Adam Roberts (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15049284#comment-15049284
 ] 

Adam Roberts edited comment on SPARK-9858 at 12/9/15 7:53 PM:
--

Yep, I added System.identityHashCode(serializer) prints both in the creation 
method and where the serializer is used (both in the Exchange class):


Creating new unsafe row serializer
myUnsafeRowSerializer identity hash: -555078685
Creating new unsafe row serializer
myUnsafeRowSerializer identity hash: 1088823803
Preparing shuffle dependency
In needToCopy function and serializer hash is: 1088823803


New development: on Intel (the LE platform), if we take the first 200 elements 
and print them, we get 20 consecutive rows containing (3,[0,13,5,ff00]). On our 
BE platforms this isn't the case; everything is 
(3,[0,13,5,0]), the same as the rest of the RDD on Intel. I added this print 
in DAGScheduler's submitMapStage method:

  val rdd = dependency.rdd
  rdd.take(200).foreach(println)


was (Author: aroberts):
Yep, I added System.identityHashCode(serializer) prints both in the creation 
method and where the serializer is used (both in the Exchange class):


Creating new unsafe row serializer
ADAMTEST. myUnsafeRowSerializer identity hash: -555078685
Creating new unsafe row serializer
ADAMTEST. myUnsafeRowSerializer identity hash: 1088823803
preparing shuffle dependency
ADAMTEST. In needToCopy function and serializer hash is: 1088823803


New development: on Intel (the LE platform), if we take the first 200 elements 
and print them, we get 20 consecutive rows containing (3,[0,13,5,ff00]). On our 
BE platforms this isn't the case; everything is 
(3,[0,13,5,0]), the same as the rest of the RDD on Intel. I added this print 
in DAGScheduler's submitMapStage method:

  val rdd = dependency.rdd
  rdd.take(200).foreach(println)








[jira] [Comment Edited] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-08 Thread Adam Roberts (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15047194#comment-15047194
 ] 

Adam Roberts edited comment on SPARK-9858 at 12/8/15 6:33 PM:
--

There are several potential issues here, though they may well not be with this 
code itself - I'm consistently encountering problems on two different big 
endian platforms while testing this.

1) Is this thread safe? I've noticed that if we print the rowBuffer when using 
more than one thread for our SQLContext, the ordering of elements is not 
consistent and we sometimes have two rows printed consecutively.

2) For the aggregate, join, and complex query 2 tests, I consistently receive 
more bytes per partition (or should that be per stage?), and instead of 
estimating (0, 2) for the indices we get (0, 2, 4). I know we're using the 
UnsafeRowSerializer, so I'm wary the issue lies there instead; I see it uses 
Google's ByteStreams class to read in the bytes. Specifically, I have 800, 
800, 800, 800, 720 bytes per partition instead of 600, 600, 600, 600, 600. Is 
there a way I can print what these bytes are? (See the sketch after this list 
for how larger per-partition sizes can shift the start indices.)

3) Where do the values used in the assertions for the test suite come from?
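
For reference, here is a rough sketch of the greedy merging idea I understand 
the coordinator to use (this is not the actual ExchangeCoordinator code, and 
the 1700-byte target below is a made-up number, not the value the suite uses): 
walk the pre-shuffle partitions in order, start a new post-shuffle partition 
whenever adding the next one would push the running total past the target, and 
record each start index. Larger per-partition byte counts relative to the 
target therefore produce extra start indices, which would explain getting 
(0, 2, 4) instead of (0, 2) even if the algorithm itself is fine.

  // Hedged sketch, not Spark's implementation: estimate post-shuffle partition
  // start indices by greedily packing pre-shuffle partitions up to a target size.
  def estimateStartIndices(bytesByPartitionId: Array[Long], targetSize: Long): Array[Int] = {
    val startIndices = scala.collection.mutable.ArrayBuffer(0)
    var runningSize = 0L
    for (i <- bytesByPartitionId.indices) {
      val size = bytesByPartitionId(i)
      if (i > 0 && runningSize + size > targetSize) {
        startIndices += i   // this pre-shuffle partition starts a new post-shuffle partition
        runningSize = size
      } else {
        runningSize += size
      }
    }
    startIndices.toArray
  }

  // With the byte counts observed on BE and a hypothetical 1700-byte target:
  estimateStartIndices(Array(800L, 800L, 800L, 800L, 720L), 1700L)  // Array(0, 2, 4)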

If we print the rows we see differences between the two platforms (the 63 and 
70 are from our BE platform, and those values differ each time we run the test).

Everything works perfectly on various LE architectures, hence the current 
endianness/serialization theory. This might be better suited to the dev mailing 
list, although I expect I'm one of the few testing this on BE. It occurs 
regardless of Java vendor and whether or not we're running in interpreted mode.


was (Author: aroberts):
There are several potential issues here, though they may well not be with this 
code itself - I'm consistently encountering problems on two different big 
endian platforms while testing this.

1) Is this thread safe? I've noticed that if we print the rowBuffer when using 
more than one thread for our SQLContext, the ordering of elements is not 
consistent and we sometimes have two rows printed consecutively.

2) For the aggregate, join, and complex query 2 tests, I consistently receive 
more bytes per partition (or should that be per stage?), and instead of 
estimating (0, 2) for the indices we get (0, 2, 4). I know we're using the 
UnsafeRowSerializer, so I'm wary the issue lies there instead; I see it uses 
Google's ByteStreams class to read in the bytes. Specifically, I have 800, 
800, 800, 800, 720 bytes per partition instead of 600, 600, 600, 600, 600. Is 
there a way I can print what these bytes are?

3) Where do the values used in the assertions for the test suite come from?

If we print the rows we see differences between the two platforms (the 63 and 
70 are from our BE platform, and those values differ each time we run the test).

Everything works perfectly on various LE architectures, hence the current 
endianness/serialization theory. Apologies if this would be better suited to 
the dev mailing list, although I expect I'm one of the few to be testing this 
on BE...








[jira] [Comment Edited] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-08 Thread Adam Roberts (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15047194#comment-15047194
 ] 

Adam Roberts edited comment on SPARK-9858 at 12/8/15 6:28 PM:
--

There are several potential issues here, though they may well not be with this 
code itself - I'm consistently encountering problems on two different big 
endian platforms while testing this.

1) Is this thread safe? I've noticed that if we print the rowBuffer when using 
more than one thread for our SQLContext, the ordering of elements is not 
consistent and we sometimes have two rows printed consecutively.

2) For the aggregate, join, and complex query 2 tests, I consistently receive 
more bytes per partition (or should that be per stage?), and instead of 
estimating (0, 2) for the indices we get (0, 2, 4). I know we're using the 
UnsafeRowSerializer, so I'm wary the issue lies there instead; I see it uses 
Google's ByteStreams class to read in the bytes. Specifically, I have 800, 
800, 800, 800, 720 bytes per partition instead of 600, 600, 600, 600, 600. Is 
there a way I can print what these bytes are?

3) Where do the values used in the assertions for the test suite come from?

If we print the rows we see differences between the two platforms (the 63 and 
70 are from our BE platform, and those values differ each time we run the test).

Everything works perfectly on various LE architectures, hence the current 
endianness/serialization theory. Apologies if this would be better suited to 
the dev mailing list, although I expect I'm one of the few to be testing this 
on BE...


was (Author: aroberts):
There are several potential issues here, though they may well not be with this 
code itself - I'm consistently encountering problems on two different big 
endian platforms while testing this.

1) Is this thread safe? I've noticed that if we print the rowBuffer when using 
more than one thread for our SQLContext, the ordering of elements is not 
consistent and we sometimes have two rows printed consecutively.

2) For the aggregate, join, and complex query 2 tests, I consistently receive 
more bytes per partition, and instead of estimating (0, 2) for the indices we 
get (0, 2, 4). I know we're using the UnsafeRowSerializer, so I'm wary the 
issue lies there instead; I see it uses Google's ByteStreams class to read in 
the bytes. Specifically, I have 800, 800, 800, 800, 720 bytes per partition 
instead of 600, 600, 600, 600, 600.

3) Where do the values used in the assertions for the test suite come from?

If we print the rows we see differences between the two platforms (the 63 and 
70 are from our BE platform, and those values differ each time we run the test).

Everything works perfectly on various LE architectures, hence the current 
endianness/serialization theory. Apologies if this would be better suited to 
the dev mailing list, although I expect I'm one of the few to be testing this 
on BE...




