This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fd8d5ad  [SPARK-36928][SQL] Handle ANSI intervals in ColumnarRow, ColumnarBatchRow and ColumnarArray
fd8d5ad is described below

commit fd8d5ad2140d6405357b908dce2d00a21036dedb
Author: PengLei <peng.8...@gmail.com>
AuthorDate: Thu Oct 28 14:52:41 2021 +0300

    [SPARK-36928][SQL] Handle ANSI intervals in ColumnarRow, ColumnarBatchRow and ColumnarArray
    
    ### What changes were proposed in this pull request?
    1. Handle ANSI interval types in the `get` and `copy` methods of ColumnarArray
    2. Handle ANSI interval types in the `get` and `copy` methods of ColumnarBatchRow
    3. Handle ANSI interval types in the `get` and `copy` methods of ColumnarRow
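
    For illustration only, a minimal Scala sketch of the behavior this change enables
    (the setup mirrors the new tests below; the use of `OnHeapColumnVector` here is an
    assumption for the sketch, not part of this patch):

        import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
        import org.apache.spark.sql.types.YearMonthIntervalType
        import org.apache.spark.sql.vectorized.ColumnarArray

        // ANSI year-month intervals are physically stored as ints in a column vector.
        val dt = YearMonthIntervalType()
        val vector = new OnHeapColumnVector(10, dt)
        (0 until 10).foreach(i => vector.appendInt(i))

        val array = new ColumnarArray(vector, 0, 10)
        // With this change, `get` and `copy` route ANSI interval types through the
        // same int/long code paths as IntegerType and LongType.
        assert(array.get(3, dt) == 3)
        assert(array.copy().get(3, dt) == 3)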
    
    ### Why are the changes needed?
    [SPARK-36928](https://issues.apache.org/jira/browse/SPARK-36928)
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added test cases to ColumnVectorSuite and ColumnarBatchSuite.
    
    Closes #34421 from Peng-Lei/SPARK-36928.
    
    Authored-by: PengLei <peng.8...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../apache/spark/sql/vectorized/ColumnarArray.java |  6 +-
 .../spark/sql/vectorized/ColumnarBatchRow.java     |  8 +--
 .../apache/spark/sql/vectorized/ColumnarRow.java   |  8 +--
 .../execution/vectorized/ColumnVectorSuite.scala   | 69 ++++++++++++++++++++++
 .../execution/vectorized/ColumnarBatchSuite.scala  | 32 ++++++++++
 5 files changed, 113 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
index 147dd24..2fb6b3f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
@@ -57,9 +57,11 @@ public final class ColumnarArray extends ArrayData {
       return UnsafeArrayData.fromPrimitiveArray(toByteArray());
     } else if (dt instanceof ShortType) {
       return UnsafeArrayData.fromPrimitiveArray(toShortArray());
-    } else if (dt instanceof IntegerType || dt instanceof DateType) {
+    } else if (dt instanceof IntegerType || dt instanceof DateType
+            || dt instanceof YearMonthIntervalType) {
       return UnsafeArrayData.fromPrimitiveArray(toIntArray());
-    } else if (dt instanceof LongType || dt instanceof TimestampType) {
+    } else if (dt instanceof LongType || dt instanceof TimestampType
+            || dt instanceof DayTimeIntervalType) {
       return UnsafeArrayData.fromPrimitiveArray(toLongArray());
     } else if (dt instanceof FloatType) {
       return UnsafeArrayData.fromPrimitiveArray(toFloatArray());
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
index c6b7287e7..8c32d5c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
@@ -52,9 +52,9 @@ public final class ColumnarBatchRow extends InternalRow {
           row.setByte(i, getByte(i));
         } else if (dt instanceof ShortType) {
           row.setShort(i, getShort(i));
-        } else if (dt instanceof IntegerType) {
+        } else if (dt instanceof IntegerType || dt instanceof YearMonthIntervalType) {
           row.setInt(i, getInt(i));
-        } else if (dt instanceof LongType) {
+        } else if (dt instanceof LongType || dt instanceof DayTimeIntervalType) {
           row.setLong(i, getLong(i));
         } else if (dt instanceof FloatType) {
           row.setFloat(i, getFloat(i));
@@ -151,9 +151,9 @@ public final class ColumnarBatchRow extends InternalRow {
       return getByte(ordinal);
     } else if (dataType instanceof ShortType) {
       return getShort(ordinal);
-    } else if (dataType instanceof IntegerType) {
+    } else if (dataType instanceof IntegerType || dataType instanceof YearMonthIntervalType) {
       return getInt(ordinal);
-    } else if (dataType instanceof LongType) {
+    } else if (dataType instanceof LongType || dataType instanceof DayTimeIntervalType) {
       return getLong(ordinal);
     } else if (dataType instanceof FloatType) {
       return getFloat(ordinal);
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
index 4b9d3c5..da4b242 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
@@ -61,9 +61,9 @@ public final class ColumnarRow extends InternalRow {
           row.setByte(i, getByte(i));
         } else if (dt instanceof ShortType) {
           row.setShort(i, getShort(i));
-        } else if (dt instanceof IntegerType) {
+        } else if (dt instanceof IntegerType || dt instanceof YearMonthIntervalType) {
           row.setInt(i, getInt(i));
-        } else if (dt instanceof LongType) {
+        } else if (dt instanceof LongType || dt instanceof DayTimeIntervalType) {
           row.setLong(i, getLong(i));
         } else if (dt instanceof FloatType) {
           row.setFloat(i, getFloat(i));
@@ -160,9 +160,9 @@ public final class ColumnarRow extends InternalRow {
       return getByte(ordinal);
     } else if (dataType instanceof ShortType) {
       return getShort(ordinal);
-    } else if (dataType instanceof IntegerType) {
+    } else if (dataType instanceof IntegerType || dataType instanceof YearMonthIntervalType) {
       return getInt(ordinal);
-    } else if (dataType instanceof LongType) {
+    } else if (dataType instanceof LongType || dataType instanceof DayTimeIntervalType) {
       return getLong(ordinal);
     } else if (dataType instanceof FloatType) {
       return getFloat(ordinal);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 43f48ab..cdf41ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -209,6 +209,44 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
+  DataTypeTestUtils.yearMonthIntervalTypes.foreach {
+    dt =>
+      testVectors(dt.typeName,
+        10,
+        dt) { testVector =>
+        (0 until 10).foreach { i =>
+          testVector.appendInt(i)
+        }
+
+        val array = new ColumnarArray(testVector, 0, 10)
+        val arrayCopy = array.copy()
+
+        (0 until 10).foreach { i =>
+          assert(array.get(i, dt) === i)
+          assert(arrayCopy.get(i, dt) === i)
+        }
+      }
+  }
+
+  DataTypeTestUtils.dayTimeIntervalTypes.foreach {
+    dt =>
+      testVectors(dt.typeName,
+        10,
+        dt) { testVector =>
+        (0 until 10).foreach { i =>
+          testVector.appendLong(i)
+        }
+
+        val array = new ColumnarArray(testVector, 0, 10)
+        val arrayCopy = array.copy()
+
+        (0 until 10).foreach { i =>
+          assert(array.get(i, dt) === i)
+          assert(arrayCopy.get(i, dt) === i)
+        }
+      }
+  }
+
   testVectors("mutable ColumnarRow", 10, IntegerType) { testVector =>
     val mutableRow = new MutableColumnarRow(Array(testVector))
     (0 until 10).foreach { i =>
@@ -536,5 +574,36 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
       }
     }
   }
+
+  DataTypeTestUtils.yearMonthIntervalTypes.foreach { dt =>
+    val structType = new StructType().add(dt.typeName, dt)
+    testVectors("ColumnarRow " + dt.typeName, 10, structType) { v =>
+      val column = v.getChild(0)
+      (0 until 10).foreach { i =>
+        column.putInt(i, i)
+      }
+      (0 until 10).foreach { i =>
+        val row = v.getStruct(i)
+        val rowCopy = row.copy()
+        assert(row.get(0, dt) === i)
+        assert(rowCopy.get(0, dt) === i)
+      }
+    }
+  }
+  DataTypeTestUtils.dayTimeIntervalTypes.foreach { dt =>
+    val structType = new StructType().add(dt.typeName, dt)
+    testVectors("ColumnarRow " + dt.typeName, 10, structType) { v =>
+      val column = v.getChild(0)
+      (0 until 10).foreach { i =>
+        column.putLong(i, i)
+      }
+      (0 until 10).foreach { i =>
+        val row = v.getStruct(i)
+        val rowCopy = row.copy()
+        assert(row.get(0, dt) === i)
+        assert(rowCopy.get(0, dt) === i)
+      }
+    }
+  }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index f01b27e..8921500 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -1758,4 +1758,36 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(ex.getMessage.contains(
           "Cannot reserve additional contiguous bytes in the vectorized reader 
(integer overflow)"))
   }
+
+  DataTypeTestUtils.yearMonthIntervalTypes.foreach { dt =>
+    testVector(dt.typeName, 10, dt) {
+      column =>
+        (0 until 10).foreach { i =>
+          column.putInt(i, i)
+        }
+        val batchRow = new ColumnarBatchRow(Array(column))
+        (0 until 10).foreach { i =>
+          batchRow.rowId = i
+          assert(batchRow.get(0, dt) === i)
+          val batchRowCopy = batchRow.copy()
+          assert(batchRowCopy.get(0, dt) === i)
+        }
+    }
+  }
+
+  DataTypeTestUtils.dayTimeIntervalTypes.foreach { dt =>
+    testVector(dt.typeName, 10, dt) {
+      column =>
+        (0 until 10).foreach { i =>
+          column.putLong(i, i)
+        }
+        val batchRow = new ColumnarBatchRow(Array(column))
+        (0 until 10).foreach { i =>
+          batchRow.rowId = i
+          assert(batchRow.get(0, dt) === i)
+          val batchRowCopy = batchRow.copy()
+          assert(batchRowCopy.get(0, dt) === i)
+        }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
