Re: Nondeterministic results with SQL job when parallelism is > 1
Jark,

Thanks for the heads up! I didn't see this behavior when running in batch mode with parallelism turned on. Is it safe to do this kind of join in batch mode right now, or am I just getting lucky?

Dylan

From: Jark Wu
Date: Friday, April 16, 2021 at 5:10 AM
To: Dylan Forciea
Cc: Timo Walther, Piotr Nowojski, "user@flink.apache.org"
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi Dylan,

I think this has the same reason as https://issues.apache.org/jira/browse/FLINK-20374. The root cause is that changelogs are shuffled by `attr` at the second join, and thus records with the same `id` will be shuffled to different join tasks (and also different sink tasks). So the data arriving at the sinks is not ordered on the sink primary key. We may need something like a primary-key ordering mechanism in the whole planner to fix this.

Best,
Jark

On Thu, 15 Apr 2021 at 01:33, Dylan Forciea <dy...@oseberg.io> wrote:

On a side note - I changed to use the batch mode per your suggestion Timo, and my job ran much faster and with deterministic counts with parallelism turned on. So I'll probably utilize that for now. However, it would still be nice to dig down into why streaming isn't working in case I need that in the future.

Dylan

On 4/14/21, 10:27 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:

Timo,

Here is the plan (hopefully I properly cleansed it of company proprietary info without garbling it)

Dylan

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
+- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS NOT NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT NULL($3), $3, $1)])
   +- LogicalJoin(condition=[=($4, $5)], joinType=[left])
      :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], $f4=[CASE(IS NOT NULL($3), $3, $1)])
      :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
      :     :- LogicalTableScan(table=[[default_catalog, default_database, table1]])
      :     +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
      :        +- LogicalProject(id2=[$1], attr=[$0])
      :           +- LogicalTableScan(table=[[default_catalog, default_database, table2]])
      +- LogicalTableScan(table=[[default_catalog, default_database, table3]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped], changelogMode=[NONE])
+- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), attr_mapped, IS NOT NULL(attr0), attr0, attr) AS attr_mapped], changelogMode=[I,UB,UA,D])
   +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
      :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
      :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT NULL(attr0), attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
      :     +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
      :        :- Exchange(distribution=[hash[id1]], changelogMode=[I])
      :        :  +- TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr], changelogMode=[I])
      :        +- Exchange(distribution=[hash[id2]], changelogMode=[I,UB,UA])
      :           +- GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr], changelogMode=[I,UB,UA])
      :              +- Exchange(distribution=[hash[id2]], changelogMode=[I])
      :                 +- TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2], changelogMode=[I])
      +- Exchange(distribution=[hash[attr]], changelogMode=[I])
         +- TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped], changelogMode=[I])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr])

Stage 3 : Data Source
    content : Source: TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2])

Stage 5 : Attr
    content : GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr])
    ship_strategy : HASH

Stage 7 : Attr
    content : Join(joinType=[FullOuterJoin], where=[(id1 = id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
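A minimal sketch of the effect Jark describes, with made-up keys and a placeholder hash rather than Flink's actual key-group assignment: because the second exchange hashes on `attr` instead of on the sink primary key `id`, the retraction and the new row of a single update for the same `id` can be routed to different parallel tasks, so the sink no longer receives them in order.

    object ShuffleOrderSketch {
      // placeholder partitioner; Flink's real hashing differs, the point is only
      // that routing depends on attr, not on the sink primary key id
      def subtask(key: String, parallelism: Int): Int =
        math.abs(key.hashCode) % parallelism

      def main(args: Array[String]): Unit = {
        val parallelism = 16
        // one logical update for sink key id = "42": the retraction still carries
        // the old attr value, the new row carries the new one
        val retraction = ("42", "oldAttr") // -U, routed by attr
        val newRow     = ("42", "newAttr") // +U, routed by attr
        println(s"-U goes to subtask ${subtask(retraction._2, parallelism)}")
        println(s"+U goes to subtask ${subtask(newRow._2, parallelism)}")
        // different subtasks => no per-id ordering guarantee at the sink
      }
    }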
Re: Nondeterministic results with SQL job when parallelism is > 1
On a side note - I changed to use the batch mode per your suggestion Timo, and my job ran much faster and with deterministic counts with parallelism turned on. So I'll probably utilize that for now. However, it would still be nice to dig down into why streaming isn't working in case I need that in the future. Dylan On 4/14/21, 10:27 AM, "Dylan Forciea" wrote: Timo, Here is the plan (hopefully I properly cleansed it of company proprietary info without garbling it) Dylan == Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped]) +- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS NOT NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT NULL($3), $3, $1)]) +- LogicalJoin(condition=[=($4, $5)], joinType=[left]) :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], $f4=[CASE(IS NOT NULL($3), $3, $1)]) : +- LogicalJoin(condition=[=($0, $2)], joinType=[full]) : :- LogicalTableScan(table=[[default_catalog, default_database, table1]]) : +- LogicalAggregate(group=[{0}], attr=[MAX($1)]) :+- LogicalProject(id2=[$1], attr=[$0]) : +- LogicalTableScan(table=[[default_catalog, default_database, table2]]) +- LogicalTableScan(table=[[default_catalog, default_database, table3]]) == Optimized Logical Plan == Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped], changelogMode=[NONE]) +- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), attr_mapped, IS NOT NULL(attr0), attr0, attr) AS attr_mapped], changelogMode=[I,UB,UA,D]) +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D]) :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D]) : +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT NULL(attr0), attr0, attr) AS $f4], changelogMode=[I,UB,UA,D]) : +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D]) ::- Exchange(distribution=[hash[id1]], changelogMode=[I]) :: +- TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr], changelogMode=[I]) :+- Exchange(distribution=[hash[id2]], changelogMode=[I,UB,UA]) : +- GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr], changelogMode=[I,UB,UA]) : +- Exchange(distribution=[hash[id2]], changelogMode=[I]) : +- TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2], changelogMode=[I]) +- Exchange(distribution=[hash[attr]], changelogMode=[I]) +- TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped], changelogMode=[I]) == Physical Execution Plan == Stage 1 : Data Source content : Source: TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr]) Stage 3 : Data Source content : Source: TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2]) Stage 5 : Attr content : GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr]) ship_strategy : HASH Stage 7 : Attr content : Join(joinType=[FullOuterJoin], where=[(id1 = id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) ship_strategy : HASH Stage 8 : Attr content : 
Calc(select=[id1, attr, id2, attr0, (attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4]) ship_strategy : FORWARD Stage 10 : Data Source content : Source: TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped]) Stage 12 : Attr content : Join(joinType=[LeftOuterJoin], where=[($f4 = attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) ship_strategy : HASH Stage 13 : Attr content : Calc(select=[(id1 IS NOT NULL CASE id1 CASE id2) AS id, (attr0 IS NOT NULL CASE attr0 CASE attr) AS attr, (attr_mapped IS NOT NULL CASE attr_mapped CASE attr0 IS NOT NULL CASE attr0 CASE attr) AS attr_mapped]) ship_strategy
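For reference, a plan like the one pasted above can be reproduced from the job itself; this is a sketch assuming the same `statementSet` that is built in the job further down the thread. Explaining the statement set (rather than a single Table) is what includes the Sink node, and `ExplainDetail.CHANGELOG_MODE` is what adds the `changelogMode=[...]` annotations.

    import org.apache.flink.table.api.ExplainDetail

    // assumes the statementSet from the job below
    println(statementSet.explain(ExplainDetail.CHANGELOG_MODE))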
Re: Nondeterministic results with SQL job when parallelism is > 1
:47, Dylan Forciea wrote: > Piotrek, > > I am looking at the count of records present in the sink table in > Postgres after the entire job completes, not the number of > inserts/retracts. I can see as the job runs that records are added and > removed from the “sink” table. With parallelism set to 1, it always > comes out to the same number (which is consistent with the number of ids > in the source tables “table1” and “table2”), at about 491k records in > table “sink” when the job is complete. With the parallelism set to 16, > the “sink” table will have somewhere around 360k records +/- 20k when > the job is complete. I truncate the “sink” table before I run the job, > and this is a test environment where the source databases are static. > > I removed my line for setting to Batch mode per Timo’s suggestion, and > am still running with MAX which should have deterministic output. > > Dylan > > *From: *Piotr Nowojski > *Date: *Wednesday, April 14, 2021 at 9:38 AM > *To: *Dylan Forciea > *Cc: *"user@flink.apache.org" > *Subject: *Re: Nondeterministic results with SQL job when parallelism is > 1 > > Hi Dylan, > > But if you are running your query in Streaming mode, aren't you counting > retractions from the FULL JOIN? AFAIK in Streaming mode in FULL JOIN, > when the first record comes in it will be immediately emitted with NULLs > (not matched, as the other table is empty). Later if a matching record > is received from the second table, the previous result will be retracted > and the new one, updated, will be re-emitted. Maybe this is what you are > observing in the varying output? > > Maybe you could try to analyse how the results differ between different > runs? > > Best, > > Piotrek > > śr., 14 kwi 2021 o 16:22 Dylan Forciea <mailto:dy...@oseberg.io>> napisał(a): > > I replaced the FIRST_VALUE with MAX to ensure that the results > should be identical even in their content, and my problem still > remains – I end up with a nondeterministic count of records being > emitted into the sink when the parallelism is over 1, and that count > is about 20-25% short (and not consistent) of what comes out > consistently when parallelism is set to 1. > > Dylan > > *From: *Dylan Forciea mailto:dy...@oseberg.io>> > *Date: *Wednesday, April 14, 2021 at 9:08 AM > *To: *Piotr Nowojski <mailto:pnowoj...@apache.org>> > *Cc: *"user@flink.apache.org <mailto:user@flink.apache.org>" > mailto:user@flink.apache.org>> > *Subject: *Re: Nondeterministic results with SQL job when > parallelism is > 1 > > Pitorek, > > I was actually originally using a group function that WAS > deterministic (but was a custom UDF I made), but chose something > here built in. By non-deterministic, I mean that the number of > records coming out is not consistent. Since the FIRST_VALUE here is > on an attribute that is not part of the key, that shouldn’t affect > the number of records coming out I wouldn’t think. > > Dylan > > *From: *Piotr Nowojski <mailto:pnowoj...@apache.org>> > *Date: *Wednesday, April 14, 2021 at 9:06 AM > *To: *Dylan Forciea mailto:dy...@oseberg.io>> > *Cc: *"user@flink.apache.org <mailto:user@flink.apache.org>" > mailto:user@flink.apache.org>> > *Subject: *Re: Nondeterministic results with SQL job when > parallelism is > 1 > > Hi, > > Yes, it looks like your query is non deterministic because of > `FIRST_VALUE` used inside `GROUP BY`. If you have many different > parallel sources, each time you run your query your first value > might be different. 
If that's the case, you could try to confirm it > with even smaller query: > > SELECT >id2, >FIRST_VALUE(attr) AS attr > FROM table2 > GROUP BY id2 > > Best, > > Piotrek > > śr., 14 kwi 2021 o 14:45 Dylan Forciea <mailto:dy...@oseberg.io>> napisał(a): > > I am running Flink 1.12.2, and I was trying to up the > parallelism of my Flink SQL job to see what happened. However, > once I
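A small made-up trace of what Piotrek describes: in streaming mode the FULL JOIN emits an unmatched result immediately and retracts it later when the match arrives, so counting raw emitted rows overstates the final result. The tuples mirror the (id1, attr, id2, attr0) shape from the plan; the values are assumptions for illustration only.

    // changelog emitted downstream of the FULL JOIN for one key, as rows arrive
    val emittedChangelog = Seq(
      ("+I", ("42", "a", null, null)),  // table1 row arrives first: emitted with NULLs
      ("-U", ("42", "a", null, null)),  // matching table2 row arrives: old result retracted
      ("+U", ("42", "a", "42", "b"))    // ...and the updated, joined result re-emitted
    )
    emittedChangelog.foreach(println)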
Re: Nondeterministic results with SQL job when parallelism is > 1
Piotrek, I am looking at the count of records present in the sink table in Postgres after the entire job completes, not the number of inserts/retracts. I can see as the job runs that records are added and removed from the “sink” table. With parallelism set to 1, it always comes out to the same number (which is consistent with the number of ids in the source tables “table1” and “table2”), at about 491k records in table “sink” when the job is complete. With the parallelism set to 16, the “sink” table will have somewhere around 360k records +/- 20k when the job is complete. I truncate the “sink” table before I run the job, and this is a test environment where the source databases are static. I removed my line for setting to Batch mode per Timo’s suggestion, and am still running with MAX which should have deterministic output. Dylan From: Piotr Nowojski Date: Wednesday, April 14, 2021 at 9:38 AM To: Dylan Forciea Cc: "user@flink.apache.org" Subject: Re: Nondeterministic results with SQL job when parallelism is > 1 Hi Dylan, But if you are running your query in Streaming mode, aren't you counting retractions from the FULL JOIN? AFAIK in Streaming mode in FULL JOIN, when the first record comes in it will be immediately emitted with NULLs (not matched, as the other table is empty). Later if a matching record is received from the second table, the previous result will be retracted and the new one, updated, will be re-emitted. Maybe this is what you are observing in the varying output? Maybe you could try to analyse how the results differ between different runs? Best, Piotrek śr., 14 kwi 2021 o 16:22 Dylan Forciea mailto:dy...@oseberg.io>> napisał(a): I replaced the FIRST_VALUE with MAX to ensure that the results should be identical even in their content, and my problem still remains – I end up with a nondeterministic count of records being emitted into the sink when the parallelism is over 1, and that count is about 20-25% short (and not consistent) of what comes out consistently when parallelism is set to 1. Dylan From: Dylan Forciea mailto:dy...@oseberg.io>> Date: Wednesday, April 14, 2021 at 9:08 AM To: Piotr Nowojski mailto:pnowoj...@apache.org>> Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" mailto:user@flink.apache.org>> Subject: Re: Nondeterministic results with SQL job when parallelism is > 1 Pitorek, I was actually originally using a group function that WAS deterministic (but was a custom UDF I made), but chose something here built in. By non-deterministic, I mean that the number of records coming out is not consistent. Since the FIRST_VALUE here is on an attribute that is not part of the key, that shouldn’t affect the number of records coming out I wouldn’t think. Dylan From: Piotr Nowojski mailto:pnowoj...@apache.org>> Date: Wednesday, April 14, 2021 at 9:06 AM To: Dylan Forciea mailto:dy...@oseberg.io>> Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" mailto:user@flink.apache.org>> Subject: Re: Nondeterministic results with SQL job when parallelism is > 1 Hi, Yes, it looks like your query is non deterministic because of `FIRST_VALUE` used inside `GROUP BY`. If you have many different parallel sources, each time you run your query your first value might be different. 
If that's the case, you could try to confirm it with even smaller query: SELECT id2, FIRST_VALUE(attr) AS attr FROM table2 GROUP BY id2 Best, Piotrek śr., 14 kwi 2021 o 14:45 Dylan Forciea mailto:dy...@oseberg.io>> napisał(a): I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink SQL job to see what happened. However, once I did that, my results became nondeterministic. This happens whether I set the table.exec.resource.default-parallelism config option or I set the default local parallelism to something higher than 1. I would end up with less records in the end, and each time I ran the output record count would come out differently. I managed to distill an example, as pasted below (with attribute names changed to protect company proprietary info), that causes the issue. I feel like I managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the distilled version wasn’t giving me wrong results with that. Maybe it has to do with joining to a table that was formed using a GROUP BY? Can somebody tell if I’m doing something that is known not to work, or if I have run across a bug? Regards, Dylan Forciea object Job { def main(args: Array[String]): Unit = { StreamExecutionEnvironment.setDefaultLocalParallelism(1) val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
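One way to run Piotrek's narrowed-down query and actually watch inserts versus retractions, sketched against the `streamEnv`, `streamTableEnv`, and `table2` definition from the job below; the retract-stream bridge is the Scala `StreamTableEnvironment` one.

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.types.Row

    // assumes streamEnv, streamTableEnv, and the table2 DDL from the job below
    val probe = streamTableEnv.sqlQuery(
      "SELECT id2, FIRST_VALUE(attr) AS attr FROM table2 GROUP BY id2")

    streamTableEnv
      .toRetractStream[Row](probe)
      .map { case (isInsert, row) => s"${if (isInsert) "+" else "-"} $row" }
      .print()

    streamEnv.execute("first_value probe")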
Re: Nondeterministic results with SQL job when parallelism is > 1
I replaced the FIRST_VALUE with MAX to ensure that the results should be identical even in their content, and my problem still remains – I end up with a nondeterministic count of records being emitted into the sink when the parallelism is over 1, and that count is about 20-25% short (and not consistent) of what comes out consistently when parallelism is set to 1. Dylan From: Dylan Forciea Date: Wednesday, April 14, 2021 at 9:08 AM To: Piotr Nowojski Cc: "user@flink.apache.org" Subject: Re: Nondeterministic results with SQL job when parallelism is > 1 Pitorek, I was actually originally using a group function that WAS deterministic (but was a custom UDF I made), but chose something here built in. By non-deterministic, I mean that the number of records coming out is not consistent. Since the FIRST_VALUE here is on an attribute that is not part of the key, that shouldn’t affect the number of records coming out I wouldn’t think. Dylan From: Piotr Nowojski Date: Wednesday, April 14, 2021 at 9:06 AM To: Dylan Forciea Cc: "user@flink.apache.org" Subject: Re: Nondeterministic results with SQL job when parallelism is > 1 Hi, Yes, it looks like your query is non deterministic because of `FIRST_VALUE` used inside `GROUP BY`. If you have many different parallel sources, each time you run your query your first value might be different. If that's the case, you could try to confirm it with even smaller query: SELECT id2, FIRST_VALUE(attr) AS attr FROM table2 GROUP BY id2 Best, Piotrek śr., 14 kwi 2021 o 14:45 Dylan Forciea mailto:dy...@oseberg.io>> napisał(a): I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink SQL job to see what happened. However, once I did that, my results became nondeterministic. This happens whether I set the table.exec.resource.default-parallelism config option or I set the default local parallelism to something higher than 1. I would end up with less records in the end, and each time I ran the output record count would come out differently. I managed to distill an example, as pasted below (with attribute names changed to protect company proprietary info), that causes the issue. I feel like I managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the distilled version wasn’t giving me wrong results with that. Maybe it has to do with joining to a table that was formed using a GROUP BY? Can somebody tell if I’m doing something that is known not to work, or if I have run across a bug? 
Regards, Dylan Forciea object Job { def main(args: Array[String]): Unit = { StreamExecutionEnvironment.setDefaultLocalParallelism(1) val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings) val configuration = streamTableEnv.getConfig().getConfiguration() configuration.setInteger("table.exec.resource.default-parallelism", 16) streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); streamTableEnv.executeSql( """ CREATE TABLE table1 ( id1 STRING PRIMARY KEY NOT ENFORCED, attr STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = 'table1’, 'username' = 'username', 'password' = 'password', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false' )""") streamTableEnv.executeSql( """ CREATE TABLE table2 ( attr STRING, id2 STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = 'table2', 'username' = 'username', 'password' = 'password', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false' )""") streamTableEnv.executeSql( """ CREATE TABLE table3 ( attr STRING PRIMARY KEY NOT ENFORCED, attr_mapped STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = ‘table3', 'username' = ‘username', 'password' = 'password', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false' )""") streamTableEnv.executeSql(""" CREATE TABLE sink ( id STRING PRIMARY KEY NOT ENFORCED, attr STRING, attr_mapped STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…, 'table-name' = 'sink', 'username' = 'username', 'password' = 'password', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false' )""") val view = streamTableEnv.sqlQuery(""" S
Re: Nondeterministic results with SQL job when parallelism is > 1
Pitorek, I was actually originally using a group function that WAS deterministic (but was a custom UDF I made), but chose something here built in. By non-deterministic, I mean that the number of records coming out is not consistent. Since the FIRST_VALUE here is on an attribute that is not part of the key, that shouldn’t affect the number of records coming out I wouldn’t think. Dylan From: Piotr Nowojski Date: Wednesday, April 14, 2021 at 9:06 AM To: Dylan Forciea Cc: "user@flink.apache.org" Subject: Re: Nondeterministic results with SQL job when parallelism is > 1 Hi, Yes, it looks like your query is non deterministic because of `FIRST_VALUE` used inside `GROUP BY`. If you have many different parallel sources, each time you run your query your first value might be different. If that's the case, you could try to confirm it with even smaller query: SELECT id2, FIRST_VALUE(attr) AS attr FROM table2 GROUP BY id2 Best, Piotrek śr., 14 kwi 2021 o 14:45 Dylan Forciea mailto:dy...@oseberg.io>> napisał(a): I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink SQL job to see what happened. However, once I did that, my results became nondeterministic. This happens whether I set the table.exec.resource.default-parallelism config option or I set the default local parallelism to something higher than 1. I would end up with less records in the end, and each time I ran the output record count would come out differently. I managed to distill an example, as pasted below (with attribute names changed to protect company proprietary info), that causes the issue. I feel like I managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the distilled version wasn’t giving me wrong results with that. Maybe it has to do with joining to a table that was formed using a GROUP BY? Can somebody tell if I’m doing something that is known not to work, or if I have run across a bug? 
Regards, Dylan Forciea object Job { def main(args: Array[String]): Unit = { StreamExecutionEnvironment.setDefaultLocalParallelism(1) val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings) val configuration = streamTableEnv.getConfig().getConfiguration() configuration.setInteger("table.exec.resource.default-parallelism", 16) streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); streamTableEnv.executeSql( """ CREATE TABLE table1 ( id1 STRING PRIMARY KEY NOT ENFORCED, attr STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = 'table1’, 'username' = 'username', 'password' = 'password', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false' )""") streamTableEnv.executeSql( """ CREATE TABLE table2 ( attr STRING, id2 STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = 'table2', 'username' = 'username', 'password' = 'password', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false' )""") streamTableEnv.executeSql( """ CREATE TABLE table3 ( attr STRING PRIMARY KEY NOT ENFORCED, attr_mapped STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = ‘table3', 'username' = ‘username', 'password' = 'password', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false' )""") streamTableEnv.executeSql(""" CREATE TABLE sink ( id STRING PRIMARY KEY NOT ENFORCED, attr STRING, attr_mapped STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…, 'table-name' = 'sink', 'username' = 'username', 'password' = 'password', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false' )""") val view = streamTableEnv.sqlQuery(""" SELECT COALESCE(t1.id1, t2.id2) AS id, COALESCE(t2.attr, t1.attr) AS operator, COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped FROM table1 t1 FULL JOIN ( SELECT id2, FIRST_VALUE(attr) AS attr FROM table2 GROUP BY id2 ) t2 ON (t1.id1 = t2.id2) LEFT JOIN table3 t3 ON (COALESCE(t2.attr, t1.attr) = t3.attr)""") streamTableEnv.createTemporaryView("view", view) val statementSet = streamTableEnv.createStatementSet() statementSet.addInsertSql(""" INSERT INTO sink SELECT * FROM view """) statementSet.execute().await() } }
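For reference, the variant Dylan mentions switching to, with MAX in place of FIRST_VALUE so the aggregate itself is deterministic; a sketch against the same environment and tables as above (the second output column is aliased to match the sink schema here).

    val deterministicView = streamTableEnv.sqlQuery("""
      SELECT
        COALESCE(t1.id1, t2.id2) AS id,
        COALESCE(t2.attr, t1.attr) AS attr,
        COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
      FROM table1 t1
      FULL JOIN (
        SELECT id2, MAX(attr) AS attr
        FROM table2
        GROUP BY id2
      ) t2 ON (t1.id1 = t2.id2)
      LEFT JOIN table3 t3 ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")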
Nondeterministic results with SQL job when parallelism is > 1
I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink SQL job to see what happened. However, once I did that, my results became nondeterministic. This happens whether I set the table.exec.resource.default-parallelism config option or I set the default local parallelism to something higher than 1. I would end up with fewer records in the end, and each time I ran, the output record count would come out differently.

I managed to distill an example, as pasted below (with attribute names changed to protect company proprietary info), that causes the issue. I feel like I managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the distilled version wasn't giving me wrong results with that. Maybe it has to do with joining to a table that was formed using a GROUP BY? Can somebody tell if I'm doing something that is known not to work, or if I have run across a bug?

Regards,
Dylan Forciea

// imports were not included in the original message; these are the assumed Flink 1.12 ones
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Job {
  def main(args: Array[String]): Unit = {
    StreamExecutionEnvironment.setDefaultLocalParallelism(1)

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

    val configuration = streamTableEnv.getConfig().getConfiguration()
    configuration.setInteger("table.exec.resource.default-parallelism", 16)

    streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)

    streamTableEnv.executeSql("""
      CREATE TABLE table1 (
        id1 STRING PRIMARY KEY NOT ENFORCED,
        attr STRING
      ) WITH (
        'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = 'table1',
        'username' = 'username', 'password' = 'password',
        'scan.fetch-size' = '500', 'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE table2 (
        attr STRING,
        id2 STRING
      ) WITH (
        'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = 'table2',
        'username' = 'username', 'password' = 'password',
        'scan.fetch-size' = '500', 'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE table3 (
        attr STRING PRIMARY KEY NOT ENFORCED,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = 'table3',
        'username' = 'username', 'password' = 'password',
        'scan.fetch-size' = '500', 'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE sink (
        id STRING PRIMARY KEY NOT ENFORCED,
        attr STRING,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc', 'url' = 'jdbc:postgresql://…', 'table-name' = 'sink',
        'username' = 'username', 'password' = 'password',
        'scan.fetch-size' = '500', 'scan.auto-commit' = 'false'
      )""")

    val view = streamTableEnv.sqlQuery("""
      SELECT
        COALESCE(t1.id1, t2.id2) AS id,
        COALESCE(t2.attr, t1.attr) AS operator,
        COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
      FROM table1 t1
      FULL JOIN (
        SELECT id2, FIRST_VALUE(attr) AS attr
        FROM table2
        GROUP BY id2
      ) t2 ON (t1.id1 = t2.id2)
      LEFT JOIN table3 t3 ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
    streamTableEnv.createTemporaryView("view", view)

    val statementSet = streamTableEnv.createStatementSet()
    statementSet.addInsertSql("""
      INSERT INTO sink SELECT * FROM view
    """)
    statementSet.execute().await()
  }
}
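The later replies in this thread report deterministic counts once the job runs in batch mode. The thread does not pin down the exact settings used in the end, so the following is only a sketch of one standard way on 1.12 to plan and run the same statements in batch mode, via the unified TableEnvironment.

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    val batchSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    val batchTableEnv = TableEnvironment.create(batchSettings)
    batchTableEnv.getConfig.getConfiguration
      .setInteger("table.exec.resource.default-parallelism", 16)
    // ...then executeSql(...) the same CREATE TABLE statements, build the same
    // statement set, and execute it as in the job above...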
Re: Trying to create a generic aggregate UDF
I wanted to report that I tried out your PR, and it does solve my issue. I am able to create a generic LatestNonNull and it appears to do what is expected. Thanks, Dylan Forciea On 1/21/21, 8:50 AM, "Timo Walther" wrote: I opened a PR. Feel free to try it out. https://github.com/apache/flink/pull/14720 Btw: >> env.createTemporarySystemFunction("LatestNonNullLong", >> classOf[LatestNonNull[Long]]) >> >> env.createTemporarySystemFunction("LatestNonNullString", >> classOf[LatestNonNull[String]]) don't make a difference. The generics will be type erased in bytecode and only the class name matters. Thanks, Timo On 21.01.21 11:36, Timo Walther wrote: > Hi Dylan, > > thanks for the investigation. I can now also reproduce it my code. Yes, > this is a bug. I opened > > https://issues.apache.org/jira/browse/FLINK-21070 > > and will try to fix this asap. > > Thanks, > Timo > > On 20.01.21 17:52, Dylan Forciea wrote: >> Timo, >> >> I converted what I had to Java, and ended up with the exact same issue >> as before where it will work if I only ever use it on 1 type, but not >> if I use it on multiple. Maybe this is a bug? >> >> Dylan >> >> On 1/20/21, 10:06 AM, "Dylan Forciea" wrote: >> >> Oh, I think I might have a clue as to what is going on. I notice >> that it will work properly when I only call it on Long. I think that >> it is using the same generated code for the Converter for whatever was >> called first. >> >> Since in Scala I can't declare an object as static within the >> class itself, I wonder if it won't generate appropriate Converter code >> per subtype. I tried creating a subclass that is specific to the type >> within my class and returning that as the accumulator, but that didn't >> help. And, I can't refer to that class in the TypeInference since it >> isn't static and I get an error from Flink because of that. I'm going >> to see if I just write this UDF in Java with an embedded public static >> class like you have if it will solve my problems. I'll report back to >> let you know what I find. If that works, I'm not quite sure how to >> make it work in Scala. >> >> Regards, >> Dylan Forciea >> >> On 1/20/21, 9:34 AM, "Dylan Forciea" wrote: >> >> As a side note, I also just tried to unify into a single >> function registration and used _ as the type parameter in the classOf >> calls there and within the TypeInference definition for the >> accumulator and still ended up with the exact same stack trace. >> >> Dylan >> >> On 1/20/21, 9:22 AM, "Dylan Forciea" wrote: >> >> Timo, >> >> I appreciate it! I am using Flink 1.12.0 right now with >> the Blink planner. What you proposed is roughly what I had come up >> with the first time around that resulted in the stack trace with the >> ClassCastException I had originally included. I saw that you had used >> a Row instead of just the value in our example, but changing it that >> way didn't seem to help, which makes sense since the problem seems to >> be in the code generated for the accumulator Converter and not the >> output. 
>> >> Here is the exact code that caused that error (while >> calling LatestNonNullLong): >> >> The registration of the below: >> >> env.createTemporarySystemFunction("LatestNonNullLong", >> classOf[LatestNonNull[Long]]) >> >> env.createTemporarySystemFunction("LatestNonNullString", >> classOf[LatestNonNull[String]]) >> >> >> The class itself: >> >> import java.time.LocalDate >> import java.util.Optional >> import org.apache.flink.table.api.DataTypes >> import org.apache.flink.table.catalog.DataTypeFactory >> import org.apache.flink.table.functions.AggregateFunction >> import >> org.apache.flink.table.types.inference.{InputTypeStrategies, >> TypeInference} >> >>
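A tiny check of the erasure point Timo makes above: the two classOf expressions used in the registrations denote the same runtime class, so registering once per type parameter cannot yield different generated converters.

    // generic parameters are erased; both expressions evaluate to the same Class object
    println(classOf[List[Long]] == classOf[List[String]])   // true
    // the same holds for the UDF in this thread:
    // classOf[LatestNonNull[Long]] eq classOf[LatestNonNull[String]]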
Re: Trying to create a generic aggregate UDF
Timo, Will do! I have been patching in a change locally that I have a PR [1] out for, so if this will end up in the next 1.12 patch release, I may add this in with it once it has been approved and merged. On a side note, that PR has been out since the end of October (looks like I need to do a rebase to accommodate the code reformatting change that occurred since). Is there a process for getting somebody to review it? Not sure if with the New Year and the 1.12 release and follow-up if it just got lost in the commotion. Regards, Dylan Forciea [1] https://github.com/apache/flink/pull/13787 On 1/21/21, 8:50 AM, "Timo Walther" wrote: I opened a PR. Feel free to try it out. https://github.com/apache/flink/pull/14720 Btw: >> env.createTemporarySystemFunction("LatestNonNullLong", >> classOf[LatestNonNull[Long]]) >> >> env.createTemporarySystemFunction("LatestNonNullString", >> classOf[LatestNonNull[String]]) don't make a difference. The generics will be type erased in bytecode and only the class name matters. Thanks, Timo On 21.01.21 11:36, Timo Walther wrote: > Hi Dylan, > > thanks for the investigation. I can now also reproduce it my code. Yes, > this is a bug. I opened > > https://issues.apache.org/jira/browse/FLINK-21070 > > and will try to fix this asap. > > Thanks, > Timo > > On 20.01.21 17:52, Dylan Forciea wrote: >> Timo, >> >> I converted what I had to Java, and ended up with the exact same issue >> as before where it will work if I only ever use it on 1 type, but not >> if I use it on multiple. Maybe this is a bug? >> >> Dylan >> >> On 1/20/21, 10:06 AM, "Dylan Forciea" wrote: >> >> Oh, I think I might have a clue as to what is going on. I notice >> that it will work properly when I only call it on Long. I think that >> it is using the same generated code for the Converter for whatever was >> called first. >> >> Since in Scala I can't declare an object as static within the >> class itself, I wonder if it won't generate appropriate Converter code >> per subtype. I tried creating a subclass that is specific to the type >> within my class and returning that as the accumulator, but that didn't >> help. And, I can't refer to that class in the TypeInference since it >> isn't static and I get an error from Flink because of that. I'm going >> to see if I just write this UDF in Java with an embedded public static >> class like you have if it will solve my problems. I'll report back to >> let you know what I find. If that works, I'm not quite sure how to >> make it work in Scala. >> >> Regards, >> Dylan Forciea >> >> On 1/20/21, 9:34 AM, "Dylan Forciea" wrote: >> >> As a side note, I also just tried to unify into a single >> function registration and used _ as the type parameter in the classOf >> calls there and within the TypeInference definition for the >> accumulator and still ended up with the exact same stack trace. >> >> Dylan >> >> On 1/20/21, 9:22 AM, "Dylan Forciea" wrote: >> >> Timo, >> >> I appreciate it! I am using Flink 1.12.0 right now with >> the Blink planner. What you proposed is roughly what I had come up >> with the first time around that resulted in the stack trace with the >> ClassCastException I had originally included. I saw that you had used >> a Row instead of just the value in our example, but changing it that >> way didn't seem to help, which makes sense since the problem seems to >> be in the code generated for the accumulator Converter and not the >> output. 
>> >> Here is the exact code that caused that error (while >> calling LatestNonNullLong): >> >> The registration of the below: >> >> env.createTemporarySystemFunction("LatestNonNullLong", >> classOf[LatestNonNull[Long]]) >> >> env.createTemporarySystemFunction("LatestNonNullString", >> classOf[LatestNonNull[String]]) >> >> >> The class itself: >> >> import java.time.LocalDate >>
Re: Trying to create a generic aggregate UDF
Timo, I converted what I had to Java, and ended up with the exact same issue as before where it will work if I only ever use it on 1 type, but not if I use it on multiple. Maybe this is a bug? Dylan On 1/20/21, 10:06 AM, "Dylan Forciea" wrote: Oh, I think I might have a clue as to what is going on. I notice that it will work properly when I only call it on Long. I think that it is using the same generated code for the Converter for whatever was called first. Since in Scala I can't declare an object as static within the class itself, I wonder if it won't generate appropriate Converter code per subtype. I tried creating a subclass that is specific to the type within my class and returning that as the accumulator, but that didn't help. And, I can't refer to that class in the TypeInference since it isn't static and I get an error from Flink because of that. I'm going to see if I just write this UDF in Java with an embedded public static class like you have if it will solve my problems. I'll report back to let you know what I find. If that works, I'm not quite sure how to make it work in Scala. Regards, Dylan Forciea On 1/20/21, 9:34 AM, "Dylan Forciea" wrote: As a side note, I also just tried to unify into a single function registration and used _ as the type parameter in the classOf calls there and within the TypeInference definition for the accumulator and still ended up with the exact same stack trace. Dylan On 1/20/21, 9:22 AM, "Dylan Forciea" wrote: Timo, I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. What you proposed is roughly what I had come up with the first time around that resulted in the stack trace with the ClassCastException I had originally included. I saw that you had used a Row instead of just the value in our example, but changing it that way didn't seem to help, which makes sense since the problem seems to be in the code generated for the accumulator Converter and not the output. 
Here is the exact code that caused that error (while calling LatestNonNullLong): The registration of the below: env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]]) env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]]) The class itself: import java.time.LocalDate import java.util.Optional import org.apache.flink.table.api.DataTypes import org.apache.flink.table.catalog.DataTypeFactory import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference} case class LatestNonNullAccumulator[T]( var value: T = null.asInstanceOf[T], var date: LocalDate = null) class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] { override def createAccumulator(): LatestNonNullAccumulator[T] = { LatestNonNullAccumulator[T]() } override def getValue(acc: LatestNonNullAccumulator[T]): T = { acc.value } def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = { if (value != null) { Option(acc.date).fold { acc.value = value acc.date = date } { accDate => if (date != null && date.isAfter(accDate)) { acc.value = value acc.date = date } } } } def merge( acc: LatestNonNullAccumulator[T], it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = { val iter = it.iterator() while (iter.hasNext) { val a = iter.next() if (a.value != null) { Option(acc.date).fold { acc.value = a.value acc.date = a.date } { accDate => Option(a.date).map { curDate => if (curDate.isAfter(accDate)) { acc.value = a.value acc.date = a.date } } } } } } def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = { acc.value = null.asInstanceOf[T] acc.date = null } override def getTypeInference(typeFactory: DataTypeFactory): TypeInfere
Re: Trying to create a generic aggregate UDF
Oh, I think I might have a clue as to what is going on. I notice that it will work properly when I only call it on Long. I think that it is using the same generated code for the Converter for whatever was called first. Since in Scala I can't declare an object as static within the class itself, I wonder if it won't generate appropriate Converter code per subtype. I tried creating a subclass that is specific to the type within my class and returning that as the accumulator, but that didn't help. And, I can't refer to that class in the TypeInference since it isn't static and I get an error from Flink because of that. I'm going to see if I just write this UDF in Java with an embedded public static class like you have if it will solve my problems. I'll report back to let you know what I find. If that works, I'm not quite sure how to make it work in Scala. Regards, Dylan Forciea On 1/20/21, 9:34 AM, "Dylan Forciea" wrote: As a side note, I also just tried to unify into a single function registration and used _ as the type parameter in the classOf calls there and within the TypeInference definition for the accumulator and still ended up with the exact same stack trace. Dylan On 1/20/21, 9:22 AM, "Dylan Forciea" wrote: Timo, I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. What you proposed is roughly what I had come up with the first time around that resulted in the stack trace with the ClassCastException I had originally included. I saw that you had used a Row instead of just the value in our example, but changing it that way didn't seem to help, which makes sense since the problem seems to be in the code generated for the accumulator Converter and not the output. Here is the exact code that caused that error (while calling LatestNonNullLong): The registration of the below: env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]]) env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]]) The class itself: import java.time.LocalDate import java.util.Optional import org.apache.flink.table.api.DataTypes import org.apache.flink.table.catalog.DataTypeFactory import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference} case class LatestNonNullAccumulator[T]( var value: T = null.asInstanceOf[T], var date: LocalDate = null) class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] { override def createAccumulator(): LatestNonNullAccumulator[T] = { LatestNonNullAccumulator[T]() } override def getValue(acc: LatestNonNullAccumulator[T]): T = { acc.value } def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = { if (value != null) { Option(acc.date).fold { acc.value = value acc.date = date } { accDate => if (date != null && date.isAfter(accDate)) { acc.value = value acc.date = date } } } } def merge( acc: LatestNonNullAccumulator[T], it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = { val iter = it.iterator() while (iter.hasNext) { val a = iter.next() if (a.value != null) { Option(acc.date).fold { acc.value = a.value acc.date = a.date } { accDate => Option(a.date).map { curDate => if (curDate.isAfter(accDate)) { acc.value = a.value acc.date = a.date } } } } } } def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = { acc.value = null.asInstanceOf[T] acc.date = null } override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = { TypeInference .newBuilder() 
.inputTypeStrategy(InputTypeStrategies .sequence(InputTypeStrategies.ANY, InputTypeStrategies.explicit(DataTypes.DATE( .accumulatorTypeStrategy { callContext => val accDataType = DataTypes.STRUCTURED( classOf[LatestNonNullAccumulator[T]], DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)), DataTypes.FIELD("date", DataTypes.DATE()))
Re: Trying to create a generic aggregate UDF
As a side note, I also just tried to unify into a single function registration and used _ as the type parameter in the classOf calls there and within the TypeInference definition for the accumulator and still ended up with the exact same stack trace. Dylan On 1/20/21, 9:22 AM, "Dylan Forciea" wrote: Timo, I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. What you proposed is roughly what I had come up with the first time around that resulted in the stack trace with the ClassCastException I had originally included. I saw that you had used a Row instead of just the value in our example, but changing it that way didn't seem to help, which makes sense since the problem seems to be in the code generated for the accumulator Converter and not the output. Here is the exact code that caused that error (while calling LatestNonNullLong): The registration of the below: env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]]) env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]]) The class itself: import java.time.LocalDate import java.util.Optional import org.apache.flink.table.api.DataTypes import org.apache.flink.table.catalog.DataTypeFactory import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference} case class LatestNonNullAccumulator[T]( var value: T = null.asInstanceOf[T], var date: LocalDate = null) class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] { override def createAccumulator(): LatestNonNullAccumulator[T] = { LatestNonNullAccumulator[T]() } override def getValue(acc: LatestNonNullAccumulator[T]): T = { acc.value } def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = { if (value != null) { Option(acc.date).fold { acc.value = value acc.date = date } { accDate => if (date != null && date.isAfter(accDate)) { acc.value = value acc.date = date } } } } def merge( acc: LatestNonNullAccumulator[T], it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = { val iter = it.iterator() while (iter.hasNext) { val a = iter.next() if (a.value != null) { Option(acc.date).fold { acc.value = a.value acc.date = a.date } { accDate => Option(a.date).map { curDate => if (curDate.isAfter(accDate)) { acc.value = a.value acc.date = a.date } } } } } } def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = { acc.value = null.asInstanceOf[T] acc.date = null } override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = { TypeInference .newBuilder() .inputTypeStrategy(InputTypeStrategies .sequence(InputTypeStrategies.ANY, InputTypeStrategies.explicit(DataTypes.DATE( .accumulatorTypeStrategy { callContext => val accDataType = DataTypes.STRUCTURED( classOf[LatestNonNullAccumulator[T]], DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)), DataTypes.FIELD("date", DataTypes.DATE())) Optional.of(accDataType) } .outputTypeStrategy { callContext => val outputDataType = callContext.getArgumentDataTypes().get(0); Optional.of(outputDataType); } .build() } } Regards, Dylan Forciea On 1/20/21, 2:37 AM, "Timo Walther" wrote: Hi Dylan, I'm assuming your are using Flink 1.12 and the Blink planner? Beginning from 1.12 you can use the "new" aggregate functions with a better type inference. So TypeInformation will not be used in this stack. I tried to come up with an example that should explain the rough design. I will include this example into the Flink code base. 
I hope this helps: import org.apache.flink.table.types.inference.InputTypeStrategies; public static class LastIfNotNull extends AggregateFunction> { public static class Accumulator { public T value; public LocalDate date; } public void accumulate(Accumulator acc, T input, LocalDate date) { if (input != null) { acc.value = input; ac
Re: Trying to create a generic aggregate UDF
Timo, I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. What you proposed is roughly what I had come up with the first time around that resulted in the stack trace with the ClassCastException I had originally included. I saw that you had used a Row instead of just the value in our example, but changing it that way didn't seem to help, which makes sense since the problem seems to be in the code generated for the accumulator Converter and not the output. Here is the exact code that caused that error (while calling LatestNonNullLong): The registration of the below: env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]]) env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]]) The class itself: import java.time.LocalDate import java.util.Optional import org.apache.flink.table.api.DataTypes import org.apache.flink.table.catalog.DataTypeFactory import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference} case class LatestNonNullAccumulator[T]( var value: T = null.asInstanceOf[T], var date: LocalDate = null) class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] { override def createAccumulator(): LatestNonNullAccumulator[T] = { LatestNonNullAccumulator[T]() } override def getValue(acc: LatestNonNullAccumulator[T]): T = { acc.value } def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = { if (value != null) { Option(acc.date).fold { acc.value = value acc.date = date } { accDate => if (date != null && date.isAfter(accDate)) { acc.value = value acc.date = date } } } } def merge( acc: LatestNonNullAccumulator[T], it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = { val iter = it.iterator() while (iter.hasNext) { val a = iter.next() if (a.value != null) { Option(acc.date).fold { acc.value = a.value acc.date = a.date } { accDate => Option(a.date).map { curDate => if (curDate.isAfter(accDate)) { acc.value = a.value acc.date = a.date } } } } } } def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = { acc.value = null.asInstanceOf[T] acc.date = null } override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = { TypeInference .newBuilder() .inputTypeStrategy(InputTypeStrategies .sequence(InputTypeStrategies.ANY, InputTypeStrategies.explicit(DataTypes.DATE( .accumulatorTypeStrategy { callContext => val accDataType = DataTypes.STRUCTURED( classOf[LatestNonNullAccumulator[T]], DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)), DataTypes.FIELD("date", DataTypes.DATE())) Optional.of(accDataType) } .outputTypeStrategy { callContext => val outputDataType = callContext.getArgumentDataTypes().get(0); Optional.of(outputDataType); } .build() } } Regards, Dylan Forciea On 1/20/21, 2:37 AM, "Timo Walther" wrote: Hi Dylan, I'm assuming your are using Flink 1.12 and the Blink planner? Beginning from 1.12 you can use the "new" aggregate functions with a better type inference. So TypeInformation will not be used in this stack. I tried to come up with an example that should explain the rough design. I will include this example into the Flink code base. 
I hope this helps: import org.apache.flink.table.types.inference.InputTypeStrategies; public static class LastIfNotNull extends AggregateFunction> { public static class Accumulator { public T value; public LocalDate date; } public void accumulate(Accumulator acc, T input, LocalDate date) { if (input != null) { acc.value = input; acc.date = date; } } @Override public Row getValue(Accumulator acc) { return Row.of(acc.value, acc.date); } @Override public Accumulator createAccumulator() { return new Accumulator<>(); } @Override public TypeInference getTypeInference(DataTypeFactory typeFactory) { return TypeInference.newBuilder() .inputTypeStrategy( InputTypeStrategies.sequence( InputTypeStrategies.ANY, InputTypeStrategies.explicit(DataTypes.DATE( .accumulatorTypeStrategy(
Trying to create a generic aggregate UDF
I am attempting to create an aggregate UDF that takes a generic parameter T, but for the life of me, I can't seem to get it to work. The UDF I'm trying to implement takes two input arguments: a value that is generic, and a date. It will choose the non-null value with the latest associated date. I had originally done this with separate Top 1 queries connected with a left join, but the memory usage seems far higher than doing this with a custom aggregate function.

As a first attempt, I tried to use custom type inference to have it validate that the first argument type is the output type and have a single function, and also used DataTypes.STRUCTURED to try to define the shape of my accumulator. However, that resulted in an exception like this whenever I tried to use a non-string value as the first argument:

[error] Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
[error]   at io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown Source)
[error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
[error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
[error]   at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
[error]   at GroupAggsHandler$777.getAccumulators(Unknown Source)
[error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
[error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
[error]   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
[error]   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
[error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
[error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
[error]   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
[error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
[error]   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
[error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
[error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
[error]   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
[error]   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
[error]   at java.lang.Thread.run(Thread.java:748)

Figuring that I can't do something of that sort, I tried to follow the general approach in the Sum accumulator [1] in the Flink source code, where separate classes are derived from a base class and each advertises its accumulator shape, but ended up with the exact same stack trace as above when I tried to create and use a function specifically for a non-string type like Long.

Is there something I'm missing as far as how this is supposed to be done? Everything I try either results in a stack trace like the above, or type erasure issues when trying to get type information for the accumulator.
If I just copy the generic code multiple times and just directly use Long or String rather than using subclassing, then it works just fine. I appreciate any help I can get on this! Regards, Dylan Forciea [1] https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
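A minimal sketch of the copy-per-type workaround mentioned above, mirroring the structure of the generic class quoted earlier in the thread; only the Long copy and the accumulate path are shown, the merge/reset logic would be duplicated the same way.

    import java.time.LocalDate
    import org.apache.flink.table.functions.AggregateFunction

    // accumulator with a concrete field type, so the generated converter is unambiguous
    case class LatestNonNullLongAcc(var value: java.lang.Long = null,
                                    var date: LocalDate = null)

    class LatestNonNullLong
        extends AggregateFunction[java.lang.Long, LatestNonNullLongAcc] {

      override def createAccumulator(): LatestNonNullLongAcc = LatestNonNullLongAcc()

      override def getValue(acc: LatestNonNullLongAcc): java.lang.Long = acc.value

      def accumulate(acc: LatestNonNullLongAcc, value: java.lang.Long, date: LocalDate): Unit = {
        // keep the non-null value with the latest associated date
        if (value != null && (acc.date == null || (date != null && date.isAfter(acc.date)))) {
          acc.value = value
          acc.date = date
        }
      }
    }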
Re: Batch loading into postgres database
After a bit more playing around with it today, I figured out that what I needed to call was: statementSet.execute().getJobClient().get().getJobExecutionResult(getClass().getClassLoader()).get() The fact that getJobExecutionResult required a classloader is what threw me off. Since I’m using an application cluster, I didn’t really think about the fact that I would need to supply one. It might be nice to have a flavor of this that just uses the default system class loader. However, I’m still interested in if there is a better way to handle reloading a table or tables in active use than the series of steps that I went through, if anybody has any suggestions! Thanks, Dylan Forciea From: Dylan Forciea Date: Monday, December 7, 2020 at 5:33 PM To: "user@flink.apache.org" Subject: Re: Batch loading into postgres database As a follow up – I’m trying to follow the approach I outlined below, and I’m having trouble figuring out how to perform the step of doing the delete/insert after the job is complete. I’ve tried adding a job listener, like so, but that doesn’t seem to ever get fired off: val statementSet = streamTableEnv.createStatementSet() statementSet.addInsertSql(""" INSERT INTO table1_staging SELECT * FROM table """) statementSet.addInsertSql(""" INSERT INTO table2_staging SELECT * FROM table """) statementSet.addInsertSql(""" INSERT INTO table3_staging SELECT * FROM table3 """) streamEnv.registerJobListener(new JobListener() { override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {} override def onJobExecuted(result: JobExecutionResult, throwable: Throwable): Unit = { val time = Option(result).map(_.getNetRuntime()) if (throwable == null) { Log.info(s"Completed job successfully in $time milliseconds") } else { Log.error(s"Unable to execute job successfully", throwable) } } }) statementSet.execute() I tried the above with the execute before and after the register, but it doesn’t seem to fire in any case. I also tried this: Try(statementSet.execute().getJobClient().get().getJobStatus().join()) .map { _ => Log.info(s"Completed job successfully") } .recover { case t => { Log.error(s"Unable to execute job successfully", t) } } And this seems to have fired WAY before the job actually finished flowing all the data through. I tried both join and get on the job status CompleteableFuture Is there anything I’m missing as far as being able to tell when the job is complete? Again, this is Flink 1.11.2 that I’m running. Thanks, Dylan Forciea From: Dylan Forciea Date: Monday, December 7, 2020 at 8:04 AM To: "user@flink.apache.org" Subject: Batch loading into postgres database I am setting up a Flink job that will reload a table in a postgres database using the Flink SQL functionality. I just wanted to make sure that given the current feature set I am going about this the correct way. I am currently using version 1.11.2, but plan on upgrading to 1.12 soon whenever it is finalized. I have setup a staging table and a final table in a postgres database. My plan is to have a Flink application that will truncate the contents of the staging table before the job begins using JDBC, run the job to completion, and then with JDBC delete/insert into the final table from the staging table in a transaction after the job completes. Is this the expected way to interact with postgres in a batch job like this? Or is there some functionality or method that I am missing? Regards, Dylan Forciae
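Putting the call found above together with the logging from the earlier attempt (a sketch; `Log` is the logger from the quoted snippets, and the class-loader argument is the detail that was easy to miss):

    import scala.util.Try

    // blocks until the batch of INSERTs submitted via the statement set has finished
    Try(
      statementSet.execute()
        .getJobClient.get()
        .getJobExecutionResult(getClass.getClassLoader)
        .get()
    ).map { result =>
      Log.info(s"Completed job successfully in ${result.getNetRuntime} milliseconds")
    }.recover { case t =>
      Log.error("Unable to execute job successfully", t)
    }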
Re: Batch loading into postgres database
As a follow up – I’m trying to follow the approach I outlined below, and I’m having trouble figuring out how to perform the step of doing the delete/insert after the job is complete. I’ve tried adding a job listener, like so, but that doesn’t seem to ever get fired off: val statementSet = streamTableEnv.createStatementSet() statementSet.addInsertSql(""" INSERT INTO table1_staging SELECT * FROM table """) statementSet.addInsertSql(""" INSERT INTO table2_staging SELECT * FROM table """) statementSet.addInsertSql(""" INSERT INTO table3_staging SELECT * FROM table3 """) streamEnv.registerJobListener(new JobListener() { override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {} override def onJobExecuted(result: JobExecutionResult, throwable: Throwable): Unit = { val time = Option(result).map(_.getNetRuntime()) if (throwable == null) { Log.info(s"Completed job successfully in $time milliseconds") } else { Log.error(s"Unable to execute job successfully", throwable) } } }) statementSet.execute() I tried the above with the execute before and after the register, but it doesn’t seem to fire in any case. I also tried this: Try(statementSet.execute().getJobClient().get().getJobStatus().join()) .map { _ => Log.info(s"Completed job successfully") } .recover { case t => { Log.error(s"Unable to execute job successfully", t) } } And this seems to have fired WAY before the job actually finished flowing all the data through. I tried both join and get on the job status CompleteableFuture Is there anything I’m missing as far as being able to tell when the job is complete? Again, this is Flink 1.11.2 that I’m running. Thanks, Dylan Forciea From: Dylan Forciea Date: Monday, December 7, 2020 at 8:04 AM To: "user@flink.apache.org" Subject: Batch loading into postgres database I am setting up a Flink job that will reload a table in a postgres database using the Flink SQL functionality. I just wanted to make sure that given the current feature set I am going about this the correct way. I am currently using version 1.11.2, but plan on upgrading to 1.12 soon whenever it is finalized. I have setup a staging table and a final table in a postgres database. My plan is to have a Flink application that will truncate the contents of the staging table before the job begins using JDBC, run the job to completion, and then with JDBC delete/insert into the final table from the staging table in a transaction after the job completes. Is this the expected way to interact with postgres in a batch job like this? Or is there some functionality or method that I am missing? Regards, Dylan Forciae
Batch loading into postgres database
I am setting up a Flink job that will reload a table in a postgres database using the Flink SQL functionality. I just wanted to make sure that given the current feature set I am going about this the correct way. I am currently using version 1.11.2, but plan on upgrading to 1.12 soon whenever it is finalized. I have set up a staging table and a final table in a postgres database. My plan is to have a Flink application that will truncate the contents of the staging table before the job begins using JDBC, run the job to completion, and then with JDBC delete/insert into the final table from the staging table in a transaction after the job completes. Is this the expected way to interact with postgres in a batch job like this? Or is there some functionality or method that I am missing? Regards, Dylan Forciea
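A minimal sketch of the plain-JDBC bookends described above, i.e. truncate staging before the job and swap staging into final in one transaction afterwards. Connection details and table names are placeholders; this only illustrates the idea, not a production implementation:

import java.sql.DriverManager

// Connection details and table names are placeholders.
val conn = DriverManager.getConnection("jdbc:postgresql://host.domain.com/db1", "user", "password")

// Before submitting the Flink job: clear the staging table.
conn.createStatement().execute("TRUNCATE TABLE table1_staging")

// ... run the Flink job to completion here ...

// After the job finishes: replace the final table contents atomically.
conn.setAutoCommit(false)
try {
  val stmt = conn.createStatement()
  stmt.executeUpdate("DELETE FROM table1_final")
  stmt.executeUpdate("INSERT INTO table1_final SELECT * FROM table1_staging")
  conn.commit()
} catch {
  case t: Throwable =>
    conn.rollback()
    throw t
} finally {
  conn.close()
}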
Re: Lateral join not finding correlate variable
Godfrey, Glad I could help! I suspected that was what the problem was. I have made a view in my postgres database to perform the inner lateral join, so that should let me work around this for the time being. Thanks, Dylan From: godfrey he Date: Friday, November 20, 2020 at 1:09 AM To: Dylan Forciea Cc: "user@flink.apache.org" Subject: Re: Lateral join not finding correlate variable Hi Dylan, I have reproduced your issue based on your code, Currently Flink does not support such nested correlate pattern query. I have created a issue to track this [1]. Thanks for your reporting and help. [1] https://issues.apache.org/jira/browse/FLINK-20255 Best, Godfrey Dylan Forciea mailto:dy...@oseberg.io>> 于2020年11月19日周四 下午12:10写道: Godfrey, I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack trace running exactly this code: import org.apache.flink.api.scala._ import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.types.Row import org.apache.flink.table.annotation.FunctionHint import org.apache.flink.table.annotation.DataTypeHint import org.apache.flink.table.functions.TableFunction @FunctionHint(output = new DataTypeHint("ROW")) class SplitStringToRows extends TableFunction[Row] { def eval(str: String, separator: String = ";"): Unit = { if (str != null) { str.split(separator).foreach(s => collect(Row.of(s.trim( } } } object Job { def main(args: Array[String]): Unit = { val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings) streamTableEnv.createTemporarySystemFunction( "SplitStringToRows", classOf[SplitStringToRows] ) // Class defined in previous email streamTableEnv.executeSql( """ CREATE TABLE table1 ( id_source BIGINT PRIMARY KEY, attr1_source STRING, attr2 STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true<http://host.domain.com/db1?ssl=true>', 'table-name' = '', 'username' = '', 'password' = '', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false') """) streamTableEnv.executeSql( """ CREATE TABLE table2 ( attr1_source STRING, attr2 STRING, attr3 DECIMAL, attr4 DATE ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true<http://host.domain.com/db1?ssl=true>', 'table-name' = '', 'username' = '', 'password' = '', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false') """) val q1 = streamTableEnv.sqlQuery(""" SELECT id_source AS id, attr1_source AS attr1, attr2 FROM table1 """) streamTableEnv.createTemporaryView("view1", q1) val q2 = streamTableEnv.sqlQuery( """ SELECT a.attr1 AS attr1, attr2, attr3, attr4 FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1_source, ';')) AS a(attr1) """) streamTableEnv.createTemporaryView("view2", q2) val q3 = streamTableEnv.sqlQuery(""" SELECT w.attr1, p.attr3 FROM view1 w LEFT JOIN LATERAL ( SELECT attr1, attr3 FROM ( SELECT attr1, attr3, ROW_NUMBER() OVER ( PARTITION BY attr1 ORDER BY attr4 DESC NULLS LAST, w.attr2 = attr2 DESC NULLS LAST ) AS row_num FROM view2) WHERE row_num = 1) p ON (w.attr1 = p.attr1) """) streamTableEnv.createTemporaryView("view3", q3) val view3 = streamTableEnv.from("view3") view3 .toRetractStream[Row] .writeAsCsv("./view3.csv", WriteMode.OVERWRITE) .setParallelism(1) streamEnv.execute() } } Thanks, 
Dylan Forciea From: godfrey he Date: Wednesday, November 18, 2020 at 8:29 PM To: Dylan Forciea Cc: "user@flink.apache.org" Subject: Re: Lateral join not finding correlate variable Dylan, Thanks for your feedback, if the planner encounters "unexpected correlate variable $cor2 in the plan" exce
Re: Filter Null in Array in SQL Connector
Ah yes, missed the kafka part and just saw the array part. FLINK-19771 definitely was solely in the postgres-specific code. Dylan From: Jark Wu Date: Thursday, November 19, 2020 at 9:12 AM To: Dylan Forciea Cc: Danny Chan , Rex Fenley , Flink ML Subject: Re: Filter Null in Array in SQL Connector Hi Dylan, I think Rex encountered another issue, because he is using Kafka with Debezium format. Hi Rex, If you can share the json data and the exception stack, that would be helpful! Besides, you can try to enable 'debezium-json.ignore-parse-errors' option [1] to skip the dirty data. Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors On Thu, 19 Nov 2020 at 21:13, Dylan Forciea mailto:dy...@oseberg.io>> wrote: Do you mean that the array contains values that are null, or that the entire array itself is null? If it’s the latter, I have an issue written, along with a PR to fix it that has been pending review [1]. Regards, Dylan Forciea [1] https://issues.apache.org/jira/browse/FLINK-19771 From: Danny Chan mailto:danny0...@apache.org>> Date: Thursday, November 19, 2020 at 2:24 AM To: Rex Fenley mailto:r...@remind101.com>> Cc: Flink ML mailto:user@flink.apache.org>> Subject: Re: Filter Null in Array in SQL Connector Hi, Fenley ~ You are right, parsing nulls of ARRAY field is not supported now, i have logged an issue [1] and would fix it soon ~ [1] https://issues.apache.org/jira/browse/FLINK-20234 Rex Fenley mailto:r...@remind101.com>> 于2020年11月19日周四 下午2:51写道: Hi, I recently discovered some of our data has NULL values arriving in an ARRAY column. This column is being consumed by Flink via the Kafka connector Debezium format. We seem to be receiving NullPointerExceptions for when these NULL values in the arrays arrive which restarts the source operator in a loop. Is there any way to not throw or to possibly filter out NULLs in an Array of Strings in Flink? We're somewhat stuck on how to solve this problem, we'd like to be defensive about this on Flink's side. Thanks! (P.S. The exception was not that informative, there may be room for improvement in terms of a richer error message when this happens.) -- Rex Fenley | Software Engineer - Mobile and Backend Remind.com<https://www.remind.com/> | BLOG<http://blog.remind.com/> | FOLLOW US<https://twitter.com/remindhq> | LIKE US<https://www.facebook.com/remindhq>
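A minimal sketch of the option Jark points to above, set on a Kafka/Debezium source table. The topic, broker address, group id, and schema are placeholders, and whether silently skipping dirty records is acceptable depends on the pipeline:

// 'debezium-json.ignore-parse-errors' = 'true' skips records that cannot be
// parsed instead of failing the source (the default is false).
streamTableEnv.executeSql("""
  CREATE TABLE my_debezium_source (
    id BIGINT,
    tags ARRAY<STRING>
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = 'broker:9092',
    'properties.group.id' = 'my-group',
    'format' = 'debezium-json',
    'debezium-json.ignore-parse-errors' = 'true'
  )
""")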
Re: Filter Null in Array in SQL Connector
Do you mean that the array contains values that are null, or that the entire array itself is null? If it’s the latter, I have an issue written, along with a PR to fix it that has been pending review [1]. Regards, Dylan Forciea [1] https://issues.apache.org/jira/browse/FLINK-19771 From: Danny Chan Date: Thursday, November 19, 2020 at 2:24 AM To: Rex Fenley Cc: Flink ML Subject: Re: Filter Null in Array in SQL Connector Hi, Fenley ~ You are right, parsing nulls of ARRAY field is not supported now, i have logged an issue [1] and would fix it soon ~ [1] https://issues.apache.org/jira/browse/FLINK-20234 Rex Fenley mailto:r...@remind101.com>> 于2020年11月19日周四 下午2:51写道: Hi, I recently discovered some of our data has NULL values arriving in an ARRAY column. This column is being consumed by Flink via the Kafka connector Debezium format. We seem to be receiving NullPointerExceptions for when these NULL values in the arrays arrive which restarts the source operator in a loop. Is there any way to not throw or to possibly filter out NULLs in an Array of Strings in Flink? We're somewhat stuck on how to solve this problem, we'd like to be defensive about this on Flink's side. Thanks! (P.S. The exception was not that informative, there may be room for improvement in terms of a richer error message when this happens.) -- Rex Fenley | Software Engineer - Mobile and Backend Remind.com<https://www.remind.com/> | BLOG<http://blog.remind.com/> | FOLLOW US<https://twitter.com/remindhq> | LIKE US<https://www.facebook.com/remindhq>
Re: Lateral join not finding correlate variable
Godfrey, I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack trace running exactly this code: import org.apache.flink.api.scala._ import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.types.Row import org.apache.flink.table.annotation.FunctionHint import org.apache.flink.table.annotation.DataTypeHint import org.apache.flink.table.functions.TableFunction @FunctionHint(output = new DataTypeHint("ROW")) class SplitStringToRows extends TableFunction[Row] { def eval(str: String, separator: String = ";"): Unit = { if (str != null) { str.split(separator).foreach(s => collect(Row.of(s.trim( } } } object Job { def main(args: Array[String]): Unit = { val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings) streamTableEnv.createTemporarySystemFunction( "SplitStringToRows", classOf[SplitStringToRows] ) // Class defined in previous email streamTableEnv.executeSql( """ CREATE TABLE table1 ( id_source BIGINT PRIMARY KEY, attr1_source STRING, attr2 STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true', 'table-name' = '', 'username' = '', 'password' = '', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false') """) streamTableEnv.executeSql( """ CREATE TABLE table2 ( attr1_source STRING, attr2 STRING, attr3 DECIMAL, attr4 DATE ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true', 'table-name' = '', 'username' = '', 'password' = '', 'scan.fetch-size' = '500', 'scan.auto-commit' = 'false') """) val q1 = streamTableEnv.sqlQuery(""" SELECT id_source AS id, attr1_source AS attr1, attr2 FROM table1 """) streamTableEnv.createTemporaryView("view1", q1) val q2 = streamTableEnv.sqlQuery( """ SELECT a.attr1 AS attr1, attr2, attr3, attr4 FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1_source, ';')) AS a(attr1) """) streamTableEnv.createTemporaryView("view2", q2) val q3 = streamTableEnv.sqlQuery(""" SELECT w.attr1, p.attr3 FROM view1 w LEFT JOIN LATERAL ( SELECT attr1, attr3 FROM ( SELECT attr1, attr3, ROW_NUMBER() OVER ( PARTITION BY attr1 ORDER BY attr4 DESC NULLS LAST, w.attr2 = attr2 DESC NULLS LAST ) AS row_num FROM view2) WHERE row_num = 1) p ON (w.attr1 = p.attr1) """) streamTableEnv.createTemporaryView("view3", q3) val view3 = streamTableEnv.from("view3") view3 .toRetractStream[Row] .writeAsCsv("./view3.csv", WriteMode.OVERWRITE) .setParallelism(1) streamEnv.execute() } } Thanks, Dylan Forciea From: godfrey he Date: Wednesday, November 18, 2020 at 8:29 PM To: Dylan Forciea Cc: "user@flink.apache.org" Subject: Re: Lateral join not finding correlate variable Dylan, Thanks for you feedback, if the planner encounters "unexpected correlate variable $cor2 in the plan" exception, There's a high probability that FlinkDecorrelateProgram has some bugs or the query pattern is not supported now. I try to use JDBC Connector as the input tables, but I still don't reproduce the exception. Could you provide your full code, including ddl, query, etc. Thanks so much. Best, Godfrey Dylan Forciea mailto:dy...@oseberg.io>> 于2020年11月18日周三 下午10:09写道: Godfrey, I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am still having the same issue. 
Note that I am using the JDBC Connector for the input tables, and table1 and table2 are actually created from queries on those connector tables and not directly. Since you indicated what I did should work, I played around a bit more, and determined it’s something inside of the table2 query that is triggering the error. The id field there is generated by a table function. Removing that piece made the plan start working. Table 2 is formulated as follows: SELECT T.id, attr2,
Re: Lateral join not finding correlate variable
Godfrey, I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am still having the same issue. Note that I am using the JDBC Connector for the input tables, and table1 and table2 are actually created from queries on those connector tables and not directly. Since you indicated what I did should work, I played around a bit more, and determined it’s something inside of the table2 query that is triggering the error. The id field there is generated by a table function. Removing that piece made the plan start working. Table 2 is formulated as follows:

SELECT T.id, attr2, attr3, attr4
FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)

Where SplitStringToRows is defined as:

@FunctionHint(output = new DataTypeHint("ROW"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}

Removing the lateral table bit in that first table made the original query plan work correctly. I greatly appreciate your assistance!

Regards, Dylan Forciea

From: godfrey he Date: Wednesday, November 18, 2020 at 7:33 AM To: Dylan Forciea Cc: "user@flink.apache.org" Subject: Re: Lateral join not finding correlate variable

Hi Dylan, Could you provide which Flink version you found the problem with? I tested the above query on master, and I get the plan; no errors occur. Here is my test case:

@Test
def testLateralJoin(): Unit = {
  util.addTableSource[(String, String, String, String, String)]("table1", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  util.addTableSource[(String, String, String, String, String)]("table2", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  val query =
    """
      |SELECT
      |  t1.id,
      |  t1.attr1,
      |  t2.attr2
      |FROM table1 t1
      |LEFT JOIN LATERAL (
      |  SELECT
      |    id,
      |    attr2
      |  FROM (
      |    SELECT
      |      id,
      |      attr2,
      |      ROW_NUMBER() OVER (
      |        PARTITION BY id
      |        ORDER BY
      |          attr3 DESC,
      |          t1.attr4 = attr4 DESC
      |      ) AS row_num
      |    FROM table2)
      |  WHERE row_num = 1) t2
      |ON t1.id = t2.id
      |""".stripMargin
  util.verifyPlan(query)
}

Best, Godfrey

Dylan Forciea wrote on Wednesday, November 18, 2020 at 7:44 AM: This may be due to not understanding lateral joins in Flink – perhaps you can only do so on temporal variables – but I figured I’d ask since the error message isn’t intuitive. I am trying to do a combination of a lateral join and a top N query. Part of my ordering is based upon whether a value in the left side of the query matches up.
I’m trying to do this in the general form of: SELECT t1.id<http://t1.id>, t1.attr1, t2.attr2 FROM table1 t1 LEFT JOIN LATERAL ( SELECT id, attr2 FROM ( SELECT id, attr2, ROW_NUMBER() OVER ( PARTITION BY id ORDER BY attr3 DESC, t1.attr4 = attr4 DESC ) AS row_num FROM table2 WHERE row_num = 1) t2 ON (t1.id<http://t1.id> = t2.id<http://t2.id>) I am getting an error that looks like: Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamComm
Lateral join not finding correlate variable
This may be due to not understanding lateral joins in Flink – perhaps you can only do so on temporal variables – but I figured I’d ask since the error message isn’t intuitive. I am trying to do a combination of a lateral join and a top N query. Part of my ordering is based upon whether a value in the left side of the query matches up. I’m trying to do this in the general form of:

SELECT
  t1.id,
  t1.attr1,
  t2.attr2
FROM table1 t1
LEFT JOIN LATERAL (
  SELECT
    id,
    attr2
  FROM (
    SELECT
      id,
      attr2,
      ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY
          attr3 DESC,
          t1.attr4 = attr4 DESC
      ) AS row_num
    FROM table2)
  WHERE row_num = 1) t2
ON (t1.id = t2.id)

I am getting an error that looks like:

Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan
  at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
  at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
  at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
  at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
  at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
  at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
  at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
  at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
  at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
  at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
  at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
  at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
  at org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
  at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
  at io.oseberg.flink.well.ok.Job.main(Job.scala)

The only other thing I can think of doing is creating a Table Aggregate function to pull this off. But I wanted to check to make sure I wasn’t doing something wrong in the above first, or if there is something I’m not thinking of doing.

Regards, Dylan Forciea
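One way to sidestep the correlate variable (not what was ultimately done in this thread, where a Postgres-side view served as the workaround) is to rank the right-hand table without referencing t1 and join the deduplicated result afterwards. Note that this drops the t1.attr4 = attr4 tiebreaker, so the ordering semantics are not identical; it is only a sketch:

// Rank table2 per id without referencing t1, so no correlate variable is needed.
val dedup = streamTableEnv.sqlQuery("""
  SELECT id, attr2
  FROM (
    SELECT id, attr2,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY attr3 DESC) AS row_num
    FROM table2)
  WHERE row_num = 1
""")
streamTableEnv.createTemporaryView("table2_top1", dedup)

// Join the pre-deduplicated view back to table1 with a plain left join.
val result = streamTableEnv.sqlQuery("""
  SELECT t1.id, t1.attr1, t2.attr2
  FROM table1 t1
  LEFT JOIN table2_top1 t2 ON t1.id = t2.id
""")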
Re: NullPointerException when trying to read null array in Postgres using JDBC Connector
Danny, Thanks! I have created a new JIRA issue [1]. I’ll look into how hard it is to get a patch and unit test myself, although I may need a hand on the process of making a change to both the master branch and a release branch if it is desired to get a fix into 1.11. Regards, Dylan Forciea [1] https://issues.apache.org/jira/browse/FLINK-19771 From: Danny Chan Date: Thursday, October 22, 2020 at 4:34 AM To: Dylan Forciea Cc: Flink ML Subject: Re: NullPointerException when trying to read null array in Postgres using JDBC Connector Yes, the current code throws directly for NULLs, can you log an issue there ? Dylan Forciea mailto:dy...@oseberg.io>> 于2020年10月21日周三 上午4:30写道: I believe I am getting an error because I have a nullable postgres array of text that is set to NULL that I’m reading using the JDBC SQL Connector. Is this something that should be allowed? Looking at the source code line below, it doesn’t look like the case of an array being null would be handled. [error] Caused by: java.io.IOException: Couldn't access resultSet [error] at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:266) [error] at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:57) [error] at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91) [error] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [error] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [error] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) [error] Caused by: java.lang.NullPointerException [error] at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$c06ce9f4$2(PostgresRowConverter.java:97) [error] at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:79) [error] at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:259) [error] ... 5 more Thanks, Dylan Forciea
NullPointerException when trying to read null array in Postgres using JDBC Connector
I believe I am getting an error because I have a nullable postgres array of text that is set to NULL that I’m reading using the JDBC SQL Connector. Is this something that should be allowed? Looking at the source code line below, it doesn’t look like the case of an array being null would be handled. [error] Caused by: java.io.IOException: Couldn't access resultSet [error] at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:266) [error] at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:57) [error] at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91) [error] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [error] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [error] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) [error] Caused by: java.lang.NullPointerException [error] at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$c06ce9f4$2(PostgresRowConverter.java:97) [error] at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:79) [error] at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:259) [error] ... 5 more Thanks, Dylan Forciea
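Until a fix for the converter lands, one possible workaround (an untested sketch; the view, table, and column names are made up) is to point the JDBC connector's 'table-name' at a Postgres view that never returns NULL arrays:

import java.sql.DriverManager

// One-time setup executed directly against Postgres (connection details are placeholders).
// The Flink table definition would then use 'table-name' = 'my_table_no_null_arrays'.
val conn = DriverManager.getConnection("jdbc:postgresql://host.domain.com/db1", "user", "password")
conn.createStatement().execute("""
  CREATE VIEW my_table_no_null_arrays AS
  SELECT id,
         COALESCE(text_array_col, ARRAY[]::text[]) AS text_array_col
  FROM my_table
""")
conn.close()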
Re: Setting JDBC connector options using JdbcCatalog
(Re-sending, so it also goes to the list serve) Leonard, That does seem to work for me, thank you! I was looking in the Table API, so that’s why I didn’t find that option. Is there a way to specify the sql hints using the Table API? I had tried appending them onto a call to .from on the table environment, but it wasn’t happy with that. Thanks, Dylan Forciea From: Leonard Xu Date: Wednesday, October 14, 2020 at 10:20 PM To: Dylan Forciea Cc: "user@flink.apache.org" Subject: Re: Setting JDBC connector options using JdbcCatalog Hi, Dylan The table in JdbcCatalog only contains basic options, it’s normal the table from JdbcCatalog does not bring some options. Flink provides SQL Hints feature to specify or override table options[1], you can have a try. Best, Leonard [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/hints.html#dynamic-table-options 在 2020年10月15日,10:20,Dylan Forciea mailto:dy...@oseberg.io>> 写道: scan.fetch-size
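To make the SQL-hint suggestion concrete, here is a sketch of overriding scan.fetch-size on a catalog table with a dynamic table options hint. The table name and value are illustrative, and in Flink 1.11 the hint support has to be enabled first:

// Dynamic table options are disabled by default in Flink 1.11.
streamTableEnv.getConfig.getConfiguration
  .setBoolean("table.dynamic-table-options.enabled", true)

// Override scan.fetch-size for this query only, without changing the catalog table.
val rows = streamTableEnv.sqlQuery("""
  SELECT id, attr
  FROM some_catalog_table /*+ OPTIONS('scan.fetch-size' = '500') */
""")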
Setting JDBC connector options using JdbcCatalog
I was experimenting with the JdbcCatalog, and I see that the options match some of the SQL WITH options. I looked at the source code, and even see that it directly references those options from JdbcDynamicTableFactory. However, I didn’t see any obvious way to set scan.fetch-size or any way to generically set those properties that would go in the WITH clause of the SQL interface beyond the JdbcCatalog constructor arguments. Is there any way to set those after the catalog is created/registered? Thanks, Dylan Forciea
Re: autoCommit for postgres jdbc streaming in Table/SQL API
Jark, Thank you! I had actually mistyped the JIRA issue; autoCommit needs to be set to false for streaming to work. The default on the driver is true when the option isn’t specified. I’ve updated the issue accordingly. Setting this to false automatically on the read path would fix my issue. However, I’m only certain that this is proper for Postgres. I’m not sure if this should be done for other drivers, although my gut would say it should be fine if it’s only done for reading. My patch as it is will set the builder to not specify whether to set autoCommit if the option is not specified, which means it would then be left at the default of true. That would conflict with the 1.11 patch you suggested. Let me know if you think I should make the default true in the SQL API. https://github.com/apache/flink/pull/13570 Regards, Dylan From: Jark Wu Date: Thursday, October 8, 2020 at 10:15 PM To: Dylan Forciea Cc: Till Rohrmann , dev , Shengkai Fang , "user@flink.apache.org" , Leonard Xu Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API Hi Dylan, Sorry for the late reply. We've just come back from a long holiday. Thanks for reporting this problem. First, I think this is a bug that `autoCommit` is false by default (JdbcRowDataInputFormat.Builder). We can fix the default to true in 1.11 series, and I think this can solve your problem in a short term? Besides, we should expose the connector options to set auto commit and this can be another issue to be implemented in master. I'm glad to review the code. What do you think? Regarding to the failed JMXReporterFactoryTest, I think this is a known issue, see FLINK-19539 [1] Best, Jark [1]: https://issues.apache.org/jira/browse/FLINK-19539 On Fri, 9 Oct 2020 at 01:29, Dylan Forciea mailto:dy...@oseberg.io>> wrote: I’ve updated the unit tests and documentation, and I was running the azure test pipeline as described in the instructions. However, it appears that what seems to be an unrelated test for the JMX code failed. Is this a matter of me not setting things up correctly? I wanted to ensure everything looked good before I submitted the PR. [ERROR] Failures: [ERROR] JMXReporterFactoryTest.testPortRangeArgument:46 Expected: (a value equal to or greater than <9000> and a value less than or equal to <9010>) but: a value less than or equal to <9010> <9040> was greater than <9010> [ERROR] JMXReporterFactoryTest.testWithoutArgument:60 [INFO] [ERROR] Tests run: 10, Failures: 2, Errors: 0, Skipped: 0 Thanks, Dylan Forciea From: Till Rohrmann mailto:trohrm...@apache.org>> Date: Thursday, October 8, 2020 at 2:29 AM To: Dylan Forciea mailto:dy...@oseberg.io>> Cc: dev mailto:d...@flink.apache.org>>, Shengkai Fang mailto:fskm...@gmail.com>>, "user@flink.apache.org<mailto:user@flink.apache.org>" mailto:user@flink.apache.org>>, "j...@apache.org<mailto:j...@apache.org>" mailto:j...@apache.org>>, Leonard Xu mailto:xbjt...@gmail.com>> Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API This sounds good. Maybe there are others in the community who can help with the review before the Jark and Leonard are back. Cheers, Till On Wed, Oct 7, 2020 at 7:33 PM Dylan Forciea mailto:dy...@oseberg.io>> wrote: Actually…. It looks like what I did covers both cases. I’ll see about getting some unit tests and documentation updated. 
Dylan From: Dylan Forciea mailto:dy...@oseberg.io>> Date: Wednesday, October 7, 2020 at 11:47 AM To: Till Rohrmann mailto:trohrm...@apache.org>>, dev mailto:d...@flink.apache.org>> Cc: Shengkai Fang mailto:fskm...@gmail.com>>, "user@flink.apache.org<mailto:user@flink.apache.org>" mailto:user@flink.apache.org>>, "j...@apache.org<mailto:j...@apache.org>" mailto:j...@apache.org>>, Leonard Xu mailto:xbjt...@gmail.com>> Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API Ok, I have created FLINK-19522 describing the issue. I have the code I made so far checked in at https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but this only fixes the SQL API. It sounds like there may be another change needed for the Table API… I’ll look into that and see if I can figure it out on my own while they’re out. I will also need to add some unit tests and update some documentation to get this ready for a PR. Thanks, Dylan From: Till Rohrmann mailto:trohrm...@apache.org>> Date: Wednesday, October 7, 2020 at 10:55 AM To: dev mailto:d...@flink.apache.org>> Cc: Shengkai Fang mailto:fskm...@gmail.com>>, "user@flink.apache.org<mailto:user@flink.apache.org>" mailto:user@flink.apache.org>>, "j...@apache.org<mailto:j...@apache.org>" mailto:j...@apache.org>>, Leonard Xu mailto:xbjt...@gmail.com>
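For readers landing on this thread later: once the change discussed above is merged, the setting can go straight into the table DDL. A sketch of what that is expected to look like (option name per the linked PR; connection details and table names are placeholders):

// 'scan.auto-commit' = 'false' together with a fetch size is what lets the
// Postgres driver stream the result set instead of materializing it up front.
streamTableEnv.executeSql("""
  CREATE TABLE big_pg_table (
    id BIGINT,
    attr STRING
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://host.domain.com/db1',
    'table-name' = 'big_table',
    'username' = 'user',
    'password' = 'password',
    'scan.fetch-size' = '500',
    'scan.auto-commit' = 'false'
  )
""")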
Re: autoCommit for postgres jdbc streaming in Table/SQL API
I’ve updated the unit tests and documentation, and I was running the azure test pipeline as described in the instructions. However, it appears that what seems to be an unrelated test for the JMX code failed. Is this a matter of me not setting things up correctly? I wanted to ensure everything looked good before I submitted the PR. [ERROR] Failures: [ERROR] JMXReporterFactoryTest.testPortRangeArgument:46 Expected: (a value equal to or greater than <9000> and a value less than or equal to <9010>) but: a value less than or equal to <9010> <9040> was greater than <9010> [ERROR] JMXReporterFactoryTest.testWithoutArgument:60 [INFO] [ERROR] Tests run: 10, Failures: 2, Errors: 0, Skipped: 0 Thanks, Dylan Forciea From: Till Rohrmann Date: Thursday, October 8, 2020 at 2:29 AM To: Dylan Forciea Cc: dev , Shengkai Fang , "user@flink.apache.org" , "j...@apache.org" , Leonard Xu Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API This sounds good. Maybe there are others in the community who can help with the review before the Jark and Leonard are back. Cheers, Till On Wed, Oct 7, 2020 at 7:33 PM Dylan Forciea mailto:dy...@oseberg.io>> wrote: Actually…. It looks like what I did covers both cases. I’ll see about getting some unit tests and documentation updated. Dylan From: Dylan Forciea mailto:dy...@oseberg.io>> Date: Wednesday, October 7, 2020 at 11:47 AM To: Till Rohrmann mailto:trohrm...@apache.org>>, dev mailto:d...@flink.apache.org>> Cc: Shengkai Fang mailto:fskm...@gmail.com>>, "user@flink.apache.org<mailto:user@flink.apache.org>" mailto:user@flink.apache.org>>, "j...@apache.org<mailto:j...@apache.org>" mailto:j...@apache.org>>, Leonard Xu mailto:xbjt...@gmail.com>> Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API Ok, I have created FLINK-19522 describing the issue. I have the code I made so far checked in at https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but this only fixes the SQL API. It sounds like there may be another change needed for the Table API… I’ll look into that and see if I can figure it out on my own while they’re out. I will also need to add some unit tests and update some documentation to get this ready for a PR. Thanks, Dylan From: Till Rohrmann mailto:trohrm...@apache.org>> Date: Wednesday, October 7, 2020 at 10:55 AM To: dev mailto:d...@flink.apache.org>> Cc: Shengkai Fang mailto:fskm...@gmail.com>>, "user@flink.apache.org<mailto:user@flink.apache.org>" mailto:user@flink.apache.org>>, "j...@apache.org<mailto:j...@apache.org>" mailto:j...@apache.org>>, Leonard Xu mailto:xbjt...@gmail.com>> Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API Hi Dylan, thanks for reaching out to the Flink community and excuse our late response. I am not an expert for the Table API and its JDBC connector but what you describe sounds like a missing feature. Also given that FLINK-12198 enabled this feature for the JDBCInputFormat indicates that we might simply need to make it configurable from the JdbcTableSource. I am pulling in Jark and Leonard who worked on the JdbcTableSource and might help you to get this feature into Flink. Their response could take a week because they are currently on vacation if I am not mistaken. What you could already do is to open an issue linking FLINK-12198 and describing the problem and your solution proposal. [1] https://issues.apache.org/jira/browse/FLINK-12198 Cheers, Till On Wed, Oct 7, 2020 at 5:00 PM Dylan Forciea mailto:dy...@oseberg.io>> wrote: I appreciate it! 
Let me know if you want me to submit a PR against the issue after it is created. It wasn’t a huge amount of code, so it’s probably not a big deal if you wanted to redo it. Thanks, Dylan From: Shengkai Fang mailto:fskm...@gmail.com>> Date: Wednesday, October 7, 2020 at 9:06 AM To: Dylan Forciea mailto:dy...@oseberg.io>> Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API Sorry for late response. +1 to support it. I will open a jira about it later. Dylan Forciea mailto:dy...@oseberg.io><mailto:dy...@oseberg.io<mailto:dy...@oseberg.io>>>于2020年10月7日 周三下午9:53写道: I hadn’t heard a response on this, so I’m going to expand this to the dev email list. If this is indeed an issue and not my misunderstanding, I have most of a patch already coded up. Please let me know, and I can create a JIRA issue and send out a PR. Regards, Dylan Forciea Oseberg From: Dylan Forciea mailto:dy...@oseberg.io><mailto:dy...@oseberg.io<mailto:dy...@oseberg.io>>> Date: Thursday, October 1, 2020 at 5:14 PM To: "user@flink.apache.org<mailto:user@flink.apache.org><mailto:user@flink.apache.org<mailt
Re: autoCommit for postgres jdbc streaming in Table/SQL API
Actually…. It looks like what I did covers both cases. I’ll see about getting some unit tests and documentation updated. Dylan From: Dylan Forciea Date: Wednesday, October 7, 2020 at 11:47 AM To: Till Rohrmann , dev Cc: Shengkai Fang , "user@flink.apache.org" , "j...@apache.org" , Leonard Xu Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API Ok, I have created FLINK-19522 describing the issue. I have the code I made so far checked in at https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but this only fixes the SQL API. It sounds like there may be another change needed for the Table API… I’ll look into that and see if I can figure it out on my own while they’re out. I will also need to add some unit tests and update some documentation to get this ready for a PR. Thanks, Dylan From: Till Rohrmann Date: Wednesday, October 7, 2020 at 10:55 AM To: dev Cc: Shengkai Fang , "user@flink.apache.org" , "j...@apache.org" , Leonard Xu Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API Hi Dylan, thanks for reaching out to the Flink community and excuse our late response. I am not an expert for the Table API and its JDBC connector but what you describe sounds like a missing feature. Also given that FLINK-12198 enabled this feature for the JDBCInputFormat indicates that we might simply need to make it configurable from the JdbcTableSource. I am pulling in Jark and Leonard who worked on the JdbcTableSource and might help you to get this feature into Flink. Their response could take a week because they are currently on vacation if I am not mistaken. What you could already do is to open an issue linking FLINK-12198 and describing the problem and your solution proposal. [1] https://issues.apache.org/jira/browse/FLINK-12198 Cheers, Till On Wed, Oct 7, 2020 at 5:00 PM Dylan Forciea mailto:dy...@oseberg.io>> wrote: I appreciate it! Let me know if you want me to submit a PR against the issue after it is created. It wasn’t a huge amount of code, so it’s probably not a big deal if you wanted to redo it. Thanks, Dylan From: Shengkai Fang mailto:fskm...@gmail.com>> Date: Wednesday, October 7, 2020 at 9:06 AM To: Dylan Forciea mailto:dy...@oseberg.io>> Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API Sorry for late response. +1 to support it. I will open a jira about it later. Dylan Forciea mailto:dy...@oseberg.io><mailto:dy...@oseberg.io<mailto:dy...@oseberg.io>>>于2020年10月7日 周三下午9:53写道: I hadn’t heard a response on this, so I’m going to expand this to the dev email list. If this is indeed an issue and not my misunderstanding, I have most of a patch already coded up. Please let me know, and I can create a JIRA issue and send out a PR. Regards, Dylan Forciea Oseberg From: Dylan Forciea mailto:dy...@oseberg.io><mailto:dy...@oseberg.io<mailto:dy...@oseberg.io>>> Date: Thursday, October 1, 2020 at 5:14 PM To: "user@flink.apache.org<mailto:user@flink.apache.org><mailto:user@flink.apache.org<mailto:user@flink.apache.org>>" mailto:user@flink.apache.org><mailto:user@flink.apache.org<mailto:user@flink.apache.org>>> Subject: autoCommit for postgres jdbc streaming in Table/SQL API Hi! I’ve just recently started evaluating Flink for our ETL needs, and I ran across an issue with streaming postgres data via the Table/SQL API. I see that the API has the scan.fetch-size option, but not scan.auto-commit per https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html . 
I had attempted to load a large table in, but it completely slurped it into memory before starting the streaming. I modified the flink source code to add a scan.auto-commit option, and I was then able to immediately start streaming and cut my memory usage way down. I see in this thread that there was a similar issue resolved for JDBCInputFormat in this thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html , but I don’t see a way to utilize that in the Table/SQL API. Am I missing something on how to pull this off? Regards, Dylan Forciea Oseberg
Re: autoCommit for postgres jdbc streaming in Table/SQL API
Ok, I have created FLINK-19522 describing the issue. I have the code I made so far checked in at https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but this only fixes the SQL API. It sounds like there may be another change needed for the Table API… I’ll look into that and see if I can figure it out on my own while they’re out. I will also need to add some unit tests and update some documentation to get this ready for a PR. Thanks, Dylan From: Till Rohrmann Date: Wednesday, October 7, 2020 at 10:55 AM To: dev Cc: Shengkai Fang , "user@flink.apache.org" , "j...@apache.org" , Leonard Xu Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API Hi Dylan, thanks for reaching out to the Flink community and excuse our late response. I am not an expert for the Table API and its JDBC connector but what you describe sounds like a missing feature. Also given that FLINK-12198 enabled this feature for the JDBCInputFormat indicates that we might simply need to make it configurable from the JdbcTableSource. I am pulling in Jark and Leonard who worked on the JdbcTableSource and might help you to get this feature into Flink. Their response could take a week because they are currently on vacation if I am not mistaken. What you could already do is to open an issue linking FLINK-12198 and describing the problem and your solution proposal. [1] https://issues.apache.org/jira/browse/FLINK-12198 Cheers, Till On Wed, Oct 7, 2020 at 5:00 PM Dylan Forciea mailto:dy...@oseberg.io>> wrote: I appreciate it! Let me know if you want me to submit a PR against the issue after it is created. It wasn’t a huge amount of code, so it’s probably not a big deal if you wanted to redo it. Thanks, Dylan From: Shengkai Fang mailto:fskm...@gmail.com>> Date: Wednesday, October 7, 2020 at 9:06 AM To: Dylan Forciea mailto:dy...@oseberg.io>> Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API Sorry for late response. +1 to support it. I will open a jira about it later. Dylan Forciea mailto:dy...@oseberg.io><mailto:dy...@oseberg.io<mailto:dy...@oseberg.io>>>于2020年10月7日 周三下午9:53写道: I hadn’t heard a response on this, so I’m going to expand this to the dev email list. If this is indeed an issue and not my misunderstanding, I have most of a patch already coded up. Please let me know, and I can create a JIRA issue and send out a PR. Regards, Dylan Forciea Oseberg From: Dylan Forciea mailto:dy...@oseberg.io><mailto:dy...@oseberg.io<mailto:dy...@oseberg.io>>> Date: Thursday, October 1, 2020 at 5:14 PM To: "user@flink.apache.org<mailto:user@flink.apache.org><mailto:user@flink.apache.org<mailto:user@flink.apache.org>>" mailto:user@flink.apache.org><mailto:user@flink.apache.org<mailto:user@flink.apache.org>>> Subject: autoCommit for postgres jdbc streaming in Table/SQL API Hi! I’ve just recently started evaluating Flink for our ETL needs, and I ran across an issue with streaming postgres data via the Table/SQL API. I see that the API has the scan.fetch-size option, but not scan.auto-commit per https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html . I had attempted to load a large table in, but it completely slurped it into memory before starting the streaming. I modified the flink source code to add a scan.auto-commit option, and I was then able to immediately start streaming and cut my memory usage way down. 
I see in this thread that there was a similar issue resolved for JDBCInputFormat in this thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html , but I don’t see a way to utilize that in the Table/SQL API. Am I missing something on how to pull this off? Regards, Dylan Forciea Oseberg
Re: autoCommit for postgres jdbc streaming in Table/SQL API
I appreciate it! Let me know if you want me to submit a PR against the issue after it is created. It wasn’t a huge amount of code, so it’s probably not a big deal if you wanted to redo it. Thanks, Dylan From: Shengkai Fang Date: Wednesday, October 7, 2020 at 9:06 AM To: Dylan Forciea Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API Sorry for late response. +1 to support it. I will open a jira about it later. Dylan Forciea mailto:dy...@oseberg.io>>于2020年10月7日 周三下午9:53写道: I hadn’t heard a response on this, so I’m going to expand this to the dev email list. If this is indeed an issue and not my misunderstanding, I have most of a patch already coded up. Please let me know, and I can create a JIRA issue and send out a PR. Regards, Dylan Forciea Oseberg From: Dylan Forciea mailto:dy...@oseberg.io>> Date: Thursday, October 1, 2020 at 5:14 PM To: "user@flink.apache.org<mailto:user@flink.apache.org>" mailto:user@flink.apache.org>> Subject: autoCommit for postgres jdbc streaming in Table/SQL API Hi! I’ve just recently started evaluating Flink for our ETL needs, and I ran across an issue with streaming postgres data via the Table/SQL API. I see that the API has the scan.fetch-size option, but not scan.auto-commit per https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html . I had attempted to load a large table in, but it completely slurped it into memory before starting the streaming. I modified the flink source code to add a scan.auto-commit option, and I was then able to immediately start streaming and cut my memory usage way down. I see in this thread that there was a similar issue resolved for JDBCInputFormat in this thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html , but I don’t see a way to utilize that in the Table/SQL API. Am I missing something on how to pull this off? Regards, Dylan Forciea Oseberg
Re: autoCommit for postgres jdbc streaming in Table/SQL API
I hadn’t heard a response on this, so I’m going to expand this to the dev email list. If this is indeed an issue and not my misunderstanding, I have most of a patch already coded up. Please let me know, and I can create a JIRA issue and send out a PR. Regards, Dylan Forciea Oseberg From: Dylan Forciea Date: Thursday, October 1, 2020 at 5:14 PM To: "user@flink.apache.org" Subject: autoCommit for postgres jdbc streaming in Table/SQL API Hi! I’ve just recently started evaluating Flink for our ETL needs, and I ran across an issue with streaming postgres data via the Table/SQL API. I see that the API has the scan.fetch-size option, but not scan.auto-commit per https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html . I had attempted to load a large table in, but it completely slurped it into memory before starting the streaming. I modified the flink source code to add a scan.auto-commit option, and I was then able to immediately start streaming and cut my memory usage way down. I see in this thread that there was a similar issue resolved for JDBCInputFormat in this thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html , but I don’t see a way to utilize that in the Table/SQL API. Am I missing something on how to pull this off? Regards, Dylan Forciea Oseberg
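Background on why the auto-commit setting matters here: the Postgres JDBC driver only streams a result set with a server-side cursor when auto-commit is off and a fetch size is set; otherwise it buffers the entire result in memory, which matches the behavior described above. A plain-JDBC illustration of the driver behavior the connector needs to reproduce (connection details, table, and columns are placeholders):

import java.sql.DriverManager

val conn = DriverManager.getConnection(
  "jdbc:postgresql://host.domain.com/db1", "user", "password")

// Both settings are required for cursor-based streaming in the Postgres driver.
conn.setAutoCommit(false)
val stmt = conn.createStatement()
stmt.setFetchSize(500)

val rs = stmt.executeQuery("SELECT id, attr FROM big_table")
while (rs.next()) {
  // Rows arrive in batches of ~500 rather than the whole table at once.
  println(s"${rs.getLong("id")} ${rs.getString("attr")}")
}
rs.close()
conn.close()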