[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.
[ https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813158#comment-16813158 ] ketan kunde commented on SPARK-9858: - [~aroberts]: did the ExchangeCoordinatorSuite test cases pass in your big-endian environment, specifically the tests named "determining the number of reducers: complex query 1" and "determining the number of reducers: complex query 2"?

These test cases also fail in my big-endian environment, with the following respective logs:

- determining the number of reducers: complex query 1 *** FAILED ***
  Set(1, 2) did not equal Set(2, 3) (ExchangeCoordinatorSuite.scala:424)
- determining the number of reducers: complex query 2 *** FAILED ***
  Set(4, 2) did not equal Set(5, 3) (ExchangeCoordinatorSuite.scala:476)

Since this ticket is RESOLVED, I would like to know what change you made to get these test cases passing. Could you also highlight which exact Spark feature these test cases exercise? I would be very grateful for your reply.

Regards,
Ketan

> Introduce an ExchangeCoordinator to estimate the number of post-shuffle
> partitions.
> ---
>
> Key: SPARK-9858
> URL: https://issues.apache.org/jira/browse/SPARK-9858
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Yin Huai
> Assignee: Yin Huai
> Priority: Major
> Fix For: 1.6.0
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.
[ https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15052603#comment-15052603 ] Adam Roberts commented on SPARK-9858: - Modifying the UnsafeRowSerializer to always write/read in little-endian order fixes the problem, enabling the Tungsten features to be fully exploited regardless of endianness (not yet sure why only the aggregate functions are impacted; I thought we'd see plenty of other test failures). We can use LittleEndianDataInputStream/LittleEndianDataOutputStream to achieve this; they are part of the same package as ByteStreams. I will ensure the regular SparkSqlSerializer is OK too. We're hitting a similar problem with the DatasetAggregatorSuite (instead of 1 we get 9, instead of 2 we get 10, etc.); I expect the root cause to be the same. I'll get to work on the pull request, cheers.
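The fix described above pins serialization to one byte order instead of the platform's native one. The comment names Guava's LittleEndianDataInputStream/LittleEndianDataOutputStream; as a minimal self-contained sketch of the same idea (using only java.nio rather than Guava, so the class names here are illustrative, not the actual Spark patch), a round trip through an explicitly little-endian buffer produces the same bytes and the same value on any host:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class FixedOrderRoundTrip {
    // Serialize a long in an explicit little-endian order, so the on-the-wire
    // bytes are identical no matter which endianness the host JVM uses.
    static byte[] writeLong(long v) {
        return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(v).array();
    }

    // Read it back with the same explicit order; reader and writer agree by
    // construction, independent of ByteOrder.nativeOrder().
    static long readLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong();
    }

    public static void main(String[] args) {
        long v = 0x0102030405060708L;
        System.out.println(readLong(writeLong(v)) == v); // true on any platform
    }
}
```

The same round trip with the platform default order would also succeed on a single machine; the failure mode only appears when bytes written on one endianness are interpreted on the other, which is why pinning the order on both the write and read paths fixes it.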
[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.
[ https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15053402#comment-15053402 ] Yin Huai commented on SPARK-9858: - [~aroberts] Can you create a new jira and cc me and [~joshrosen] from there?
[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.
[ https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15048636#comment-15048636 ] Adam Roberts commented on SPARK-9858: - Thanks for the prompt reply. rowBuffer is a variable in org.apache.spark.sql.execution.UnsafeRowSerializer, within the asKeyValueIterator method. I experimented with the Exchange class; the same problems are observed using the SparkSqlSerializer, suggesting the UnsafeRowSerializer is probably fine. I agree with your second comment; I think the code within org.apache.spark.unsafe.Platform is OK, or we'd be hitting problems elsewhere. It would be useful to determine how the values in the assertions can be computed programmatically; I think the partitioning algorithm itself is working as expected, but for some reason stages require more bytes on the platforms I'm using. spark.sql.shuffle.partitions is unchanged, and I'm working off the latest master code. Is there something special about the aggregate, join, and complex query 2 tests? Can we print exactly what the bytes are for each stage? I know rdd.count is always correct and the DataFrames are the same (printed each row, written to JSON and Parquet; no concerns). Potential clue: if we set SQLConf.SHUFFLE_PARTITIONS.key to 4, the aggregate test passes.
[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.
[ https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15049139#comment-15049139 ] Yin Huai commented on SPARK-9858: - My understanding is that every task should get a new instance of the shuffle serializer (SparkSqlSerializer or UnsafeRowSerializer), so we should not need to worry about thread safety of the serializer. It would be good to confirm this with the identity hash code. Can you do that?
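The suggestion above relies on System.identityHashCode, which is derived from object identity rather than object contents, so it distinguishes two separate serializer instances even if they are configured identically. A small stand-alone illustration (plain Objects standing in for serializer instances, not Spark code):

```java
public class IdentityCheck {
    public static void main(String[] args) {
        // Stand-ins for the shuffle serializer instances two tasks would hold.
        Object taskASerializer = new Object();
        Object taskBSerializer = new Object();

        // Reference equality tells us whether two tasks share one instance.
        System.out.println(taskASerializer == taskBSerializer); // false: distinct instances

        // The identity hash is stable for a given object, so printing it at
        // creation time and at use time (as done below in the thread) reveals
        // which instance each code path actually touched.
        System.out.println(System.identityHashCode(taskASerializer) ==
                           System.identityHashCode(taskASerializer)); // true
    }
}
```

This is exactly the debugging pattern applied in the follow-up comment: log the identity hash where the serializer is created and again where it is used, and match the numbers.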
[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.
[ https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15049284#comment-15049284 ] Adam Roberts commented on SPARK-9858: - Yep, I added System.identityHashCode(serializer) prints in both the creation method and where it's used (both in the Exchange class):

Creating new unsafe row serializer ADAMTEST. myUnsafeRowSerializer identity hash: -555078685
Creating new unsafe row serializer ADAMTEST. myUnsafeRowSerializer identity hash: 1088823803
preparing shuffle dependency ADAMTEST. In needToCopy function and serializer hash is: 1088823803

New development: on Intel (an LE platform), if we take the 200 elements and print them, we get 20 consecutive rows containing (3,[0,13,5,ff00]). On our BE platforms this isn't the case; everything is (3,[0,13,5,0]), the same as the rest of the output on Intel. This print is in DAGScheduler's submitMapStage method:

val rdd = dependency.rdd
rdd.take(200).foreach(println)
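The ff00-vs-0 discrepancy above is characteristic of the same word being laid out byte-for-byte differently under the two byte orders. A small illustration (not Spark code) of how the value 0xff00 serializes under each order:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class EndianDemo {
    public static void main(String[] args) {
        int value = 0xff00; // echoes the ff00 column seen in the failing rows
        byte[] big = ByteBuffer.allocate(4)
                .order(ByteOrder.BIG_ENDIAN).putInt(value).array();
        byte[] little = ByteBuffer.allocate(4)
                .order(ByteOrder.LITTLE_ENDIAN).putInt(value).array();
        // Same value, mirrored byte layout (-1 is the signed view of 0xff):
        System.out.println(Arrays.toString(big));    // [0, 0, -1, 0]
        System.out.println(Arrays.toString(little)); // [0, -1, 0, 0]
    }
}
```

If bytes written under one order are reinterpreted under the other, a field that should decode to 0 can decode to 0xff00 (or vice versa), which matches the row differences reported between the Intel and BE runs.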
[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.
[ https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15047194#comment-15047194 ] Adam Roberts commented on SPARK-9858: - Several potential issues here, though they may well not be with this code itself; I'm consistently encountering problems on two different big-endian platforms while testing this.

1) Is this thread safe? I've noticed that if we print the rowBuffer when using more than one thread for our SQLContext, the ordering of elements is not consistent and we sometimes have two rows printed consecutively.

2) For the aggregate, join, and complex query 2 tests, I consistently receive more bytes per partition, and instead of estimating (0, 2) for the indices we get (0, 2, 4). I know we're using the UnsafeRowSerializer, so I'm wary that the issue lies there instead; I see it's using Google's ByteStreams class to read in the bytes. Specifically, I have 800, 800, 800, 800, 720 bytes per partition instead of 600, 600, 600, 600, 600.

3) Where do the values used in the assertions for the test suite come from? If we print the rows, we see differences between the two platforms (the 63 and 70 are on our BE platform, and these values differ each time we run the test).

Everything works perfectly on various LE architectures, hence the current endianness/serialization theory. Apologies if this would be better suited to the dev mailing list, although I expect I'm one of the few testing this on BE...
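Point 2 above is why inflated byte counts change the estimated partition start indices: the coordinator packs consecutive map-output partitions until a target size would be exceeded, so larger inputs split into more groups. A simplified, hypothetical sketch of that greedy idea (the target of 1300 is an arbitrary number chosen for illustration, not the actual ExchangeCoordinator code or its configured threshold):

```java
import java.util.ArrayList;
import java.util.List;

public class CoalesceSketch {
    // Greedily pack consecutive pre-shuffle partitions; start a new
    // post-shuffle partition whenever adding the next one would push the
    // running total over the target size. Returns the start indices.
    static List<Integer> startIndices(long[] sizes, long target) {
        List<Integer> starts = new ArrayList<>();
        long current = 0;
        for (int i = 0; i < sizes.length; i++) {
            if (starts.isEmpty() || current + sizes[i] > target) {
                starts.add(i);       // open a new post-shuffle partition here
                current = sizes[i];
            } else {
                current += sizes[i]; // keep packing into the current one
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        long[] expected = {600, 600, 600, 600, 600}; // bytes seen on LE
        long[] inflated = {800, 800, 800, 800, 720}; // bytes seen on BE
        System.out.println(startIndices(expected, 1300)); // [0, 2, 4]
        System.out.println(startIndices(inflated, 1300)); // [0, 1, 2, 3, 4]
    }
}
```

Under this scheme the estimated indices are a pure function of the observed partition sizes, so any platform that reports different byte counts will fail assertions on hard-coded index sets even when the algorithm itself is correct.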
[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.
[ https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15047222#comment-15047222 ] Yin Huai commented on SPARK-9858: - [~aroberts] Thanks for your comments. For 1, can you provide more details? What is the rowBuffer you referred to? For 2 and 3, I feel the size differences are caused by differences between platforms. In our tests, the numbers in the assertions came from my machine, and they work well on Jenkins. Do you have any suggestions on how we can make these tests robust across platforms? By the way, have you changed {{spark.sql.shuffle.partitions}}?
[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.
[ https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14988510#comment-14988510 ] Apache Spark commented on SPARK-9858: - User 'yhuai' has created a pull request for this issue: https://github.com/apache/spark/pull/9453
[jira] [Commented] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.
[ https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14973782#comment-14973782 ] Apache Spark commented on SPARK-9858: - User 'yhuai' has created a pull request for this issue: https://github.com/apache/spark/pull/9276