Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14907#discussion_r77099971
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
    @@ -154,6 +163,288 @@ object JdbcUtils extends Logging {
          throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.simpleString}"))
       }
     
    +  /**
    +   * Maps a JDBC type to a Catalyst type.  This function is called only when
    +   * the JdbcDialect class corresponding to your database driver returns null.
    +   *
    +   * @param sqlType - A field of java.sql.Types
    +   * @return The Catalyst type corresponding to sqlType.
    +   */
    +  private def getCatalystType(
    +      sqlType: Int,
    +      precision: Int,
    +      scale: Int,
    +      signed: Boolean): DataType = {
    +    val answer = sqlType match {
    +      // scalastyle:off
    +      case java.sql.Types.ARRAY         => null
    +      case java.sql.Types.BIGINT        => if (signed) { LongType } else { DecimalType(20,0) }
    +      case java.sql.Types.BINARY        => BinaryType
    +      case java.sql.Types.BIT           => BooleanType // @see JdbcDialect for quirks
    +      case java.sql.Types.BLOB          => BinaryType
    +      case java.sql.Types.BOOLEAN       => BooleanType
    +      case java.sql.Types.CHAR          => StringType
    +      case java.sql.Types.CLOB          => StringType
    +      case java.sql.Types.DATALINK      => null
    +      case java.sql.Types.DATE          => DateType
    +      case java.sql.Types.DECIMAL
    +        if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
    +      case java.sql.Types.DECIMAL       => DecimalType.SYSTEM_DEFAULT
    +      case java.sql.Types.DISTINCT      => null
    +      case java.sql.Types.DOUBLE        => DoubleType
    +      case java.sql.Types.FLOAT         => FloatType
    +      case java.sql.Types.INTEGER       => if (signed) { IntegerType } else { LongType }
    +      case java.sql.Types.JAVA_OBJECT   => null
    +      case java.sql.Types.LONGNVARCHAR  => StringType
    +      case java.sql.Types.LONGVARBINARY => BinaryType
    +      case java.sql.Types.LONGVARCHAR   => StringType
    +      case java.sql.Types.NCHAR         => StringType
    +      case java.sql.Types.NCLOB         => StringType
    +      case java.sql.Types.NULL          => null
    +      case java.sql.Types.NUMERIC
    +        if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
    +      case java.sql.Types.NUMERIC       => DecimalType.SYSTEM_DEFAULT
    +      case java.sql.Types.NVARCHAR      => StringType
    +      case java.sql.Types.OTHER         => null
    +      case java.sql.Types.REAL          => DoubleType
    +      case java.sql.Types.REF           => StringType
    +      case java.sql.Types.ROWID         => LongType
    +      case java.sql.Types.SMALLINT      => IntegerType
    +      case java.sql.Types.SQLXML        => StringType
    +      case java.sql.Types.STRUCT        => StringType
    +      case java.sql.Types.TIME          => TimestampType
    +      case java.sql.Types.TIMESTAMP     => TimestampType
    +      case java.sql.Types.TINYINT       => IntegerType
    +      case java.sql.Types.VARBINARY     => BinaryType
    +      case java.sql.Types.VARCHAR       => StringType
    +      case _                            => null
    +      // scalastyle:on
    +    }
    +
    +    if (answer == null) throw new SQLException("Unsupported type " + sqlType)
    +    answer
    +  }
    +
    +  /**
    +   * Takes a [[ResultSet]] and returns its Catalyst schema.
    +   *
    +   * @return A [[StructType]] giving the Catalyst schema.
    +   * @throws SQLException if the schema contains an unsupported type.
    +   */
    +  def getSchema(resultSet: ResultSet, dialect: JdbcDialect): StructType = {
    +    val rsmd = resultSet.getMetaData
    +    val ncols = rsmd.getColumnCount
    +    val fields = new Array[StructField](ncols)
    +    var i = 0
    +    while (i < ncols) {
    +      val columnName = rsmd.getColumnLabel(i + 1)
    +      val dataType = rsmd.getColumnType(i + 1)
    +      val typeName = rsmd.getColumnTypeName(i + 1)
    +      val fieldSize = rsmd.getPrecision(i + 1)
    +      val fieldScale = rsmd.getScale(i + 1)
    +      val isSigned = rsmd.isSigned(i + 1)
    +      val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
    +      val metadata = new MetadataBuilder()
    +        .putString("name", columnName)
    +        .putLong("scale", fieldScale)
    +      val columnType =
    +        dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
    +          getCatalystType(dataType, fieldSize, fieldScale, isSigned))
    +      fields(i) = StructField(columnName, columnType, nullable, metadata.build())
    +      i = i + 1
    +    }
    +    new StructType(fields)
    +  }
    +
    +  /**
    +   * Convert a [[ResultSet]] into an iterator of Catalyst Rows.
    +   */
    +  def resultSetToRows(resultSet: ResultSet, schema: StructType): Iterator[Row] = {
    +    val inputMetrics =
    +      Option(TaskContext.get()).map(_.taskMetrics().inputMetrics).getOrElse(new InputMetrics)
    +    val encoder = RowEncoder(schema).resolveAndBind()
    +    val internalRows = resultSetToSparkInternalRows(resultSet, schema, inputMetrics)
    +    internalRows.map(encoder.fromRow)
    +  }
    +
    +  private[spark] def resultSetToSparkInternalRows(
    +      resultSet: ResultSet,
    +      schema: StructType,
    +      inputMetrics: InputMetrics): Iterator[InternalRow] = {
    +    new NextIterator[InternalRow] {
    --- End diff --
    
    Here, I've used `NextIterator` to simplify the implementation of this code. 
Take a look at the `NextIterator` source and note the similarities between it 
and the `hasNext` code in the old `JDBCRDD` iterator.
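    
    To make the comparison concrete, here is a minimal sketch of the pattern (a simplified paraphrase for illustration, not the actual `org.apache.spark.util.NextIterator` source, which additionally guards against double-close and task interruption; `NextIteratorSketch` and `ResultSetIterator` are made-up names):
    
    ```scala
    import java.sql.ResultSet
    
    // Sketch of the NextIterator contract: subclasses implement getNext()
    // (setting `finished = true` when the source is exhausted) and close();
    // hasNext/next handle the read-ahead bookkeeping once, centrally.
    abstract class NextIteratorSketch[U] extends Iterator[U] {
      private var gotNext = false
      private var nextValue: U = _
      protected var finished = false
    
      protected def getNext(): U
      protected def close(): Unit
    
      override def hasNext: Boolean = {
        if (!finished && !gotNext) {
          nextValue = getNext()
          if (finished) close() // release resources exactly once, at exhaustion
          gotNext = true
        }
        !finished
      }
    
      override def next(): U = {
        if (!hasNext) throw new NoSuchElementException("End of stream")
        gotNext = false
        nextValue
      }
    }
    
    // A ResultSet-backed iterator then only has to say how to fetch one row:
    class ResultSetIterator(rs: ResultSet) extends NextIteratorSketch[ResultSet] {
      override protected def getNext(): ResultSet =
        if (rs.next()) rs else { finished = true; null }
      override protected def close(): Unit = rs.close()
    }
    ```
    
    Subclasses only describe how to fetch one element; the read-ahead and `finished` state machine that the old `JDBCRDD` `hasNext` re-implemented by hand lives in one place.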

