[jira] [Comment Edited] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.
[ https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15048636#comment-15048636 ] Adam Roberts edited comment on SPARK-9858 at 12/9/15 1:07 PM:
--

Thanks for the prompt reply. rowBuffer is a variable in org.apache.spark.sql.execution.UnsafeRowSerializer, within the asKeyValueIterator method. I experimented with the Exchange class; the same problems are observed using the SparkSqlSerializer, suggesting the UnsafeRowSerializer is probably fine. I agree with your second comment: I think the code within org.apache.spark.unsafe.Platform is OK, or we'd be hitting problems elsewhere.

It would be useful to determine how the values in the assertions can be derived programmatically. I think the partitioning algorithm itself is working as expected, but for some reason stages require more bytes on the platforms I'm using. spark.sql.shuffle.partitions is unchanged, and I'm working off the latest master code. Is there something special about the aggregate, join, and complex query 2 tests? Can we print exactly what the bytes are for each stage? I know rdd.count is always correct and the DataFrames are the same (printed each row, written to JSON and Parquet - no concerns). Potential clue: if we set SQLConf.SHUFFLE_PARTITIONS.key to 4, the aggregate test passes. I'm wondering if there's an extra factor we should take into account when determining the indices, regardless of platform.

> Introduce an ExchangeCoordinator to estimate the number of post-shuffle
> partitions.
> ---
>
> Key: SPARK-9858
> URL: https://issues.apache.org/jira/browse/SPARK-9858
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Yin Huai
> Assignee: Yin Huai
> Fix For: 1.6.0
>

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
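The "extra factor" question about the estimated indices can be made concrete with a small model. The following is a minimal, hypothetical sketch of an ExchangeCoordinator-style greedy pass that turns per-partition byte sizes and a target post-shuffle size into partition start indices; the function name and the target value of 2000 are assumptions for illustration, not Spark's actual code or test configuration:

```python
def estimate_partition_start_indices(sizes, target):
    """Greedy coalescing sketch: pack consecutive pre-shuffle partitions
    into one post-shuffle partition until adding the next partition
    would push the accumulated size over the target."""
    indices = [0]          # the first post-shuffle partition starts at index 0
    current = 0            # bytes accumulated in the current post-shuffle partition
    for i, size in enumerate(sizes):
        if i > 0 and current + size > target:
            indices.append(i)   # start a new post-shuffle partition here
            current = size
        else:
            current += size
    return indices

print(estimate_partition_start_indices([600, 600, 600, 600, 600], 2000))  # [0, 3]
print(estimate_partition_start_indices([800, 800, 800, 800, 720], 2000))  # [0, 2, 4]
```

With the same target, the inflated per-partition sizes produce an extra start index, which is the shape of the discrepancy reported here: more bytes per partition shifts where the coordinator cuts the post-shuffle partitions, even though the algorithm itself behaves consistently.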
[jira] [Comment Edited] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.
[ https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15049284#comment-15049284 ] Adam Roberts edited comment on SPARK-9858 at 12/9/15 10:24 PM:
---

Yep, I added System.identityHashCode(serializer) prints both in the creation method and where the serializer is used (both in the Exchange class):

Creating new unsafe row serializer, myUnsafeRowSerializer identity hash: -555078685
Creating new unsafe row serializer, myUnsafeRowSerializer identity hash: 1088823803
Preparing shuffle dependency
In needToCopy function and serializer hash is: 1088823803

New development: on Intel (the LE platform), if we take the first 200 elements and print them, we get 20 consecutive rows containing (3,[0,13,5,ff00]). On our BE platforms this isn't the case; every row is (3,[0,13,5,0]) - the same as the rest of the RDD on Intel. I added this print in DAGScheduler's submitMapStage method:

val rdd = dependency.rdd
rdd.take(200).foreach(println)
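The ff00-versus-0 difference is the signature of bytes being written with one byte order and read with the other. A minimal, Spark-independent sketch of that failure mode, using Python's struct module purely for illustration (the value 0xff00 is chosen to echo the row dumps above):

```python
import struct

value = 0xff00  # 65280, a small value like the ones in the row dumps above

# Write the value as an 8-byte little-endian long (what an LE platform produces).
raw = struct.pack('<q', value)

# Read it back with the matching byte order: the value round-trips.
print(struct.unpack('<q', raw)[0])   # 65280

# Read the same bytes as a big-endian long: a completely different number,
# because the significant byte now sits at the other end of the word.
print(struct.unpack('>q', raw)[0])   # 71776119061217280
```

If any layer of the serialization path writes native-order bytes that another layer reads with a fixed byte order (or vice versa), rows will round-trip on LE platforms and silently change value on BE ones, which matches the behaviour described in this comment.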
[jira] [Comment Edited] (SPARK-9858) Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions.
[ https://issues.apache.org/jira/browse/SPARK-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15047194#comment-15047194 ] Adam Roberts edited comment on SPARK-9858 at 12/8/15 6:33 PM:
--

Several potential issues here, though they may well not be with this code itself - I'm consistently encountering problems on two different big endian platforms while testing this.

1) Is this thread safe? I've noticed that if we print the rowBuffer when using more than one thread for our SQLContext, the ordering of elements is not consistent and we sometimes have two rows printed consecutively.

2) For the aggregate, join, and complex query 2 tests, I consistently receive more bytes per partition (or should that be per stage?), and instead of estimating (0, 2) for the indices we get (0, 2, 4). I know we're using the UnsafeRowSerializer, so I'm wary the issue may lie there instead; I see it's using Google's ByteStreams class to read in the bytes. Specifically, I have 800, 800, 800, 800, 720 bytes per partition instead of 600, 600, 600, 600, 600. Is there a way I can print what these bytes are?

3) Where do the values used in the assertions for the test suite come from? If we print the rows, we see differences between the two platforms (the 63 and 70 are on our BE platform, and these values differ each time we run the test).

Everything works perfectly on various LE architectures, hence the current endianness/serialization theory. This might be better suited to the dev mailing list, although I expect I'm one of the few testing this on BE. It occurs regardless of Java vendor and whether or not we're running in interpreted mode.
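Duplicate consecutive rows, as in point 1, are also what you see when a deserializer reuses a single mutable row buffer and a consumer retains references instead of copying - which is precisely what Exchange's needToCopy check guards against. A minimal analogy in Python (the class and method names are made up to mirror the Scala ones; this is not Spark's actual implementation):

```python
class ReusingSerializer:
    """Mimics a serializer that deserializes every row into ONE reused
    mutable buffer, as UnsafeRowSerializer reuses its rowBuffer."""
    def __init__(self):
        self.row_buffer = [None]

    def as_key_value_iterator(self, values):
        for v in values:
            self.row_buffer[0] = v   # overwrite the buffer in place
            yield self.row_buffer    # the SAME object every time

ser = ReusingSerializer()
no_copy = list(ser.as_key_value_iterator([1, 2, 3]))
print(no_copy)    # [[3], [3], [3]] - every retained row aliases the buffer

ser = ReusingSerializer()
with_copy = [row.copy() for row in ser.as_key_value_iterator([1, 2, 3])]
print(with_copy)  # [[1], [2], [3]] - defensive copies preserve each row
```

Whether this aliasing corrupts results depends on whether the downstream operator copies rows before buffering them, so the behaviour can look nondeterministic or thread-related even when the buffer reuse itself is the cause.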