Hey Arvid,

Yes, I was able to self-answer this one. Was just confused on the
non-deterministic behavior of the FULL OUTER join statement. Thinking
through it and took a harder read through the Dynamic Tables doc section[1]
where "Result Updating" is hinted at, and the behavior makes total sense in
a streaming env.

Thanks,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/dynamic_tables.html

On Mon, Aug 31, 2020 at 5:16 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Austin,
>
> Do I assume correctly, that you self-answered your question? If not, could
> you please update your current progress?
>
> Best,
>
> Arvid
>
> On Thu, Aug 27, 2020 at 11:41 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Ah, I think the "Result Updating" is what got me -- INNER joins do the
>> job!
>>
>> On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> oops, the example query should actually be:
>>>
>>> SELECT table_1.a, table_1.b, table_2.c
>>> FROM table_1
>>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>>
>>> and duplicate results should actually be:
>>>
>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>> Record(a = "data a 1", b = "data b 1", c = null)
>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>> Record(a = "data a 2", b = "data b 2", c = null)
>>>
>>> On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hey all,
>>>>
>>>> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
>>>> reading from a few CSV files and joins some records across them into a
>>>> couple of data streams (yes, this could be a batch job won't get into why
>>>> we chose streams unless it's relevant). These joins are producing some
>>>> duplicate records, one with the joined field present and one with the
>>>> joined field as `null`, though this happens only ~25% of the time. Reading
>>>> the docs on joins[1], I thought this could be caused by too strict Idle
>>>> State Retention[2], so I increased that to min, max (15min, 24h) but that
>>>> doesn't seem to have an effect, and the problem still occurs when testing
>>>> on a subset of data that finishes processing in under a minute.
>>>>
>>>> The query roughly looks like:
>>>>
>>>> table_1 has fields a, b
>>>> table_2 has fields b, c
>>>>
>>>> SELECT table_1.a, table_1.b, table_1.c
>>>> FROM table_1
>>>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>>>
>>>> Correct result:
>>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>>>
>>>> Results seem to be anywhere between all possible dups and the correct
>>>> result.
>>>>
>>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>>> Record(a = "data a 1", b = null, c = "data c 1")
>>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>>> Record(a = "data a 2", b = null, c = "data c 2")
>>>>
>>>> The CSV files are registered as Flink Tables with the following:
>>>>
>>>> tableEnv.connect(
>>>>     new FileSystem()
>>>>         .path(path)
>>>> )
>>>>     .withFormat(
>>>>         new Csv()
>>>>             .quoteCharacter('"')
>>>>             .ignoreParseErrors()
>>>>     )
>>>>     .withSchema(schema)
>>>>     .inAppendMode()
>>>>     .createTemporaryTable(tableName);
>>>>
>>>>
>>>> I'm creating my table environment like so:
>>>>
>>>> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>>>>     .useBlinkPlanner()
>>>>     .build();
>>>>
>>>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
>>>> tableEnvSettings);
>>>>
>>>> TableConfig tConfig = tEnv.getConfig();
>>>> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>>>>
>>>>
>>>> Is there something I'm misconfiguring or have misunderstood the docs?
>>>>
>>>> Thanks,
>>>> Austin
>>>>
>>>> [1]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
>>>> [2]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>>>
>>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>

Reply via email to