[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16442568#comment-16442568 ]

Wenchen Fan edited comment on SPARK-23989 at 4/18/18 2:47 PM:
--
OK, now I see the problem: `ShuffleExchangeExec.needToCopyObjectsBeforeShuffle` doesn't catch all the cases, so we may produce wrong results.

> When using `SortShuffleWriter`, the data will be overwritten
>
> Key: SPARK-23989
> URL: https://issues.apache.org/jira/browse/SPARK-23989
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.0
> Reporter: liuxian
> Priority: Critical
>
> When using `SortShuffleWriter`, we only insert an 'AnyRef' into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
>
>     override def write(records: Iterator[Product2[K, V]])
>
> the value of 'records' is an `UnsafeRow`, so the value will be overwritten.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
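For context, a minimal sketch of the decision the comment refers to. This is NOT Spark's actual `ShuffleExchangeExec` code; the constants (bypass-merge threshold 200, 2^24 partition limit for serialized shuffle) are assumed from Spark 2.3 defaults, and real conditions such as serializer relocatability are omitted. The point it illustrates: rows only need a defensive copy when the chosen shuffle writer buffers deserialized object references, which is exactly the `SortShuffleWriter` path.

```scala
// Hedged sketch, not Spark's real implementation: which shuffle paths buffer
// object references (and therefore need row copies on the map side)?
object CopyBeforeShuffleSketch {
  // Assumed Spark 2.3 defaults: spark.shuffle.sort.bypassMergeThreshold = 200;
  // serialized (Tungsten) shuffle supports at most 2^24 output partitions.
  val BypassMergeThreshold = 200
  val MaxSerializedModePartitions = 1 << 24 // 16777216

  def needToCopyBeforeShuffle(numPartitions: Int, mapSideCombine: Boolean): Boolean = {
    // Bypass-merge-sort writes each record straight to a per-partition file.
    val bypass = !mapSideCombine && numPartitions <= BypassMergeThreshold
    // Serialized shuffle copies the row's bytes into its own buffer immediately.
    val serialized = !mapSideCombine && numPartitions <= MaxSerializedModePartitions
    // Only the remaining path (SortShuffleWriter) buffers object references,
    // so only there does row reuse corrupt the buffered data.
    !bypass && !serialized
  }
}
```

Under these assumptions, 16777217 partitions with no map-side combine lands on the reference-buffering path, matching the reproduction below.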
[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16442176#comment-16442176 ]

liuxian edited comment on SPARK-23989 at 4/18/18 9:21 AM:
--
    test("groupBy") {
      spark.conf.set("spark.sql.shuffle.partitions", 16777217)
      val df1 = Seq(("a", 1, 0, "b"), ("b", 2, 4, "c"), ("a", 2, 3, "d"))
        .toDF("key", "value1", "value2", "rest")
      checkAnswer(
        df1.groupBy("key").min("value2"),
        Seq(Row("a", 0), Row("b", 4))
      )
    }

Because the number of partitions is so large, it will run for a long time. The partition count is made this large on purpose, to force the `SortShuffleWriter` path.
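The expected answer in the test above can be checked without Spark. A minimal plain-Scala sketch of the same group-by-min over the test data (the tuple layout mirrors the `toDF("key", "value1", "value2", "rest")` columns):

```scala
object GroupByMinSketch {
  // Plain-Scala equivalent of df1.groupBy("key").min("value2"):
  // group the 4-tuples by the first field, take the minimum of the third.
  def groupByMin(rows: Seq[(String, Int, Int, String)]): Map[String, Int] =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._3).min }
}
```

On the test data `Seq(("a", 1, 0, "b"), ("b", 2, 4, "c"), ("a", 2, 3, "d"))` this yields `Map("a" -> 0, "b" -> 4)`, i.e. the `Row("a", 0)` and `Row("b", 4)` the test asserts; if the shuffle buffer overwrites values, the Spark query diverges from this result.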
[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16441952#comment-16441952 ]

liuxian edited comment on SPARK-23989 at 4/18/18 6:21 AM:
--
1. Disable 'BypassMergeSortShuffleHandle' and 'SerializedShuffleHandle':

    override def registerShuffle[K, V, C](
        shuffleId: Int,
        numMaps: Int,
        dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
      if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency) && false) {
        // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
        // need map-side aggregation, then write numPartitions files directly and just concatenate
        // them at the end. This avoids doing serialization and deserialization twice to merge
        // together the spilled files, which would happen with the normal code path. The downside is
        // having multiple files open at a time and thus more memory allocated to buffers.
        new BypassMergeSortShuffleHandle[K, V](
          shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      } else if (SortShuffleManager.canUseSerializedShuffle(dependency) && false) {
        // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
        new SerializedShuffleHandle[K, V](
          shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      } else {
        // Otherwise, buffer map outputs in a deserialized form:
        new BaseShuffleHandle(shuffleId, numMaps, dependency)
      }
    }

2. Run this unit test in 'DataFrameAggregateSuite.scala':

    test("SPARK-21580 ints in aggregation expressions are taken as group-by ordinal.")

3. Debugging in IDEA, I grabbed this information (note that slots 1 and 3 hold the same UnsafeRow@9835 reference):

    buffer = {PartitionedPairBuffer@9817}
      capacity = 64
      curSize = 2
      data = {Object[128]@9832}
        0 = {Tuple2@9834} "(3,3)"
        1 = {UnsafeRow@9835} "[0,2,2]"
        2 = {Tuple2@9841} "(4,4)"
        3 = {UnsafeRow@9835} "[0,2,2]"
[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439231#comment-16439231 ]

liuxian edited comment on SPARK-23989 at 4/16/18 10:18 AM:
---
For `SortShuffleWriter`, `records: Iterator[Product2[K, V]]` yields key-value pairs, but the value is of 'UnsafeRow' type. For example, when we insert the first record into `PartitionedPairBuffer`, we only save the 'AnyRef' (the object reference). The 'AnyRef' of the next record (the value only, not the key) is the same as that of the first record, so the first record is overwritten.
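The overwrite described above can be reproduced without Spark. A minimal sketch, with a hypothetical `MutableRow` standing in for a reused `UnsafeRow` and a plain list standing in for `PartitionedPairBuffer`:

```scala
// Minimal sketch of the bug: a single mutable object reused by an iterator,
// buffered by reference (as PartitionedPairBuffer does) vs. defensively copied.
object ReuseDemo {
  final case class MutableRow(var value: Int) // stand-in for a reused UnsafeRow

  // Buffer the reference only: every slot ends up pointing at the same object,
  // so earlier values are overwritten by whatever was written last.
  def bufferWithoutCopy(values: Seq[Int]): List[Int] = {
    val row = MutableRow(0)
    val buffered = values.iterator.map { v => row.value = v; row }.toList
    buffered.map(_.value)
  }

  // Defensive copy before buffering: each slot keeps its own value.
  def bufferWithCopy(values: Seq[Int]): List[Int] = {
    val row = MutableRow(0)
    val buffered = values.iterator.map { v => row.value = v; row.copy() }.toList
    buffered.map(_.value)
  }
}
```

`bufferWithoutCopy(Seq(1, 2, 3))` yields `List(3, 3, 3)` while `bufferWithCopy(Seq(1, 2, 3))` yields `List(1, 2, 3)`; the aliasing in the first case mirrors the debugger output in the earlier comment, where slots 1 and 3 both reference UnsafeRow@9835.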
[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten
[ https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439148#comment-16439148 ]

liuxian edited comment on SPARK-23989 at 4/16/18 9:00 AM:
--
[~joshrosen] [~cloud_fan]