Repository: spark
Updated Branches:
  refs/heads/branch-2.1 fd2e40614 -> e669dd7ea
[SPARK-14536][SQL][BACKPORT-2.1] fix to handle null value in array type column for postgres.

## What changes were proposed in this pull request?

JDBC reads fail with an NPE when the source table contains null values in an array type column, because the array read path is missing a null check. For a SQL NULL, ResultSet.getArray() returns null, and the existing code invoked a method on that result unconditionally. This PR adds a null-safe check on the ResultSet.getArray() result before invoking any method on the returned Array object.

## How was this patch tested?

Updated the PostgresIntegrationSuite to test null values. Ran the Docker integration tests on my laptop.

Author: sureshthalamati <suresh.thalam...@gmail.com>

Closes #17460 from sureshthalamati/jdbc_array_null_fix_spark_2.1-SPARK-14536.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e669dd7e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e669dd7e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e669dd7e

Branch: refs/heads/branch-2.1
Commit: e669dd7ea474f65fea0d5df011a333bda9de91b4
Parents: fd2e406
Author: sureshthalamati <suresh.thalam...@gmail.com>
Authored: Tue Mar 28 14:02:01 2017 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Tue Mar 28 14:02:01 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/jdbc/PostgresIntegrationSuite.scala  | 12 ++++++++++--
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |  6 +++---
 2 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e669dd7e/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index c9325de..a1a065a 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -51,12 +51,17 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       + "B'1000100101', E'\\\\xDEADBEEF', true, '172.16.0.42', '192.168.0.0/16', "
       + """'{1, 2}', '{"a", null, "b"}', '{0.11, 0.22}', '{0.11, 0.22}', 'd1', 1.01, 1)"""
     ).executeUpdate()
+    conn.prepareStatement("INSERT INTO bar VALUES (null, null, null, null, null, "
+      + "null, null, null, null, null, "
+      + "null, null, null, null, null, null, null)"
+    ).executeUpdate()
   }
 
   test("Type mapping for various types") {
     val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties)
-    val rows = df.collect()
-    assert(rows.length == 1)
+    val rows = df.collect().sortBy(_.toString())
+    assert(rows.length == 2)
+    // Test the types, and values using the first row.
     val types = rows(0).toSeq.map(x => x.getClass)
     assert(types.length == 17)
     assert(classOf[String].isAssignableFrom(types(0)))
@@ -96,6 +101,9 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(rows(0).getString(14) == "d1")
     assert(rows(0).getFloat(15) == 1.01f)
     assert(rows(0).getShort(16) == 1)
+
+    // Test reading null values using the second row.
+    assert(0.until(16).forall(rows(1).isNullAt(_)))
   }
 
   test("Basic write test") {


http://git-wip-us.apache.org/repos/asf/spark/blob/e669dd7e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 41edb65..81fdf69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -423,9 +423,9 @@ object JdbcUtils extends Logging {
         }
 
       (rs: ResultSet, row: InternalRow, pos: Int) =>
-        val array = nullSafeConvert[Object](
-          rs.getArray(pos + 1).getArray,
-          array => new GenericArrayData(elementConversion.apply(array)))
+        val array = nullSafeConvert[java.sql.Array](
+          input = rs.getArray(pos + 1),
+          array => new GenericArrayData(elementConversion.apply(array.getArray)))
         row.update(pos, array)
 
     case _ => throw new IllegalArgumentException(s"Unsupported type ${dt.simpleString}")
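
----------------------------------------------------------------------

The essence of the fix is ordering: ResultSet.getArray() returns null for a SQL NULL, so the null check must run before anything is invoked on the returned java.sql.Array. Below is a minimal, self-contained Scala sketch of that pattern; nullSafeConvert here mirrors the private JdbcUtils helper of the same name, and readArrayColumn is a hypothetical wrapper added for illustration. Neither is the verbatim Spark code.

import java.sql.ResultSet

object NullSafeArrayReadSketch {
  // Apply f only when the JDBC value is non-null; otherwise propagate null.
  private def nullSafeConvert[T](input: T, f: T => Any): Any =
    if (input == null) null else f(input)

  // Before the fix, the equivalent of rs.getArray(pos).getArray was evaluated
  // eagerly as the argument, so a SQL NULL threw an NPE before the null check
  // could run. Here the dereference (array.getArray) happens inside the
  // conversion function, which executes only for non-null input.
  def readArrayColumn(rs: ResultSet, pos: Int): Any =
    nullSafeConvert[java.sql.Array](
      rs.getArray(pos), // JDBC column indices are 1-based
      array => array.getArray // Spark additionally wraps this in GenericArrayData
    )
}

The patched JdbcUtils code above makes the same move in place: rs.getArray(pos + 1) is passed to nullSafeConvert without being dereferenced, and .getArray is only called inside the conversion lambda.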