Hi,

We are using flink jdbc connector to read whole database table line by
line. A few things I don't quite understand.
We configured BATCH_SIZE=100 and PARTITION_NUM=1000, table is pretty big.
What is the flink internal behavior to read data from table?
Flink read BATCH_SIZE data each time? Or it read (tableSize/PARTITION_NUM)
data each time? Or it read whole table into memory each time?
database metrics show the sql latency is extremely high, almost 20s.
is there any way to optimize it?

val query = String.format("SELECT * FROM %s", tableName)

val options = JdbcOptions.builder()
    .setDBUrl(url)
    .setTableName(tableName)
    .setDriverName(DRIVER_NAME)
    .setUsername(userName)
    .setPassword(password)
    .build()
val readOptions = JdbcReadOptions.builder()
    .setQuery(query)
    .setPartitionColumnName(PARTITION_KEY)
    .setPartitionLowerBound(esSinkConf.dbLowerBound)
    .setPartitionUpperBound(esSinkConf.dbUpperBound)
    .setNumPartitions(PARTITION_NUM)
    .setFetchSize(BATCH_SIZE)
    .build()
val lookupOptions = JdbcLookupOptions.builder()
    .setCacheMaxSize(-1)
    .setCacheExpireMs(CACHE_SIZE)
    .setMaxRetryTimes(2)
    .build()
val rawSource = JdbcTableSource.builder()
    .setOptions(options)
    .setReadOptions(readOptions)
    .setLookupOptions(lookupOptions)
    .setSchema(schema)
    .build().getDataStream(env)

Reply via email to