Fixed by creating a new Netezza dialect and registering it with JdbcDialects.registerDialect(NetezzaDialect) (see spark/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala for the dialect API). The dialect is below:
package com.citi.ocean.spark.elt

/**
 * Created by st84879 on 26/01/2016.
 */

import java.sql.{Connection, Types}

import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
import org.apache.spark.sql.types._

private object NetezzaDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:netezza")

  override def getCatalystType(
      sqlType: Int,
      typeName: String,
      size: Int,
      md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
      Some(BinaryType)
    } else if (sqlType == Types.OTHER) {
      toCatalystType(typeName).filter(_ == StringType)
    } else if (sqlType == Types.ARRAY && typeName.length > 1 && typeName(0) == '_') {
      toCatalystType(typeName.drop(1)).map(ArrayType(_))
    } else {
      None
    }
  }

  // TODO: support more type names.
  private def toCatalystType(typeName: String): Option[DataType] = typeName match {
    case "bool" => Some(BooleanType)
    case "bit" => Some(BinaryType)
    case "int2" => Some(ShortType)
    case "int4" => Some(IntegerType)
    case "int8" | "oid" => Some(LongType)
    case "float4" => Some(FloatType)
    case "money" | "float8" => Some(DoubleType)
    case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" =>
      Some(StringType)
    case "bytea" => Some(BinaryType)
    case "timestamp" | "timestamptz" | "time" | "timetz" => Some(TimestampType)
    case "date" => Some(DateType)
    case "numeric" => Some(DecimalType.SYSTEM_DEFAULT)
    case _ => None
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    // Netezza has no TEXT type, so write StringType as VARCHAR(1000) instead of
    // the default TEXT -- this avoids "NzSQLException: ERROR: Invalid datatype - TEXT".
    case StringType => Some(JdbcType("VARCHAR(1000)", Types.CHAR))
    case BinaryType => Some(JdbcType("BYTEA", Types.BINARY))
    case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
    case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT))
    case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE))
    case ArrayType(et, _) if et.isInstanceOf[AtomicType] =>
      getJDBCType(et).map(_.databaseTypeDefinition)
        .orElse(JdbcUtils.getCommonJDBCType(et).map(_.databaseTypeDefinition))
        .map(typeName => JdbcType(s"$typeName[]", java.sql.Types.ARRAY))
    case ByteType => throw new IllegalArgumentException(s"Unsupported type in netezza: $dt")
    case _ => None
  }

  override def getTableExistsQuery(table: String): String = {
    s"SELECT 1 FROM $table LIMIT 1"
  }

  override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
    super.beforeFetch(connection, properties)
    // According to the postgres jdbc documentation we need to be in autocommit=false
    // if we actually want to have fetchsize be non 0 (all the rows). This allows us
    // to not have to cache all the rows inside the driver when fetching.
    //
    // See: https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
    if (properties.getOrElse("fetchsize", "0").toInt > 0) {
      connection.setAutoCommit(false)
    }
  }
}
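For reference, registering and using the dialect might look like the sketch below. The JDBC URL, credentials, and table names are made-up placeholders, and sqlContext is assumed to be in scope (e.g. in spark-shell):

import org.apache.spark.sql.jdbc.JdbcDialects

// Register the dialect once, before any Netezza reads or writes.
JdbcDialects.registerDialect(NetezzaDialect)

// Hypothetical connection details, for illustration only.
val url = "jdbc:netezza://nzhost:5480/MYDB"
val props = new java.util.Properties()
props.setProperty("user", "nzuser")
props.setProperty("password", "nzpass")

// Reads and writes now go through NetezzaDialect, so StringType columns
// are created as VARCHAR(1000) rather than the unsupported TEXT.
val df = sqlContext.read.jdbc(url, "SOME_TABLE", props)
df.write.jdbc(url, "SOME_TABLE_COPY", props)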