Re: Flink SQL Streaming Join Creates Duplicates

2020-08-31 Thread Austin Cawley-Edwards
Hey Arvid,

Yes, I was able to self-answer this one. I was just confused by the
non-deterministic behavior of the LEFT OUTER JOIN statement. After
thinking it through and taking a harder read of the Dynamic Tables doc
section[1], where "Result Updating" is hinted at, the behavior makes
total sense in a streaming environment.

Thanks,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/dynamic_tables.html
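For future readers, here is the mechanism in miniature. This is a plain-Java sketch (no Flink involved; the class and method names are made up for illustration) of the changelog a streaming LEFT OUTER JOIN produces: an unmatched left row is emitted padded with null, then retracted (-D) and re-emitted once the matching right row arrives. A sink that drops the retraction flag sees both rows, which is exactly the "duplicate" pattern in this thread.

```java
import java.util.*;

// Toy model of result-updating LEFT OUTER JOIN semantics (not Flink code).
public class RetractJoinSketch {
    static final Map<String, String> rightByKey = new HashMap<>();      // table_2: b -> c
    static final Map<String, List<String>> leftByKey = new HashMap<>(); // table_1: b -> [a, ...]
    static final List<String> changelog = new ArrayList<>();            // emitted +I/-D records

    // A table_1 row (a, b) arrives: emit immediately, null-padded if no match yet.
    static void onLeft(String a, String b) {
        leftByKey.computeIfAbsent(b, k -> new ArrayList<>()).add(a);
        changelog.add("+I(" + a + ", " + b + ", " + rightByKey.get(b) + ")");
    }

    // A table_2 row (b, c) arrives: retract the null-padded result for every
    // buffered left row with this key and re-emit it with c filled in.
    static void onRight(String b, String c) {
        rightByKey.put(b, c);
        for (String a : leftByKey.getOrDefault(b, Collections.emptyList())) {
            changelog.add("-D(" + a + ", " + b + ", null)");
            changelog.add("+I(" + a + ", " + b + ", " + c + ")");
        }
    }

    public static void main(String[] args) {
        onLeft("data a 1", "data b 1");   // no match yet -> +I(..., null)
        onRight("data b 1", "data c 1"); // match arrives -> -D(..., null), +I(..., data c 1)
        changelog.forEach(System.out::println);
    }
}
```

In actual Flink, these add/retract flags can be observed by converting the joined table with StreamTableEnvironment#toRetractStream, which yields Tuple2<Boolean, Row> pairs where false marks a retraction; an append-only sink has no way to represent the -D, so the null-padded row appears to linger as a duplicate.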

On Mon, Aug 31, 2020 at 5:16 AM Arvid Heise  wrote:

> Hi Austin,
>
> Do I assume correctly that you self-answered your question? If not, could
> you please update us on your current progress?
>
> Best,
>
> Arvid
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Flink SQL Streaming Join Creates Duplicates

2020-08-31 Thread Arvid Heise
Hi Austin,

Do I assume correctly that you self-answered your question? If not, could
you please update us on your current progress?

Best,

Arvid

On Thu, Aug 27, 2020 at 11:41 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Ah, I think the "Result Updating" is what got me -- INNER joins do the
> job!



Re: Flink SQL Streaming Join Creates Duplicates

2020-08-27 Thread Austin Cawley-Edwards
Ah, I think the "Result Updating" is what got me -- INNER joins do the job!
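To make the fix concrete: the same toy model with inner-join semantics (again plain Java with made-up names, not Flink code) never emits an unmatched left row. Nothing null-padded is ever produced, so there is nothing to retract later and the output stays append-only.

```java
import java.util.*;

// Toy model of streaming INNER JOIN semantics (not Flink code): unmatched rows
// are buffered silently; a result is emitted only once both sides have matched.
public class InnerJoinSketch {
    static final Map<String, String> rightByKey = new HashMap<>();
    static final Map<String, List<String>> leftByKey = new HashMap<>();
    static final List<String> results = new ArrayList<>();

    static void onLeft(String a, String b) {
        leftByKey.computeIfAbsent(b, k -> new ArrayList<>()).add(a);
        String c = rightByKey.get(b);
        if (c != null) results.add(a + ", " + b + ", " + c); // emit only on a match
    }

    static void onRight(String b, String c) {
        rightByKey.put(b, c);
        for (String a : leftByKey.getOrDefault(b, Collections.emptyList())) {
            results.add(a + ", " + b + ", " + c);
        }
    }

    public static void main(String[] args) {
        onLeft("data a 1", "data b 1");   // buffered, nothing emitted yet
        onRight("data b 1", "data c 1"); // exactly one joined result
        results.forEach(System.out::println);
    }
}
```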

On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> oops, the example query should actually be:
>
> SELECT table_1.a, table_1.b, table_2.c
> FROM table_1
> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>
> and duplicate results should actually be:
>
> Record(a = "data a 1", b = "data b 1", c = "data c 1")
> Record(a = "data a 1", b = "data b 1", c = null)
> Record(a = "data a 2", b = "data b 2", c = "data c 2")
> Record(a = "data a 2", b = "data b 2", c = null)
>


Flink SQL Streaming Join Creates Duplicates

2020-08-27 Thread Austin Cawley-Edwards
Hey all,

I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
reading from a few CSV files and joining some records across them into a
couple of data streams (yes, this could be a batch job; I won't get into
why we chose streams unless it's relevant). These joins are producing
duplicate records: one with the joined field present and one with the
joined field as `null`. This happens only ~25% of the time. Reading the
docs on joins[1], I thought this could be caused by too-strict Idle State
Retention[2], so I increased the (min, max) retention to (15 min, 24 h),
but that doesn't seem to have an effect, and the problem still occurs when
testing on a subset of data that finishes processing in under a minute.

The query roughly looks like:

table_1 has fields a, b
table_2 has fields b, c

SELECT table_1.a, table_1.b, table_1.c
FROM table_1
LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;

Correct result:
Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 2", b = "data b 2", c = "data c 2")

The actual results vary anywhere between all possible duplicates and the
correct result:

Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 1", b = null, c = "data c 1")
Record(a = "data a 2", b = "data b 2", c = "data c 2")
Record(a = "data a 2", b = null, c = "data c 2")

The CSV files are registered as Flink Tables with the following:

tableEnv.connect(
        new FileSystem()
            .path(path)
    )
    .withFormat(
        new Csv()
            .quoteCharacter('"')
            .ignoreParseErrors()
    )
    .withSchema(schema)
    .inAppendMode()
    .createTemporaryTable(tableName);


I'm creating my table environment like so:

EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, tableEnvSettings);

TableConfig tConfig = tEnv.getConfig();
tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));


Is there something I'm misconfiguring, or have I misunderstood the docs?

Thanks,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time