Piotrek, I am looking at the count of records present in the sink table in Postgres after the entire job completes, not the number of inserts/retracts. I can see as the job runs that records are added and removed from the “sink” table. With parallelism set to 1, it always comes out to the same number (which is consistent with the number of ids in the source tables “table1” and “table2”), at about 491k records in table “sink” when the job is complete. With the parallelism set to 16, the “sink” table will have somewhere around 360k records +/- 20k when the job is complete. I truncate the “sink” table before I run the job, and this is a test environment where the source databases are static.
I removed my line for setting Batch mode per Timo's suggestion, and am still running with MAX, which should have deterministic output.

Dylan

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, April 14, 2021 at 9:38 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi Dylan,

But if you are running your query in streaming mode, aren't you counting retractions from the FULL JOIN? AFAIK, in streaming mode a FULL JOIN will immediately emit the first record it receives with NULLs (not matched, as the other table is empty). Later, if a matching record is received from the second table, the previous result will be retracted and the new, updated one will be re-emitted. Maybe this is what you are observing in the varying output? Could you try to analyse how the results differ between different runs?

Best,
Piotrek

śr., 14 kwi 2021 o 16:22 Dylan Forciea <dy...@oseberg.io> napisał(a):

I replaced the FIRST_VALUE with MAX to ensure that the results should be identical even in their content, and my problem still remains: I end up with a nondeterministic count of records being emitted into the sink when the parallelism is over 1, and that count is about 20-25% short (and not consistent) of what comes out consistently when parallelism is set to 1.

Dylan

From: Dylan Forciea <dy...@oseberg.io>
Date: Wednesday, April 14, 2021 at 9:08 AM
To: Piotr Nowojski <pnowoj...@apache.org>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Piotrek,

I was actually originally using a group function that WAS deterministic (a custom UDF I made), but chose something built in here.
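To make the retraction accounting above concrete, here is a minimal, Flink-free Scala sketch of the streaming FULL JOIN behaviour Piotrek describes. The ids, values, and arrival order are invented for illustration; `+I`/`-D` mimic Flink's insert/retract changelog markers:

```scala
// Sketch of how a streaming FULL JOIN emits and retracts results.
// "+I" = insert, "-D" = retract; the sink's final content is the net effect.
case class Change(kind: String, id: String, left: Option[String], right: Option[String])

// Hypothetical arrival order: the left row for id "a" arrives before its match.
val changelog = Seq(
  Change("+I", "a", Some("l1"), None),       // left arrives first: emitted with NULL right side
  Change("-D", "a", Some("l1"), None),       // match found later: previous result retracted
  Change("+I", "a", Some("l1"), Some("r1")), // joined result re-emitted
  Change("+I", "b", None, Some("r2"))        // right-only row, never matched
)

val inserts  = changelog.count(_.kind == "+I")
val retracts = changelog.count(_.kind == "-D")
val netRows  = inserts - retracts

println(s"inserts=$inserts retracts=$retracts net=$netRows")
```

Counting emitted inserts here gives 3, but the materialized table only holds 2 rows; intermediate retractions inflate the emission count without changing the final table size, which is why it matters that Dylan is counting rows left in Postgres after the job completes rather than emissions.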
By non-deterministic, I mean that the number of records coming out is not consistent. Since the FIRST_VALUE here is on an attribute that is not part of the key, I wouldn't think that should affect the number of records coming out.

Dylan

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, April 14, 2021 at 9:06 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi,

Yes, it looks like your query is non-deterministic because of the `FIRST_VALUE` used inside `GROUP BY`. If you have many different parallel sources, each time you run your query your first value might be different. If that's the case, you could try to confirm it with an even smaller query:

SELECT id2, FIRST_VALUE(attr) AS attr
FROM table2
GROUP BY id2

Best,
Piotrek

śr., 14 kwi 2021 o 14:45 Dylan Forciea <dy...@oseberg.io> napisał(a):

I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink SQL job to see what happened. However, once I did that, my results became nondeterministic. This happens whether I set the table.exec.resource.default-parallelism config option or set the default local parallelism to something higher than 1. I would end up with fewer records in the end, and each time I ran, the output record count would come out differently.

I managed to distill an example, pasted below (with attribute names changed to protect company proprietary info), that causes the issue. I feel like I managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the distilled version wasn't giving me wrong results with that. Maybe it has to do with joining to a table that was formed using a GROUP BY? Can somebody tell if I'm doing something that is known not to work, or if I have run across a bug?
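The nondeterminism Piotrek points at can be sketched without Flink at all. In the sketch below, the key and values are made up, and `firstValue`/`maxValue` are plain stand-ins for the SQL aggregates: with parallel sources, the per-key arrival order differs between runs, so a "first" value changes while a MAX does not:

```scala
// Two runs see the same rows for key "id1", but in a different arrival order
// (as can happen when parallel source splits race each other).
val rowsRunA = Seq(("id1", "x"), ("id1", "y")) // hypothetical order, run A
val rowsRunB = Seq(("id1", "y"), ("id1", "x")) // same rows, reversed order, run B

// Stand-ins for the SQL aggregates applied per group:
def firstValue(rows: Seq[(String, String)]): String = rows.head._2
def maxValue(rows: Seq[(String, String)]): String   = rows.map(_._2).max

println(s"FIRST_VALUE: runA=${firstValue(rowsRunA)} runB=${firstValue(rowsRunB)}") // differ
println(s"MAX:         runA=${maxValue(rowsRunA)} runB=${maxValue(rowsRunB)}")     // agree
```

This only explains varying *values* per key, though; as Dylan notes, it should not by itself change the number of groups (and hence records) emitted.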
Regards,
Dylan Forciea

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

object Job {
  def main(args: Array[String]): Unit = {
    StreamExecutionEnvironment.setDefaultLocalParallelism(1)

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

    val configuration = streamTableEnv.getConfig().getConfiguration()
    configuration.setInteger("table.exec.resource.default-parallelism", 16)

    streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)

    streamTableEnv.executeSql("""
      CREATE TABLE table1 (
        id1 STRING PRIMARY KEY NOT ENFORCED,
        attr STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table1',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE table2 (
        attr STRING,
        id2 STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table2',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE table3 (
        attr STRING PRIMARY KEY NOT ENFORCED,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table3',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE sink (
        id STRING PRIMARY KEY NOT ENFORCED,
        attr STRING,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'sink',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    val view = streamTableEnv.sqlQuery("""
      SELECT
        COALESCE(t1.id1, t2.id2) AS id,
        COALESCE(t2.attr, t1.attr) AS operator,
        COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
      FROM table1 t1
      FULL JOIN (
        SELECT id2, FIRST_VALUE(attr) AS attr
        FROM table2
        GROUP BY id2
      ) t2 ON (t1.id1 = t2.id2)
      LEFT JOIN table3 t3 ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
    streamTableEnv.createTemporaryView("view", view)

    val statementSet = streamTableEnv.createStatementSet()
    statementSet.addInsertSql("""
      INSERT INTO sink
      SELECT * FROM view
    """)
    statementSet.execute().await()
  }
}
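Since the sink table above declares a primary key, the JDBC connector should write to it in upsert fashion: inserts and updates overwrite by key, and retractions delete by key, so the final row count is the number of keys that survive the changelog. A rough, Flink-free sketch of that materialization (the changelog entries below are invented for illustration):

```scala
// Applying a changelog to a keyed (upsert) sink: "+I" upserts by key,
// "-D" deletes by key; the final row count is the size of the key space left.
val changelog = Seq(
  ("+I", "a", "v1"), // first (partial) join result for key "a"
  ("-D", "a", "v1"), // retracted when the match arrives
  ("+I", "a", "v2"), // re-emitted joined result
  ("+I", "b", "v1")  // a second key, emitted once
)

val sinkState = changelog.foldLeft(Map.empty[String, String]) {
  case (state, ("+I", key, value)) => state + (key -> value)
  case (state, ("-D", key, _))     => state - key
  case (state, _)                  => state
}

println(s"final row count = ${sinkState.size}")
```

Under this model, intermediate retractions cannot reduce the final count as long as every key's last change is an upsert, which is consistent with Dylan's observation that the parallelism-1 run always converges to the same ~491k rows.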