Hi Marco,

as Robert already mentioned, the BatchTableEnvironment is simply build on top of the DataSet API, partitioning functionality is also available in DataSet API.

So using the JdbcInputFormat directly should work in DataSet API. Otherwise I would recommend to use some initial pipeline to transfer the data from JDBC maybe to a CSV file. Flink should support that.

Regards,
Timo


On 17.06.21 17:43, Marco Villalobos wrote:
I need to bootstrap a keyed process function.

So, I was hoping to use the Table SQL API because I thought it could parallelize the work more efficiently via partitioning. I need to boot strap keyed state for a keyed process function, with Flnk 1.12.1, thus I think I am required to use the DataSet API.

Is my only option JdbcInputFormat?

ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
batchTableEnv.executeSql("
CREATE TABLE my_table (
....
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = '?',
    'connector.username' = '?',
    'connector.password' = '?',
    'connector.table' = 'my_table'
)");

Table table = batchTableEnv.sqlQuery("SELECT name, step FROM my_table");
DataSet<Row> rowDataSet = batchTableEnv.toDataSet(table, Row.class);
rowDataSet.print();

This ends up throwing this exception:

org.apache.flink.table.api.TableException: Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment. at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116) at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580) at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555) at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537) at org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101)

On Thu, Jun 17, 2021 at 12:51 AM Timo Walther <twal...@apache.org <mailto:twal...@apache.org>> wrote:

    Hi Marco,

    which operations do you want to execute in the bootstrap pipeline?

    Maybe you don't need to use SQL and old planner. At least this would
    simplify the friction by going through another API layer.

    The JDBC connector can be directly be used in DataSet API as well.

    Regards,
    Timo



    On 17.06.21 07:33, Marco Villalobos wrote:
     > Thank you very much!
     >
     > I tried using Flink's SQL JDBC connector, and ran into issues.
     > According to the flink documentation, only the old planner is
    compatible
     > with the DataSet API.
     >
     > When I connect to the table:
     >
     > CREATE TABLE my_table (
     > ....
     > ) WITH (
     >     'connector.type' = 'jdbc',
     >     'connector.url' = '?',
     >     'connector.username' = '?',
     >     'connector.password' = '?',
     >     'connector.table' = 'my_table'
     > )
     >
     > It creates a JdbcTableSource, but only BatchTableSource and
     > InputFormatTableSource are supported in BatchTableEnvironment.
     >
     > By the way, it was very challenging to figure out how to create that
     > connection string, because its a different format than what is in
    the
     > documentation. I had to comb through JdbcTableSourceSinkFactory to
     > figure out how to connect.
     >
     > Is it even possible to use the DataSet API with the Table SQL api in
     > Flink 1.12.1?
     >
     >
     > On Wed, Jun 16, 2021 at 4:55 AM Robert Metzger
    <rmetz...@apache.org <mailto:rmetz...@apache.org>
     > <mailto:rmetz...@apache.org <mailto:rmetz...@apache.org>>> wrote:
     >
     >     Hi Marco,
     >
     >     The DataSet API will not run out of memory, as it spills to
    disk if
     >     the data doesn't fit anymore.
     >     Load is distributed by partitioning data.
     >
     >     Giving you advice depends a bit on the use-case. I would
    explore two
     >     major options:
     >     a) reading the data from postgres using Flink's SQL JDBC
    connector
     >     [1]. 200 GB is not much data. A 1gb network link needs ~30
    minutes
     >     to transfer that (125 megabytes / second)
     >     b) Using the DataSet API and state processor API. I would
    first try
     >     to see how much effort it is to read the data using the
    DataSet API
     >     (could be less convenient than the Flink SQL JDBC connector).
     >
     >     [1]
     >
    
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/
    
<https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/>
>  <https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/ <https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/>>
     >
     >
     >     On Wed, Jun 16, 2021 at 6:50 AM Marco Villalobos
     >     <mvillalo...@kineteque.com <mailto:mvillalo...@kineteque.com>
    <mailto:mvillalo...@kineteque.com
    <mailto:mvillalo...@kineteque.com>>> wrote:
     >
     >         I must bootstrap state from postgres (approximately 200 GB of
     >         data) and I notice that the state processor API requires the
     >         DataSet API in order to bootstrap state for the Stream API.
     >
     >         I wish there was a way to use the SQL API and use a
    partitioned
     >         scan, but I don't know if that is even possible with the
    DataSet
     >         API.
     >
     >         I never used the DataSet API, and I am unsure how it manages
     >         memory, or distributes load, when handling large state.
     >
     >         Would it run out of memory if I map data from a
    JDBCInputFormat
     >         into a large DataSet and then use that to bootstrap state
    for my
     >         stream job?
     >
     >         Any advice on how I should proceed with this would be greatly
     >         appreciated.
     >
     >         Thank you.
     >


Reply via email to