[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2019-04-09 Thread ketan kunde (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813158#comment-16813158
 ] 

ketan kunde commented on SPARK-9858:


[~aroberts]: did the ExchangeCoordinatorSuite test cases pass in your big endian environment, specifically the test cases with the following names?

 

test(s"determining the number of reducers: complex query 1")

test(s"determining the number of reducers: complex query 2")

The above test cases are also failing in my big endian environment, with the following respective failures:

- determining the number of reducers: complex query 1 *** FAILED ***
  Set(1, 2) did not equal Set(2, 3) (ExchangeCoordinatorSuite.scala:424)
- determining the number of reducers: complex query 2 *** FAILED ***
  Set(4, 2) did not equal Set(5, 3) (ExchangeCoordinatorSuite.scala:476)

Since this ticket is RESOLVED, I would like to know what change you made to get these test cases passing.

Could you also highlight which exact Spark feature these test cases exercise?

I would be very grateful for your reply.

 

Regards

Ketan 

 

> Introduce an ExchangeCoordinator to estimate the number of post-shuffle 
> partitions.
> ---
>
> Key: SPARK-9858
> URL: https://issues.apache.org/jira/browse/SPARK-9858
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Yin Huai
>Assignee: Yin Huai
>Priority: Major
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-11 Thread Adam Roberts (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15052603#comment-15052603
 ] 

Adam Roberts commented on SPARK-9858:
-

Modifying the UnsafeRowSerializer to always write/read in LE fixes the problem, thereby enabling Tungsten features to be fully exploited regardless of endianness (not yet sure why only the aggregate functions are impacted; I thought we'd have plenty of test failures). We can use LittleEndianDataInput/OutputStream to achieve this; they are part of the same package as ByteStreams. I will also ensure the regular SparkSqlSerializer is OK.
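The fix described above uses Guava's LittleEndianDataInput/OutputStream; the underlying idea (serialize with an explicitly fixed byte order so the shuffle bytes are identical on BE and LE hosts) can be sketched with the standard library alone. This is an illustration, not the actual Spark patch:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class LittleEndianSketch {
    // Write a long with an explicit little-endian order, so the byte layout
    // does not depend on the platform's native order.
    static byte[] writeLong(long v) {
        return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(v).array();
    }

    // Read it back with the same explicit order; the round-trip is
    // platform-independent.
    static long readLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong();
    }

    public static void main(String[] args) {
        long v = 1234567890123L;
        System.out.println(readLong(writeLong(v)) == v); // true on BE and LE hosts alike
    }
}
```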

We're hitting a similar problem with the DatasetAggregatorSuite (instead of 1 
we get 9, instead of 2 we get 10, etc), I expect the root cause to be the same.

I'll get to work on the pull request, cheers 




[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-11 Thread Yin Huai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15053402#comment-15053402
 ] 

Yin Huai commented on SPARK-9858:
-

[~aroberts] Can you create a new JIRA and cc me and [~joshrosen] on it?




[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-09 Thread Adam Roberts (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15048636#comment-15048636
 ] 

Adam Roberts commented on SPARK-9858:
-

Thanks for the prompt reply. rowBuffer is a variable in org.apache.spark.sql.execution.UnsafeRowSerializer, within the asKeyValueIterator method. I experimented with the Exchange class; the same problems are observed using the SparkSqlSerializer, suggesting the UnsafeRowSerializer is probably fine.

I agree with your second comment, I think the code within 
org.apache.spark.unsafe.Platform is OK or we'd be hitting problems elsewhere.

It would be useful to determine how the values in the assertions can be derived programmatically. I think the partitioning algorithm itself is working as expected, but for some reason stages require more bytes on the platforms I'm using.

spark.sql.shuffle.partitions is unchanged, I'm working off the latest master 
code.

Is there something special about the aggregate, join, and complex query 2 tests?

Can we print exactly what the bytes are for each stage? I know rdd.count is 
always correct and the DataFrames are the same (printed each row, written to 
json and parquet - no concerns).

Potential clue: if we set SQLConf.SHUFFLE_PARTITIONS.key to 4, the aggregate 
test passes.




[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-09 Thread Yin Huai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15049139#comment-15049139
 ] 

Yin Huai commented on SPARK-9858:
-

My understanding is that every task should get a new instance of the shuffle serializer (SparkSqlSerializer or UnsafeRowSerializer), so we should not need to worry about thread safety of the serializer. It would be good to confirm this with the identity hash code. Can you do that?




[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-09 Thread Adam Roberts (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15049284#comment-15049284
 ] 

Adam Roberts commented on SPARK-9858:
-

Yep, I added System.identityHashCode(serializer) prints in both the creation 
method and when it's used (both in the Exchange class)


Creating new unsafe row serializer
ADAMTEST. myUnsafeRowSerializer identity hash: -555078685
Creating new unsafe row serializer
ADAMTEST. myUnsafeRowSerializer identity hash: 1088823803
preparing shuffle dependency
ADAMTEST. In needToCopy function and serializer hash is: 1088823803


New development: on Intel (an LE platform), if we take the 200 elements and print them, we get 20 consecutive rows containing (3,[0,13,5,ff00]). On our BE platforms this isn't the case; everything is (3,[0,13,5,0]), the same as the rest of the file on Intel. This print is in DAGScheduler's submitMapStage method:

  val rdd = dependency.rdd
  rdd.take(200).foreach(println)
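The LE-vs-BE difference in those printed rows is consistent with the same raw bytes being interpreted under different byte orders. A minimal illustration (hypothetical bytes, not Spark's actual row layout):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ByteOrderDemo {
    public static void main(String[] args) {
        // The same four bytes decode to very different ints depending on
        // the byte order assumed by the reader.
        byte[] raw = {0x00, 0x00, 0x00, (byte) 0xff};
        int be = ByteBuffer.wrap(raw).order(ByteOrder.BIG_ENDIAN).getInt();
        int le = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN).getInt();
        System.out.println(Integer.toHexString(be)); // ff
        System.out.println(Integer.toHexString(le)); // ff000000
    }
}
```

If a writer serializes with the native order on one platform and a reader assumes another, exactly this kind of value drift appears.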




[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-08 Thread Adam Roberts (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15047194#comment-15047194
 ] 

Adam Roberts commented on SPARK-9858:
-

There are several potential issues here, though they may well not be with this code itself. I'm consistently encountering problems on two different big endian platforms while testing this:

1) Is this thread safe? I've noticed that if we print the rowBuffer when using more than one thread for our SQLContext, the ordering of elements is not consistent, and we sometimes have two rows printed consecutively.

2) For the aggregate, join, and complex query 2 tests, I consistently receive more bytes per partition, and instead of estimating (0, 2) for the indices we get (0, 2, 4). I know we're using the UnsafeRowSerializer, and so I'm wary that the issue lies there instead; I see it's using Google's ByteStreams class to read in the bytes. Specifically, I have 800, 800, 800, 800, 720 bytes per partition instead of 600, 600, 600, 600, 600.

3) Where do the values used in the assertions for the test suite come from?
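For context on item 2: the coordinator's packing step can be thought of as a greedy pass over per-partition byte sizes, starting a new post-shuffle partition whenever the running total would exceed a target size. This is a hypothetical simplification with a made-up target, not the actual ExchangeCoordinator code, but it shows how larger per-partition byte counts shift the start indices:

```java
import java.util.ArrayList;
import java.util.List;

public class CoalesceSketch {
    // Greedy coalescing: begin a new post-shuffle partition whenever adding
    // the next pre-shuffle partition would push the running total past target.
    static List<Integer> startIndices(long[] sizes, long target) {
        List<Integer> starts = new ArrayList<>();
        long current = 0;
        for (int i = 0; i < sizes.length; i++) {
            if (starts.isEmpty() || current + sizes[i] > target) {
                starts.add(i);
                current = sizes[i];
            } else {
                current += sizes[i];
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        // With a (made-up) 1800-byte target, 600-byte partitions pack tightly...
        System.out.println(startIndices(new long[]{600, 600, 600, 600, 600}, 1800)); // [0, 3]
        // ...while 800-byte partitions force an extra split, shifting the indices.
        System.out.println(startIndices(new long[]{800, 800, 800, 800, 720}, 1800)); // [0, 2, 4]
    }
}
```

The same greedy rule applied to larger per-partition sizes yields more (and different) start indices, which is the kind of platform-dependent drift described above.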

If we print the rows, we see differences between the two platforms (the 63 and 70 are on our BE platform, and these values differ each time we run the test).

Everything works perfectly on various LE architectures, hence the current endianness/serialization theory. Apologies if this would be better suited to the dev mailing list, although I expect I'm one of the few testing this on BE...




[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-12-08 Thread Yin Huai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15047222#comment-15047222
 ] 

Yin Huai commented on SPARK-9858:
-

[~aroberts] Thanks for your comments.

For 1, can you provide more details? What is the rowBuffer you referred to?

For 2 and 3, I suspect the size differences are caused by platform differences. The numbers in the assertions came from my machine, and they work well on Jenkins. Do you have any suggestions on how we can make these tests robust across platforms?

btw, have you changed {{spark.sql.shuffle.partitions}}?




[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-11-03 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14988510#comment-14988510
 ] 

Apache Spark commented on SPARK-9858:
-

User 'yhuai' has created a pull request for this issue:
https://github.com/apache/spark/pull/9453




[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.

2015-10-25 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14973782#comment-14973782
 ] 

Apache Spark commented on SPARK-9858:
-

User 'yhuai' has created a pull request for this issue:
https://github.com/apache/spark/pull/9276
