sunchao commented on a change in pull request #29654:
URL: https://github.com/apache/spark/pull/29654#discussion_r487263461



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
##########
@@ -195,16 +193,16 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
       val actualSize = columnType.actualSize(row, ordinal)
       _uncompressedSize += actualSize
 
-      if (lastValue.isNullAt(0)) {
-        columnType.copyField(row, ordinal, lastValue, 0)
+      if (lastValue == null) {
+        lastValue = columnType.clone(columnType.getField(row, ordinal))

Review comment:
       @kiszk Yes, it is because of `UTF8String`. We reuse the same `InternalRow` across iterations, and since the `UTF8String` read from it is backed by the row's underlying memory region, its contents are overwritten in place each time a new row is loaded. Caching the reference without copying it is therefore incorrect.
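
       To make the aliasing concrete, here is a minimal sketch (not part of the patch; it assumes only that `spark-unsafe`, which provides `UTF8String`, is on the classpath, and the demo object name is made up). `UTF8String.fromBytes` wraps the given array without copying, standing in for the reused row's buffer; I use `copy()` here to force the defensive copy that the patch obtains through `columnType.clone`:

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.unsafe.types.UTF8String

object Utf8AliasingDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for the memory region of the reused InternalRow.
    val buf = "aa".getBytes(StandardCharsets.UTF_8)

    // fromBytes wraps the array without copying, so `view` aliases `buf`.
    val view = UTF8String.fromBytes(buf)

    // Materialize the bytes into a fresh array before caching the value.
    val saved = view.copy()

    // Simulate loading the next row into the same memory region.
    System.arraycopy("bb".getBytes(StandardCharsets.UTF_8), 0, buf, 0, 2)

    println(view)  // prints "bb": the cached view silently changed
    println(saved) // prints "aa": the copy kept the original value
  }
}
```

       Making a copy of the field value at the start of each run is what keeps `lastValue` from tracking the mutable row.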




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


