Hi! Sorry for the late reply. Which Flink version are you using? In the current Flink master there is no JdbcTableSource.
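Since the legacy `JdbcTableSource` is not available in the current master, the same partitioned scan can usually be declared with the JDBC SQL connector instead. The Scala sketch below only assembles the DDL string. The option keys are taken from the Flink JDBC SQL connector documentation, though whether `scan.auto-commit` is available depends on your connector version. The schema (`id`, `payload`), URL, table name and column name are hypothetical placeholders and do not come from this thread.

```scala
// Sketch: builds a Flink SQL DDL string for a partitioned JDBC scan.
// The option keys are the JDBC SQL connector's documented scan options;
// the schema, URL, table name and partition column are hypothetical placeholders.
object JdbcDdlSketch {

  final case class ScanConfig(
      url: String,
      tableName: String,
      partitionColumn: String, // must be a numeric, date or timestamp column
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int,
      fetchSize: Int,
      autoCommit: Boolean // some drivers (e.g. PostgreSQL) only stream results with auto-commit off
  )

  def sourceDdl(cfg: ScanConfig): String =
    s"""CREATE TABLE jdbc_source (
       |  id BIGINT,
       |  payload STRING
       |) WITH (
       |  'connector' = 'jdbc',
       |  'url' = '${cfg.url}',
       |  'table-name' = '${cfg.tableName}',
       |  'scan.partition.column' = '${cfg.partitionColumn}',
       |  'scan.partition.num' = '${cfg.numPartitions}',
       |  'scan.partition.lower-bound' = '${cfg.lowerBound}',
       |  'scan.partition.upper-bound' = '${cfg.upperBound}',
       |  'scan.fetch-size' = '${cfg.fetchSize}',
       |  'scan.auto-commit' = '${cfg.autoCommit}'
       |)""".stripMargin

  def main(args: Array[String]): Unit = {
    val ddl = sourceDdl(
      ScanConfig(
        url = "jdbc:postgresql://db-host:5432/mydb", // hypothetical
        tableName = "my_table",                     // hypothetical
        partitionColumn = "id",                     // hypothetical
        lowerBound = 1L,
        upperBound = 187340000L,
        numPartitions = 50,
        fetchSize = 20,
        autoCommit = false
      )
    )
    // In a real job this string would be passed to tableEnv.executeSql(ddl).
    println(ddl)
  }
}
```

Keeping the values in a case class makes it easy to compare them one-to-one with the `JdbcReadOptions` settings quoted below.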
On Wed, Jan 19, 2022 at 16:00, Qihua Yang <yang...@gmail.com> wrote:
> Should I change the query, e.g. add a limit as below? Without a limit,
> does that mean Flink will read the whole huge table into memory and then
> fetch and return 20 records at a time?
>
> val query = String.format("SELECT * FROM %s limit 1000", tableName)
>
>
> On Tue, Jan 18, 2022 at 11:56 PM Qihua Yang <yang...@gmail.com> wrote:
>
>> Hi Caizhi,
>>
>> Thank you for your reply. The heap size is 512m. Fetching from the DB
>> table is the only costly operation. After fetching from the DB, I simply
>> ingest the data into a Kafka topic. That should not be the bottleneck.
>> Here is the JDBC configuration. Is it correct?
>>
>> val query = String.format("SELECT * FROM %s", tableName)
>>
>> val options = JdbcOptions.builder()
>>   .setDBUrl(url)
>>   .setTableName(tableName)
>>   .setDriverName(DRIVER_NAME)
>>   .setUsername(userName)
>>   .setPassword(password)
>>   .build()
>> val readOptions = JdbcReadOptions.builder()
>>   .setQuery(query)
>>   .setPartitionColumnName(PARTITION_KEY)
>>   .setPartitionLowerBound(dbLowerBound)
>>   .setPartitionUpperBound(dbUpperBound)
>>   .setNumPartitions(50)
>>   .setFetchSize(20)
>>   .build()
>> val lookupOptions = JdbcLookupOptions.builder()
>>   .setCacheMaxSize(-1)
>>   .setCacheExpireMs(1000)
>>   .setMaxRetryTimes(2)
>>   .build()
>> val rawSource = JdbcTableSource.builder()
>>   .setOptions(options)
>>   .setReadOptions(readOptions)
>>   .setLookupOptions(lookupOptions)
>>   .setSchema(schema)
>>   .build().getDataStream(env)
>>
>>
>> On Tue, Jan 18, 2022 at 11:48 PM Caizhi Weng <tsreape...@gmail.com>
>> wrote:
>>
>>> Hi!
>>>
>>> This is not the desired behavior. As you have set fetchSize to 20, there
>>> will be only 20 records in each parallel instance of the source. How large
>>> is your heap? Does your job have any other operations that consume a lot
>>> of heap memory?
>>>
>>> On Wed, Jan 19, 2022 at 15:27, Qihua Yang <yang...@gmail.com> wrote:
>>>
>>>> Here are the errors:
>>>> Exception: java.lang.OutOfMemoryError thrown from the
>>>> UncaughtExceptionHandler in thread "server-timer"
>>>> Exception: java.lang.OutOfMemoryError thrown from the
>>>> UncaughtExceptionHandler in thread "I/O dispatcher 16"
>>>> Exception: java.lang.OutOfMemoryError thrown from the
>>>> UncaughtExceptionHandler in thread "HTTP-Dispatcher"
>>>> Exception: java.lang.OutOfMemoryError thrown from the
>>>> UncaughtExceptionHandler in thread "I/O dispatcher 11"
>>>> Exception: java.lang.OutOfMemoryError thrown from the
>>>> UncaughtExceptionHandler in thread "I/O dispatcher 9"
>>>>
>>>> On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a Flink cluster (50 hosts, each running a task manager).
>>>>> I am using Flink JDBC to consume data from a database. The DB table is
>>>>> pretty large, around 187,340,000 rows. I set the JDBC number of
>>>>> partitions to 50 and fetchSize to 20. The Flink application has 50 task
>>>>> managers. Does anyone know why I got an OutOfMemoryError? How should I
>>>>> configure it?
>>>>>
>>>>> Thanks,
>>>>> Qihua
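On the partitioning question in this thread (50 partitions over the partition-key range, fetchSize 20): the idea behind setting a partition column with lower/upper bounds is that each parallel source instance reads its own slice of the key range, typically via a `... BETWEEN ? AND ?` condition. The Scala sketch below shows that splitting arithmetic only. `splitRange` is a hypothetical helper, not Flink's own parameter provider, and the dense key range 1 to 187,340,000 is an assumption based on the row count mentioned above.

```scala
// Sketch of numeric range partitioning for a parallel JDBC scan.
// Hypothetical helper for illustration; not Flink's actual implementation.
object PartitionSplitSketch {

  /** Splits the inclusive range [lower, upper] into at most `numPartitions`
    * contiguous, non-overlapping inclusive ranges whose sizes differ by at most 1.
    * (Does not guard against Long overflow for extreme bounds.)
    */
  def splitRange(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
    require(numPartitions > 0, "numPartitions must be positive")
    require(lower <= upper, "lower bound must not exceed upper bound")
    val total = upper - lower + 1
    val n = math.min(numPartitions.toLong, total).toInt // never more splits than key values
    val base = total / n  // minimum number of key values per split
    val extra = total % n // the first `extra` splits get one additional value
    (0 until n).map { i =>
      val start = lower + i * base + math.min(i.toLong, extra)
      val size = base + (if (i < extra) 1L else 0L)
      (start, start + size - 1)
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumed dense key 1..187,340,000 split across 50 parallel readers.
    val splits = splitRange(1L, 187340000L, 50)
    splits.take(2).foreach { case (lo, hi) => println(s"WHERE key BETWEEN $lo AND $hi") }
    val (lo0, hi0) = splits.head
    println(s"${splits.size} splits, ${hi0 - lo0 + 1} key values in the first split")
  }
}
```

Under these assumptions each split covers 3,746,800 key values, so fetchSize does not make a split smaller. `java.sql.Statement#setFetchSize` is documented only as a hint to the driver about how many rows to fetch per round trip, so whether rows actually arrive 20 at a time depends on the JDBC driver and its settings.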