My executor runs out of memory (OOM) when I use spark-sql to read data from MySQL.

In sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala, I see the following lines. I'm wondering why JDBC_BATCH_FETCH_SIZE has to be at least 0:
val fetchSize = {
  val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
  require(size >= 0,
    s"Invalid value `${size.toString}` for parameter " +
      s"`$JDBC_BATCH_FETCH_SIZE`. The minimum value is 0. When the value is 0, " +
      "the JDBC driver ignores the value and does the estimates.")
  size
}

According to the MySQL Connector/J documentation (linked in the comment below), fetchSize should be Integer.MIN_VALUE to stream result sets row by row. And in core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala (which looks like the older API?), I see that fetchSize is set to Integer.MIN_VALUE too:
override def compute(thePart: Partition, context: TaskContext): Iterator[T] =
  new NextIterator[T] {
    context.addTaskCompletionListener{ context => closeIfNeeded() }
    val part = thePart.asInstanceOf[JdbcPartition]
    val conn = getConnection()
    val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
      ResultSet.CONCUR_READ_ONLY)

    val url = conn.getMetaData.getURL
    if (url.startsWith("jdbc:mysql:")) {
      // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force
      // streaming results, rather than pulling entire resultset into memory.
      // See the below URL
      // dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
      stmt.setFetchSize(Integer.MIN_VALUE)
    } else {
      stmt.setFetchSize(100)
    }

Thanks
