[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-18 Thread Wenchen Fan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16442568#comment-16442568
 ] 

Wenchen Fan edited comment on SPARK-23989 at 4/18/18 2:47 PM:
--

OK, now I see the problem: `ShuffleExchangeExec.needToCopyObjectsBeforeShuffle` 
doesn't catch all the cases, so we may produce wrong results.
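To make the gap concrete, here is a simplified, hypothetical model of the copy decision. The names `needToCopyIgnoringLimit` and `needToCopyWithLimit` are illustrative and are not Spark's code. It assumes no map-side combine, the default `spark.shuffle.sort.bypassMergeThreshold` of 200, and a 2^24-partition ceiling for serialized shuffle. A check that trusts serializer relocatability alone assumes the serialized path will be chosen, which is not true above that ceiling, where `SortShuffleWriter` buffers objects by reference. The second function shows one plausible way to close the gap, not necessarily the fix that was committed.

```scala
object CopyDecisionModel {
  val BypassMergeThreshold: Int = 200        // default spark.shuffle.sort.bypassMergeThreshold
  val MaxSerializedPartitions: Int = 1 << 24 // assumed ceiling of serialized shuffle (16777216)

  // Trusts serializer relocatability alone: assumes serialized shuffle will be chosen.
  def needToCopyIgnoringLimit(numPartitions: Int, serializerRelocatable: Boolean): Boolean =
    if (numPartitions <= BypassMergeThreshold) false // bypass writer serializes each record at once
    else if (serializerRelocatable) false            // assumed: serialized shuffle
    else true                                        // SortShuffleWriter buffers references

  // Also copies when the partition count rules out serialized shuffle,
  // because SortShuffleWriter is then used and buffers objects by reference.
  def needToCopyWithLimit(numPartitions: Int, serializerRelocatable: Boolean): Boolean =
    if (numPartitions <= BypassMergeThreshold) false
    else if (numPartitions > MaxSerializedPartitions) true
    else if (serializerRelocatable) false
    else true

  def main(args: Array[String]): Unit = {
    val n = 16777217 // the partition count used in the reproduction in this thread
    println(needToCopyIgnoringLimit(n, serializerRelocatable = true)) // false: rows are not copied
    println(needToCopyWithLimit(n, serializerRelocatable = true))     // true
  }
}
```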


was (Author: cloud_fan):
OK now I see the problem, `ShuffleExchangeExec.needToCopyObjectsBeforeShuffle` 
doesn't catch all the cases, so we may reproduce wrong result.

> When using `SortShuffleWriter`, the data will be overwritten
> 
>
> Key: SPARK-23989
> URL: https://issues.apache.org/jira/browse/SPARK-23989
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: liuxian
>Priority: Critical
>
> When using `SortShuffleWriter`, we only insert the object reference ('AnyRef') 
> into 'PartitionedAppendOnlyMap' or 'PartitionedPairBuffer'.
> For this function:
> override def write(records: Iterator[Product2[K, V]])
> the values in 'records' are `UnsafeRow`s, which are reused from record to 
> record, so previously buffered values will be overwritten.
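The failure mode described above is reference aliasing: the buffer stores whatever object the iterator hands it, and the producer reuses one mutable row for every record. The sketch below uses a hypothetical `ReusedRow` class as a stand-in for `UnsafeRow` and a plain `ArrayBuffer` in place of `PartitionedPairBuffer`. It shows every buffered value collapsing to the last record, and how copying each value before buffering avoids it.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for UnsafeRow: a mutable row that the producer reuses.
final class ReusedRow(var values: Array[Int]) {
  def copy(): ReusedRow = new ReusedRow(values.clone())
  override def toString: String = values.mkString("[", ",", "]")
}

object AliasingDemo {
  // Produces (key, value) records but reuses a single row object for every value.
  def records(): Iterator[(Int, ReusedRow)] = {
    val shared = new ReusedRow(Array.empty[Int])
    Iterator((3, Array(0, 1, 1)), (4, Array(0, 2, 2))).map { case (k, vs) =>
      shared.values = vs // overwrite the shared row in place
      (k, shared)
    }
  }

  // Buffers the records; `copyValues` decides whether each value is copied first.
  def buffer(copyValues: Boolean): ArrayBuffer[(Int, ReusedRow)] = {
    val buf = ArrayBuffer.empty[(Int, ReusedRow)]
    records().foreach { case (k, v) => buf += ((k, if (copyValues) v.copy() else v)) }
    buf
  }

  def main(args: Array[String]): Unit = {
    println(buffer(copyValues = false).mkString(", ")) // (3,[0,2,2]), (4,[0,2,2])
    println(buffer(copyValues = true).mkString(", "))  // (3,[0,1,1]), (4,[0,2,2])
  }
}
```

Copying costs an allocation per record, which is presumably why Spark tries to copy only when the chosen shuffle writer actually buffers objects.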



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-18 Thread liuxian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16442176#comment-16442176
 ] 

liuxian edited comment on SPARK-23989 at 4/18/18 9:21 AM:
--

test("groupBy") {
  spark.conf.set("spark.sql.shuffle.partitions", 16777217)

  val df1 = Seq(("a", 1, 0, "b"), ("b", 2, 4, "c"), ("a", 2, 3, "d"))
    .toDF("key", "value1", "value2", "rest")

  checkAnswer(
    df1.groupBy("key").min("value2"),
    Seq(Row("a", 0), Row("b", 4))
  )
}

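Because the number of partitions is so large, this test runs for a long time.

The partition count is deliberately this large so that the shuffle goes through 
`SortShuffleWriter`: 16777217 (2^24 + 1) exceeds both 
`spark.shuffle.sort.bypassMergeThreshold` and the maximum number of partitions 
supported by the serialized shuffle path.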
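The arithmetic behind that partition count, as a hedged sketch: it assumes the handle selection in `SortShuffleManager.registerShuffle` (bypass-merge first, then serialized, otherwise the base handle that leads to `SortShuffleWriter`), no map-side combine, a serializer that supports relocation of serialized objects, the default bypass threshold of 200, and a 2^24-partition ceiling for serialized shuffle. `WriterChoice` and `chooseWriter` are hypothetical names.

```scala
// Hypothetical names; a simplified model of SortShuffleManager's handle selection.
sealed trait WriterChoice
case object BypassMergeSort extends WriterChoice
case object Serialized extends WriterChoice
case object SortShuffle extends WriterChoice

object WriterSelection {
  val BypassMergeThreshold: Int = 200        // default spark.shuffle.sort.bypassMergeThreshold
  val MaxSerializedPartitions: Int = 1 << 24 // 16777216

  // Assumes no map-side combine and a relocatable serializer.
  def chooseWriter(numPartitions: Int): WriterChoice =
    if (numPartitions <= BypassMergeThreshold) BypassMergeSort
    else if (numPartitions <= MaxSerializedPartitions) Serialized
    else SortShuffle

  def main(args: Array[String]): Unit =
    Seq(200, 201, 16777216, 16777217).foreach { n =>
      println(s"$n partitions -> ${chooseWriter(n)}")
    }
}
```

Under these assumptions, 16777217 is the smallest partition count that reaches `SortShuffleWriter` without modifying Spark.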

 





[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-18 Thread liuxian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16441952#comment-16441952
 ] 

liuxian edited comment on SPARK-23989 at 4/18/18 6:21 AM:
--

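1. Disable `BypassMergeSortShuffleHandle` and `SerializedShuffleHandle` by appending `&& false` to their conditions in `SortShuffleManager.registerShuffle`, so that every shuffle uses `BaseShuffleHandle` and therefore `SortShuffleWriter`: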

override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  // `&& false` added for this experiment to disable the bypass-merge path
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency) && false) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  // `&& false` added for this experiment to disable the serialized path
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency) && false) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}

 

2. Run this unit test in `DataFrameAggregateSuite.scala`:

test("SPARK-21580 ints in aggregation expressions are taken as group-by ordinal.")

3. While debugging in IntelliJ IDEA, I captured this state:

buffer = {PartitionedPairBuffer@9817}
  capacity = 64
  curSize = 2
  data = {Object[128]@9832}
    0 = {Tuple2@9834} "(3,3)"
    1 = {UnsafeRow@9835} "[0,2,2]"
    2 = {Tuple2@9841} "(4,4)"
    3 = {UnsafeRow@9835} "[0,2,2]"

Elements 1 and 3 (the values of the two buffered records) are the same object, `UnsafeRow@9835`, so the value of the first record has been overwritten by the second.
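The dump is consistent with how `PartitionedPairBuffer` lays records out: record i occupies two consecutive slots of `data`, a `(partition, key)` tuple at index 2*i and the value at 2*i + 1, with a default capacity of 64 records (128 slots). The class below is a hypothetical miniature, not Spark's implementation (growth and sorting are omitted). Each key tuple is freshly allocated, but the value is stored by reference, so a reused row makes the odd slots hold the same object.

```scala
// Hypothetical miniature of a pair buffer: record i uses data(2*i) for the
// (partition, key) tuple and data(2*i + 1) for the value, stored by reference.
final class MiniPairBuffer(initialCapacity: Int = 64) {
  private val data = new Array[AnyRef](2 * initialCapacity)
  private var curSize = 0

  def insert(partition: Int, key: Any, value: AnyRef): Unit = {
    require(curSize < initialCapacity, "growth is omitted in this sketch")
    data(2 * curSize) = (partition, key) // new tuple per record
    data(2 * curSize + 1) = value        // no copy: only the reference is kept
    curSize += 1
  }

  def slot(i: Int): AnyRef = data(i)
  def size: Int = curSize
}

object PairBufferDemo {
  def main(args: Array[String]): Unit = {
    val row = Array(0, 1, 1) // stands in for one reused UnsafeRow
    val buf = new MiniPairBuffer()
    buf.insert(3, 3, row)
    row(1) = 2; row(2) = 2   // the producer rewrites the same row for the next record
    buf.insert(4, 4, row)
    println(buf.slot(1) eq buf.slot(3)) // true: both values are the same object
  }
}
```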

 

 



[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-16 Thread liuxian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439231#comment-16439231
 ] 

liuxian edited comment on SPARK-23989 at 4/16/18 10:18 AM:
---

For `SortShuffleWriter`, `records: Iterator[Product2[K, V]]` is an iterator of 
key-value pairs, but the value is of type `UnsafeRow`.

For example, when we insert the first record into `PartitionedPairBuffer`, we only 
save the reference ('AnyRef'), but the reference held by the next record (only the 
value, not the key) is the same as that of the first record, so the first record 
is overwritten.






[jira] [Comment Edited] (SPARK-23989) When using `SortShuffleWriter`, the data will be overwritten

2018-04-16 Thread liuxian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439148#comment-16439148
 ] 

liuxian edited comment on SPARK-23989 at 4/16/18 9:00 AM:
--

[~joshrosen] [~cloud_fan]


was (Author: 10110346):
[~joshrosen]
