Hi!

Sorry for the late reply. Which Flink version are you using? On the current
Flink master there is no JdbcTableSource.
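
For a rough sense of how much data each of the 50 partitions covers in the
setup quoted below, here is a minimal sketch in plain Scala. It is not Flink
API: PartitionRanges and split are hypothetical names, and the key range
1..187340000 is an assumption that simply reuses the reported row count as the
upper bound. It also assumes each partition reads one contiguous, inclusive
range of the partition column.

```scala
// Hypothetical helper, not part of Flink: split the inclusive key range
// [lower, upper] into numPartitions contiguous, non-overlapping ranges.
object PartitionRanges {
  def split(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
    require(numPartitions > 0, "numPartitions must be positive")
    require(lower <= upper, "lower bound must not exceed upper bound")
    val total = upper - lower + 1      // number of keys in the range
    val base  = total / numPartitions  // minimum keys per partition
    val extra = total % numPartitions  // the first `extra` partitions get one more key
    (0 until numPartitions)
      .map { i =>
        val start = lower + i * base + math.min(i.toLong, extra)
        val size  = base + (if (i < extra) 1 else 0)
        (start, start + size - 1)
      }
      .filter { case (start, end) => start <= end } // drop empty ranges
  }
}

// Assumed bounds: keys 1..187340000, 50 partitions as in the quoted config.
val ranges = PartitionRanges.split(1L, 187340000L, 50)
ranges.take(2).foreach { case (start, end) =>
  println(s"partition keys BETWEEN $start AND $end")
}
```

Under these assumptions each partition spans about 3.7 million keys, so the
fetch size, not the partition count, is what should bound the number of rows
held in memory at once. Note that in the JDBC API setFetchSize is only a hint
to the driver, so whether rows really arrive in batches of 20 depends on the
driver in use.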

Qihua Yang <yang...@gmail.com> wrote on Wed, Jan 19, 2022 at 16:00:

> Should I change the query, for example by adding a limit as shown below?
> Without a limit, does that mean Flink will read the whole huge table into
> memory and then fetch and return 20 records each time?
>
> val query = String.format("SELECT * FROM %s limit 1000", tableName)
>
>
> On Tue, Jan 18, 2022 at 11:56 PM Qihua Yang <yang...@gmail.com> wrote:
>
>> Hi Caizhi,
>>
>> Thank you for your reply. The heap size is 512m. Fetching from the DB
>> table is the only costly operation. After fetching from the DB, I simply
>> ingest the records into a Kafka topic, so that should not be the bottleneck.
>> Here is the JDBC configuration. Is it correct?
>>
>> val query = String.format("SELECT * FROM %s", tableName)
>>
>> val options = JdbcOptions.builder()
>>     .setDBUrl(url)
>>     .setTableName(tableName)
>>     .setDriverName(DRIVER_NAME)
>>     .setUsername(userName)
>>     .setPassword(password)
>>     .build()
>> val readOptions = JdbcReadOptions.builder()
>>     .setQuery(query)
>>     .setPartitionColumnName(PARTITION_KEY)
>>     .setPartitionLowerBound(dbLowerBound)
>>     .setPartitionUpperBound(dbUpperBound)
>>     .setNumPartitions(50)
>>     .setFetchSize(20)
>>     .build()
>> val lookupOptions = JdbcLookupOptions.builder()
>>     .setCacheMaxSize(-1)
>>     .setCacheExpireMs(1000)
>>     .setMaxRetryTimes(2)
>>     .build()
>> val rawSource = JdbcTableSource.builder()
>>     .setOptions(options)
>>     .setReadOptions(readOptions)
>>     .setLookupOptions(lookupOptions)
>>     .setSchema(schema)
>>     .build().getDataStream(env)
>>
>>
>> On Tue, Jan 18, 2022 at 11:48 PM Caizhi Weng <tsreape...@gmail.com>
>> wrote:
>>
>>> Hi!
>>>
>>> This is not the desired behavior. As you have set fetchSize to 20, there
>>> will be only 20 records in each parallel instance of the source. How large
>>> is your heap? Does your job have any other operations that consume a lot
>>> of heap memory?
>>>
>>> Qihua Yang <yang...@gmail.com> wrote on Wed, Jan 19, 2022 at 15:27:
>>>
>>>> Here are the errors:
>>>> Exception: java.lang.OutOfMemoryError thrown from the
>>>> UncaughtExceptionHandler in thread "server-timer"
>>>> Exception: java.lang.OutOfMemoryError thrown from the
>>>> UncaughtExceptionHandler in thread "I/O dispatcher 16"
>>>> Exception: java.lang.OutOfMemoryError thrown from the
>>>> UncaughtExceptionHandler in thread "HTTP-Dispatcher"
>>>> Exception: java.lang.OutOfMemoryError thrown from the
>>>> UncaughtExceptionHandler in thread "I/O dispatcher 11"
>>>> Exception: java.lang.OutOfMemoryError thrown from the
>>>> UncaughtExceptionHandler in thread "I/O dispatcher 9"
>>>>
>>>> On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a Flink cluster (50 hosts, each host runs a task manager).
>>>>> I am using Flink JDBC to consume data from a database. The DB table is
>>>>> pretty large, around 187340000 rows. I set the JDBC number of
>>>>> partitions to 50 and fetchSize to 20. The Flink application has 50 task
>>>>> managers. Does anyone know why I got an OutOfMemoryError? How should I
>>>>> configure it?
>>>>>
>>>>> Thanks,
>>>>> Qihua
>>>>>
>>>>>
