Hey all,

I've got a Flink 1.10 Streaming SQL job using the Blink Planner that reads
from a few CSV files and joins some records across them into a couple of
data streams (yes, this could be a batch job; I won't get into why we chose
streams unless it's relevant). These joins are producing some duplicate
records, one with the joined field present and one with the joined field as
`null`, though this happens only ~25% of the time. Reading the docs on
joins[1], I thought this could be caused by too strict an Idle State
Retention[2], so I increased it to (min, max) = (15min, 24h), but that
doesn't seem to have an effect: the problem still occurs when testing on a
subset of data that finishes processing in under a minute.
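A quick sanity check of why the retention setting probably shouldn't matter here. It assumes the documented semantics that the state of a key that has been idle for less than the minimum retention time is always kept, so a run that finishes in under a minute can't lose join state with a 15-minute minimum. The class and method names are hypothetical and are not a Flink API.

```java
import java.time.Duration;

// Hypothetical helper (NOT a Flink API) modelling the documented idle state
// retention rule: state of a key idle for less than minRetention is kept;
// between minRetention and maxRetention it may be removed at any time.
class IdleRetentionCheck {

    /** True if state that has been idle for {@code idle} may already be cleaned up. */
    static boolean mayBeCleanedUp(Duration idle, Duration minRetention) {
        return idle.compareTo(minRetention) >= 0;
    }

    public static void main(String[] args) {
        Duration minRetention = Duration.ofMinutes(15);
        Duration maxRetention = Duration.ofHours(24); // only bounds how late cleanup happens
        // The test run finishes in under a minute, so no key can be idle longer than that.
        Duration longestPossibleIdle = Duration.ofMinutes(1);

        System.out.println("min=" + minRetention + ", max=" + maxRetention);
        System.out.println("state may be cleaned up: "
                + mayBeCleanedUp(longestPossibleIdle, minRetention));
    }
}
```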

The query roughly looks like:

table_1 has fields a, b
table_2 has fields b, c

SELECT table_1.a, table_1.b, table_2.c
FROM table_1
LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;

Correct result:
Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 2", b = "data b 2", c = "data c 2")

Across runs, the output ranges from the correct result to one containing
every possible duplicate:

Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 1", b = null, c = "data c 1")
Record(a = "data a 2", b = "data b 2", c = "data c 2")
Record(a = "data a 2", b = null, c = "data c 2")
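One possible explanation, offered as an assumption and not a diagnosis. In a streaming LEFT OUTER JOIN, a table_1 row that arrives before its table_2 match could be emitted right away with the table_2 side padded with null, and then retracted once the match arrives. Suppose the result is read as a retract stream but only the insert (true) messages are kept. Then the stale null-padded row would remain, and whether it shows up would depend on arrival order, which might fit the "~25% of the time" observation. The sketch below is a toy plain-Java model of that changelog. It is not Flink's join operator, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a streaming LEFT OUTER JOIN (table_1 LEFT JOIN table_2 ON b).
// NOT Flink's operator; it only mimics the shape of a retract stream:
// (true, row) = insert message, (false, row) = retraction message.
class LeftJoinChangelog {

    static final class Change {
        final boolean isInsert;
        final String row;
        Change(boolean isInsert, String row) { this.isInsert = isInsert; this.row = row; }
    }

    private final Map<String, List<String>> leftByKey = new HashMap<>(); // b -> a values
    private final Map<String, String> rightByKey = new HashMap<>();      // b -> c
    final List<Change> changes = new ArrayList<>();

    private static String row(String a, String b, String c) {
        return "(" + a + ", " + b + ", " + c + ")";
    }

    void onLeft(String a, String b) {
        leftByKey.computeIfAbsent(b, k -> new ArrayList<>()).add(a);
        // If no table_2 row has arrived yet, c is null: a null-padded row is emitted.
        changes.add(new Change(true, row(a, b, rightByKey.get(b))));
    }

    // Simplification: assumes each b appears at most once in table_2.
    void onRight(String b, String c) {
        rightByKey.put(b, c);
        for (String a : leftByKey.getOrDefault(b, List.of())) {
            changes.add(new Change(false, row(a, b, null))); // retract the null-padded row
            changes.add(new Change(true, row(a, b, c)));      // emit the joined row
        }
    }

    /** Keeps insert messages and drops retractions, like a consumer that ignores the flag's meaning. */
    List<String> insertsOnlyView() {
        List<String> out = new ArrayList<>();
        for (Change ch : changes) if (ch.isInsert) out.add(ch.row);
        return out;
    }

    /** Applies inserts and retractions, giving the current result of the query. */
    List<String> materializedView() {
        List<String> out = new ArrayList<>();
        for (Change ch : changes) {
            if (ch.isInsert) out.add(ch.row);
            else out.remove(ch.row); // List.remove(Object): drops one matching row
        }
        return out;
    }

    public static void main(String[] args) {
        LeftJoinChangelog rightFirst = new LeftJoinChangelog(); // table_2 row arrives first
        rightFirst.onRight("b1", "c1");
        rightFirst.onLeft("a1", "b1");

        LeftJoinChangelog leftFirst = new LeftJoinChangelog();  // table_1 row arrives first
        leftFirst.onLeft("a1", "b1");
        leftFirst.onRight("b1", "c1");

        System.out.println("right first, inserts only: " + rightFirst.insertsOnlyView());
        System.out.println("left first,  inserts only: " + leftFirst.insertsOnlyView());
        System.out.println("left first,  materialized: " + leftFirst.materializedView());
    }
}
```

To stay short, the model keeps a single table_2 row per key. With several matches per key, a real join would emit one joined row per match.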

The CSV files are registered as Flink Tables with the following:

tableEnv.connect(
    new FileSystem()
        .path(path)
)
    .withFormat(
        new Csv()
            .quoteCharacter('"')
            .ignoreParseErrors()
    )
    .withSchema(schema)
    .inAppendMode()
    .createTemporaryTable(tableName);


I'm creating my table environment like so:

EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
    tableEnvSettings);

TableConfig tConfig = tEnv.getConfig();
tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));


Is there something I'm misconfiguring, or have I misunderstood the docs?

Thanks,
Austin

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
[2]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
