Ah, I think the "Result Updating" is what got me -- INNER joins do the job!
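For anyone who lands on this thread later: a streaming LEFT OUTER JOIN is result-updating. If the left row is processed before its match, the join emits a null-padded row immediately and only retracts it once the matching right row arrives; an append-only sink can't apply the retraction, so both the stale null row and the joined row survive. An INNER JOIN emits nothing until a match exists, which is why switching to it fixed this. Below is a toy sketch of that emit/retract sequence in plain Python (illustrative only, not Flink's actual API; function names are made up), assuming the left row arrives first:

```python
# Toy model of a result-updating (retract) join feeding two kinds of sinks.
# '+I' = insert, '-D' = retract/delete; labels are illustrative, not Flink's.

def streaming_left_join(left_rows, right_rows):
    """Emit changelog entries as rows 'arrive': all left rows first,
    then the right rows that complete the join."""
    changelog = []
    for a, b in left_rows:
        # No match seen yet: the left join emits a null-padded result now.
        changelog.append(("+I", (a, b, None)))
    for b2, c in right_rows:
        for a, b in left_rows:
            if b == b2:
                # Match arrived: retract the null row, emit the joined one.
                changelog.append(("-D", (a, b, None)))
                changelog.append(("+I", (a, b, c)))
    return changelog

def append_sink(changelog):
    # An append-only sink keeps every insert and drops retractions on the
    # floor -- this is where the "duplicate" null row comes from.
    return [row for op, row in changelog if op == "+I"]

def retract_sink(changelog):
    # A retract-aware sink removes a row when the matching '-D' arrives.
    result = []
    for op, row in changelog:
        if op == "+I":
            result.append(row)
        else:
            result.remove(row)
    return result

changelog = streaming_left_join(
    left_rows=[("data a 1", "data b 1")],
    right_rows=[("data b 1", "data c 1")],
)
print(append_sink(changelog))   # stale null row AND the joined row
print(retract_sink(changelog))  # only the joined row
```

The same reasoning explains why the INNER JOIN rewrite works in append mode: it never emits the provisional null-padded row, so there is nothing to retract.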

On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> oops, the example query should actually be:
>
> SELECT table_1.a, table_1.b, table_2.c
> FROM table_1
> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>
> and duplicate results should actually be:
>
> Record(a = "data a 1", b = "data b 1", c = "data c 1")
> Record(a = "data a 1", b = "data b 1", c = null)
> Record(a = "data a 2", b = "data b 2", c = "data c 2")
> Record(a = "data a 2", b = "data b 2", c = null)
>
> On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey all,
>>
>> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that reads
>> from a few CSV files and joins some records across them into a couple of
>> data streams (yes, this could be a batch job; I won't get into why we chose
>> streams unless it's relevant). These joins are producing duplicate records:
>> one with the joined field present and one with the joined field as `null`,
>> though this happens only ~25% of the time. Reading the docs on joins[1], I
>> thought the cause might be too-strict Idle State Retention[2], so I
>> increased it to (min, max) = (15 min, 24 h), but that doesn't seem to have
>> any effect; the problem still occurs when testing on a subset of data that
>> finishes processing in under a minute.
>>
>> The query roughly looks like:
>>
>> table_1 has fields a, b
>> table_2 has fields b, c
>>
>> SELECT table_1.a, table_1.b, table_1.c
>> FROM table_1
>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>
>> Correct result:
>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>
>> The actual results vary anywhere between all possible duplicates and the
>> correct result, e.g.:
>>
>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>> Record(a = "data a 1", b = null, c = "data c 1")
>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>> Record(a = "data a 2", b = null, c = "data c 2")
>>
>> The CSV files are registered as Flink Tables with the following:
>>
>> tableEnv.connect(
>>     new FileSystem()
>>         .path(path)
>> )
>>     .withFormat(
>>         new Csv()
>>             .quoteCharacter('"')
>>             .ignoreParseErrors()
>>     )
>>     .withSchema(schema)
>>     .inAppendMode()
>>     .createTemporaryTable(tableName);
>>
>>
>> I'm creating my table environment like so:
>>
>> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>>     .useBlinkPlanner()
>>     .build();
>>
>> StreamTableEnvironment tEnv =
>>     StreamTableEnvironment.create(env, tableEnvSettings);
>>
>> TableConfig tConfig = tEnv.getConfig();
>> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>>
>>
>> Is there something I'm misconfiguring or have misunderstood the docs?
>>
>> Thanks,
>> Austin
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>
>
