Piotrek,

I am looking at the count of records present in the sink table in Postgres 
after the entire job completes, not the number of inserts/retracts. I can see 
records being added to and removed from the “sink” table as the job runs. With 
parallelism set to 1, it always comes out to the same number (consistent with 
the number of ids in the source tables “table1” and “table2”): about 491k 
records in table “sink” when the job is complete. With parallelism set to 16, 
the “sink” table ends up with somewhere around 360k records +/- 20k when the 
job is complete. I truncate the “sink” table before I run the job, and this is 
a test environment where the source databases are static.
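
Concretely, the check amounts to something like the following (a sketch; the 
real schema is our proprietary one, but the shape matches the distilled 
example downthread):

    -- before each run
    TRUNCATE TABLE sink;
    -- after the job completes
    SELECT COUNT(*) FROM sink;
    -- expected: one row per distinct id across both sources
    SELECT COUNT(*) FROM (
      SELECT id1 AS id FROM table1
      UNION
      SELECT id2 FROM table2
    ) AS ids;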

I removed my line setting Batch mode, per Timo’s suggestion, and am still 
running with MAX, which should have deterministic output.

Dylan

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, April 14, 2021 at 9:38 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi Dylan,

But if you are running your query in Streaming mode, aren't you counting 
retractions from the FULL JOIN? AFAIK in Streaming mode, when the first record 
comes into a FULL JOIN it will be immediately emitted with NULLs (not matched, 
as the other table is empty). Later, if a matching record is received from the 
second table, the previous result will be retracted and the updated one 
re-emitted. Maybe this is what you are observing in the varying output?
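
For example, for a single key the changelog of the FULL JOIN could look 
roughly like this (schematic, using Flink's changelog row kinds; assuming the 
table1 row arrives first):

    +I (id=1, t1_attr='a', t2_attr=NULL)  -- no match yet, emitted padded with NULLs
    -U (id=1, t1_attr='a', t2_attr=NULL)  -- retracted when the matching table2 row arrives
    +U (id=1, t1_attr='a', t2_attr='b')   -- updated, matched result re-emitted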

Maybe you could try to analyse how the results differ between different runs?

Best,
Piotrek

Wed, Apr 14, 2021 at 16:22 Dylan Forciea <dy...@oseberg.io> wrote:
I replaced the FIRST_VALUE with MAX to ensure that the results should be 
identical even in their content, and my problem still remains: I end up with a 
nondeterministic count of records being emitted into the sink when the 
parallelism is over 1, and that count is about 20-25% short (and not 
consistent) of what comes out consistently when parallelism is set to 1.
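
For reference, the aggregation subquery now reads as follows (the same 
subquery as in the distilled example downthread, with FIRST_VALUE swapped 
out):

    SELECT
      id2,
      MAX(attr) AS attr
    FROM table2
    GROUP BY id2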

Dylan

From: Dylan Forciea <dy...@oseberg.io>
Date: Wednesday, April 14, 2021 at 9:08 AM
To: Piotr Nowojski <pnowoj...@apache.org>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Piotrek,

I was actually originally using a group function that WAS deterministic (a 
custom UDF I wrote), but chose a built-in one here. By non-deterministic, I 
mean that the number of records coming out is not consistent. Since the 
FIRST_VALUE here is on an attribute that is not part of the key, I wouldn’t 
think that should affect the number of records coming out.

Dylan

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, April 14, 2021 at 9:06 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi,

Yes, it looks like your query is non-deterministic because of `FIRST_VALUE` 
used inside `GROUP BY`. If you have many different parallel sources, each time 
you run your query your first value might be different. If that's the case, you 
could try to confirm it with an even smaller query:

        SELECT
          id2,
          FIRST_VALUE(attr) AS attr
        FROM table2
        GROUP BY id2

Best,
Piotrek

Wed, Apr 14, 2021 at 14:45 Dylan Forciea <dy...@oseberg.io> wrote:
I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink 
SQL job to see what happened. However, once I did that, my results became 
nondeterministic. This happens whether I set the 
table.exec.resource.default-parallelism config option or set the default 
local parallelism to something higher than 1. I would end up with fewer 
records in the end, and each time I ran the job the output record count would 
come out differently.

I managed to distill an example, as pasted below (with attribute names changed 
to protect company proprietary info), that causes the issue. I believe I got 
it to happen with a LEFT JOIN rather than a FULL JOIN as well, but the 
distilled version wasn’t giving me wrong results with that. Maybe it has to do 
with joining to a table that was formed using a GROUP BY? Can somebody tell me 
if I’m doing something that is known not to work, or if I have run across a bug?

Regards,
Dylan Forciea


import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Job {
  def main(args: Array[String]): Unit = {
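    // Pin the default local parallelism to 1; the
    // table.exec.resource.default-parallelism setting below is what raises the
    // job's parallelism to 16.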
    StreamExecutionEnvironment.setDefaultLocalParallelism(1)

    val settings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

    val configuration = streamTableEnv.getConfig().getConfiguration()
    configuration.setInteger("table.exec.resource.default-parallelism", 16)

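    // Run in BATCH mode over the bounded JDBC sources (this is the line later
    // removed per Timo's suggestion).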
    streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)

    streamTableEnv.executeSql(
      """
      CREATE TABLE table1 (
        id1 STRING PRIMARY KEY NOT ENFORCED,
        attr STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table1',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql(
      """
      CREATE TABLE table2 (
        attr STRING,
        id2 STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table2',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql(
      """
      CREATE TABLE table3 (
        attr STRING PRIMARY KEY NOT ENFORCED,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table3',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE sink (
        id STRING PRIMARY KEY NOT ENFORCED,
        attr STRING,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'sink',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

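    // FULL JOIN table1 against the GROUP BY de-duplication of table2, then map
    // the coalesced attr through table3.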
    val view =
      streamTableEnv.sqlQuery("""
      SELECT
        COALESCE(t1.id1, t2.id2) AS id,
        COALESCE(t2.attr, t1.attr) AS operator,
        COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
      FROM table1 t1
      FULL JOIN (
        SELECT
          id2,
          FIRST_VALUE(attr) AS attr
        FROM table2
        GROUP BY id2
      ) t2
        ON (t1.id1 = t2.id2)
      LEFT JOIN table3 t3
        ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
    streamTableEnv.createTemporaryView("view", view)

    val statementSet = streamTableEnv.createStatementSet()
    statementSet.addInsertSql("""
      INSERT INTO sink SELECT * FROM view
    """)

    statementSet.execute().await()
  }
}

