Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-16 Thread Dylan Forciea
Jark,

Thanks for the heads up! I didn’t see this behavior when running in batch mode 
with parallelism turned on. Is it safe to do this kind of join in batch mode 
right now, or am I just getting lucky?

Dylan
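
A minimal sketch (my addition, not from the thread) of what batch execution looks like with the Flink 1.12 Table API; Dylan's job code further down instead calls streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH) on the streaming environment.

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object BatchTableJobSketch {
  def main(args: Array[String]): Unit = {
    // Build a table environment that plans and executes queries in batch mode.
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val tableEnv = TableEnvironment.create(settings)

    // DDL and queries would be registered exactly as in the job quoted below, e.g.
    // tableEnv.executeSql("CREATE TABLE table1 (...) WITH ('connector' = 'jdbc', ...)")
  }
}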

From: Jark Wu 
Date: Friday, April 16, 2021 at 5:10 AM
To: Dylan Forciea 
Cc: Timo Walther , Piotr Nowojski , 
"user@flink.apache.org" 
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi Dylan,

I think this has the same cause as 
https://issues.apache.org/jira/browse/FLINK-20374.
The root cause is that the changelogs are shuffled by `attr` at the second join,
and thus records with the same `id` can be shuffled to different join tasks 
(and therefore different sink tasks).
So the data arriving at the sinks is not ordered on the sink primary key.

We may need something like a primary-key ordering mechanism in the whole planner 
to fix this.
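
To make the shuffle argument concrete, here is a small standalone sketch (my own illustration, not from the thread): with the sink primary key on `id` but the last Exchange hashing on `attr`, the +I/-U/+U changelog entries for a single id can be routed to different sink subtasks and race on the same Postgres row.

object KeyedShuffleSketch {
  final case class Change(kind: String, id: Long, attr: String)

  def main(args: Array[String]): Unit = {
    val parallelism = 2
    // Changelog for one sink key (id = 42), as produced by the FULL JOIN + GROUP BY.
    val changes = Seq(Change("+I", 42L, "a"), Change("-U", 42L, "a"), Change("+U", 42L, "b"))

    changes.foreach { c =>
      val subtaskByAttr = math.abs(c.attr.hashCode) % parallelism // what the plan does
      val subtaskById   = math.abs(c.id.hashCode)   % parallelism // what the sink PK needs
      println(s"${c.kind} id=${c.id} attr=${c.attr} -> subtask $subtaskByAttr (by attr) vs $subtaskById (by id)")
    }
    // Hashing by attr sends the retraction and the new upsert for the same id to
    // different subtasks, so their writes to the sink row can arrive in any order.
  }
}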

Best,
Jark

On Thu, 15 Apr 2021 at 01:33, Dylan Forciea <dy...@oseberg.io> wrote:
On a side note - I changed to use the batch mode per your suggestion Timo, and 
my job ran much faster and with deterministic counts with parallelism turned 
on. So I'll probably utilize that for now. However, it would still be nice to 
dig down into why streaming isn't working in case I need that in the future.

Dylan

On 4/14/21, 10:27 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:

Timo,

Here is the plan (hopefully I properly cleansed it of company proprietary 
info without garbling it)

Dylan

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink], fields=[id, 
attr, attr_mapped])
+- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS NOT 
NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT NULL($3), $3, 
$1)])
   +- LogicalJoin(condition=[=($4, $5)], joinType=[left])
  :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], 
$f4=[CASE(IS NOT NULL($3), $3, $1)])
  :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
  : :- LogicalTableScan(table=[[default_catalog, default_database, 
table1]])
  : +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
  :+- LogicalProject(id2=[$1], attr=[$0])
  :   +- LogicalTableScan(table=[[default_catalog, 
default_database, table2]])
  +- LogicalTableScan(table=[[default_catalog, default_database, 
table3]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.sink], fields=[id, attr, 
attr_mapped], changelogMode=[NONE])
+- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT 
NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), attr_mapped, 
IS NOT NULL(attr0), attr0, attr) AS attr_mapped], changelogMode=[I,UB,UA,D])
   +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], select=[id1, 
attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
  :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
  :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT NULL(attr0), 
attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
  : +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], 
select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
  ::- Exchange(distribution=[hash[id1]], changelogMode=[I])
  ::  +- TableSourceScan(table=[[default_catalog, 
default_database, table1]], fields=[id1, attr], changelogMode=[I])
  :+- Exchange(distribution=[hash[id2]], 
changelogMode=[I,UB,UA])
  :   +- GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) 
AS attr], changelogMode=[I,UB,UA])
  :  +- Exchange(distribution=[hash[id2]], 
changelogMode=[I])
  : +- TableSourceScan(table=[[default_catalog, 
default_database, table2]], fields=[attr, id2], changelogMode=[I])
  +- Exchange(distribution=[hash[attr]], changelogMode=[I])
 +- TableSourceScan(table=[[default_catalog, default_database, 
table3]], fields=[attr, attr_mapped], changelogMode=[I])

== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog, 
default_database, table1]], fields=[id1, attr])

Stage 3 : Data Source
content : Source: TableSourceScan(table=[[default_catalog, 
default_database, table2]], fields=[attr, id2])

Stage 5 : Attr
content : GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) 
AS attr])
ship_strategy : HASH

Stage 7 : Attr
content : Join(joinType=[FullOuterJoin], where=[(id1 = 
id2)], select=[id1, attr, id2, attr0], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
  

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
On a side note - I changed to use the batch mode per your suggestion Timo, and 
my job ran much faster and with deterministic counts with parallelism turned 
on. So I'll probably utilize that for now. However, it would still be nice to 
dig down into why streaming isn't working in case I need that in the future.

Dylan

On 4/14/21, 10:27 AM, "Dylan Forciea"  wrote:

Timo,

Here is the plan (hopefully I properly cleansed it of company proprietary 
info without garbling it)

Dylan

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink], fields=[id, 
attr, attr_mapped])
+- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS NOT 
NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT NULL($3), $3, 
$1)])
   +- LogicalJoin(condition=[=($4, $5)], joinType=[left])
  :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], 
$f4=[CASE(IS NOT NULL($3), $3, $1)])
  :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
  : :- LogicalTableScan(table=[[default_catalog, default_database, 
table1]])
  : +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
  :+- LogicalProject(id2=[$1], attr=[$0])
  :   +- LogicalTableScan(table=[[default_catalog, 
default_database, table2]])
  +- LogicalTableScan(table=[[default_catalog, default_database, 
table3]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.sink], fields=[id, attr, 
attr_mapped], changelogMode=[NONE])
+- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT 
NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), attr_mapped, 
IS NOT NULL(attr0), attr0, attr) AS attr_mapped], changelogMode=[I,UB,UA,D])
   +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], select=[id1, 
attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
  :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
  :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT NULL(attr0), 
attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
  : +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], 
select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
  ::- Exchange(distribution=[hash[id1]], changelogMode=[I])
  ::  +- TableSourceScan(table=[[default_catalog, 
default_database, table1]], fields=[id1, attr], changelogMode=[I])
  :+- Exchange(distribution=[hash[id2]], 
changelogMode=[I,UB,UA])
  :   +- GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) 
AS attr], changelogMode=[I,UB,UA])
  :  +- Exchange(distribution=[hash[id2]], 
changelogMode=[I])
  : +- TableSourceScan(table=[[default_catalog, 
default_database, table2]], fields=[attr, id2], changelogMode=[I])
  +- Exchange(distribution=[hash[attr]], changelogMode=[I])
 +- TableSourceScan(table=[[default_catalog, default_database, 
table3]], fields=[attr, attr_mapped], changelogMode=[I])

== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog, 
default_database, table1]], fields=[id1, attr])

Stage 3 : Data Source
content : Source: TableSourceScan(table=[[default_catalog, 
default_database, table2]], fields=[attr, id2])

Stage 5 : Attr
content : GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) 
AS attr])
ship_strategy : HASH

Stage 7 : Attr
content : Join(joinType=[FullOuterJoin], where=[(id1 = 
id2)], select=[id1, attr, id2, attr0], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
ship_strategy : HASH

Stage 8 : Attr
content : Calc(select=[id1, attr, id2, attr0, 
(attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4])
ship_strategy : FORWARD

Stage 10 : Data Source
content : Source: TableSourceScan(table=[[default_catalog, 
default_database, table3]], fields=[attr, attr_mapped])

Stage 12 : Attr
content : Join(joinType=[LeftOuterJoin], where=[($f4 = attr)], 
select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], 
leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
ship_strategy : HASH

Stage 13 : Attr
content : Calc(select=[(id1 IS NOT NULL CASE id1 CASE 
id2) AS id, (attr0 IS NOT NULL CASE attr0 CASE attr) AS attr, (attr_mapped IS 
NOT NULL CASE attr_mapped CASE attr0 IS NOT NULL CASE attr0 CASE attr) AS 
attr_mapped])
ship_strategy

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
:47, Dylan Forciea wrote:
> Piotrek,
> 
> I am looking at the count of records present in the sink table in 
> Postgres after the entire job completes, not the number of 
> inserts/retracts. I can see as the job runs that records are added and 
> removed from the “sink” table. With parallelism set to 1, it always 
> comes out to the same number (which is consistent with the number of ids 
> in the source tables “table1” and “table2”), at about 491k records in 
> table “sink” when the job is complete. With the parallelism set to 16, 
> the “sink” table will have somewhere around 360k records +/- 20k when 
> the job is complete. I truncate the “sink” table before I run the job, 
> and this is a test environment where the source databases are static.
> 
> I removed my line for setting to Batch mode per Timo’s suggestion, and 
> am still running with MAX which should have deterministic output.
> 
> Dylan
> 
> *From: *Piotr Nowojski 
    > *Date: *Wednesday, April 14, 2021 at 9:38 AM
> *To: *Dylan Forciea 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Nondeterministic results with SQL job when parallelism is 
> 1
> 
> Hi Dylan,
> 
> But if you are running your query in Streaming mode, aren't you counting 
> retractions from the FULL JOIN? AFAIK in Streaming mode in FULL JOIN, 
> when the first record comes in it will be immediately emitted with NULLs 
> (not matched, as the other table is empty). Later if a matching record 
> is received from the second table, the previous result will be retracted 
> and the new one, updated, will be re-emitted. Maybe this is what you are 
> observing in the varying output?
> 
> Maybe you could try to analyse how the results differ between different 
> runs?
> 
> Best,
> 
> Piotrek
> 
> Wed, 14 Apr 2021 at 16:22, Dylan Forciea <dy...@oseberg.io> wrote:
> 
> I replaced the FIRST_VALUE with MAX to ensure that the results
> should be identical even in their content, and my problem still
> remains – I end up with a nondeterministic count of records being
> emitted into the sink when the parallelism is over 1, and that count
> is about 20-25% short (and not consistent) of what comes out
> consistently when parallelism is set to 1.
> 
> Dylan
> 
> *From: *Dylan Forciea <dy...@oseberg.io>
> *Date: *Wednesday, April 14, 2021 at 9:08 AM
> *To: *Piotr Nowojski <pnowoj...@apache.org>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Nondeterministic results with SQL job when
> parallelism is > 1
> 
> Piotrek,
> 
> I was actually originally using a group function that WAS
> deterministic (but was a custom UDF I made), but chose something
> here built in. By non-deterministic, I mean that the number of
> records coming out is not consistent. Since the FIRST_VALUE here is
> on an attribute that is not part of the key, that shouldn’t affect
> the number of records coming out I wouldn’t think.
> 
> Dylan
> 
> *From: *Piotr Nowojski <pnowoj...@apache.org>
> *Date: *Wednesday, April 14, 2021 at 9:06 AM
> *To: *Dylan Forciea <dy...@oseberg.io>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Nondeterministic results with SQL job when
> parallelism is > 1
> 
> Hi,
> 
> Yes, it looks like your query is non deterministic because of
> `FIRST_VALUE` used inside `GROUP BY`. If you have many different
> parallel sources, each time you run your query your first value
> might be different. If that's the case, you could try to confirm it
> with even smaller query:
> 
> SELECT
>id2,
>FIRST_VALUE(attr) AS attr
>  FROM table2
>  GROUP BY id2
> 
> Best,
> 
> Piotrek
> 
> Wed, 14 Apr 2021 at 14:45, Dylan Forciea <dy...@oseberg.io> wrote:
> 
> I am running Flink 1.12.2, and I was trying to up the
> parallelism of my Flink SQL job to see what happened. However,
> once I

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
Piotrek,

I am looking at the count of records present in the sink table in Postgres 
after the entire job completes, not the number of inserts/retracts. I can see 
as the job runs that records are added and removed from the “sink” table. With 
parallelism set to 1, it always comes out to the same number (which is 
consistent with the number of ids in the source tables “table1” and “table2”), 
at about 491k records in table “sink” when the job is complete. With the 
parallelism set to 16, the “sink” table will have somewhere around 360k records 
+/- 20k when the job is complete. I truncate the “sink” table before I run the 
job, and this is a test environment where the source databases are static.
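
A minimal sketch (my addition) of the verification loop described above, using plain JDBC; the URL, credentials, and table name mirror the placeholders used elsewhere in the thread.

import java.sql.DriverManager

object SinkCountCheck {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:postgresql://…", "username", "password")
    try {
      conn.createStatement().execute("TRUNCATE TABLE sink") // start from an empty sink
      // ... submit the Flink job here and wait for it to finish ...
      val rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM sink")
      rs.next()
      // ~491k expected at parallelism 1; ~360k +/- 20k observed at parallelism 16
      println(s"rows in sink: ${rs.getLong(1)}")
    } finally {
      conn.close()
    }
  }
}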

I removed my line for setting to Batch mode per Timo’s suggestion, and am still 
running with MAX which should have deterministic output.

Dylan

From: Piotr Nowojski 
Date: Wednesday, April 14, 2021 at 9:38 AM
To: Dylan Forciea 
Cc: "user@flink.apache.org" 
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi Dylan,

But if you are running your query in Streaming mode, aren't you counting 
retractions from the FULL JOIN? AFAIK in Streaming mode in FULL JOIN, when the 
first record comes in it will be immediately emitted with NULLs (not matched, 
as the other table is empty). Later if a matching record is received from the 
second table, the previous result will be retracted and the new one, updated, 
will be re-emitted. Maybe this is what you are observing in the varying output?
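
A small standalone sketch (my illustration, not from the thread) of that retraction sequence for a single key, and of why counting raw emissions differs from counting the rows that survive the retractions.

object RetractionCountSketch {
  sealed trait Change { def key: Long }
  final case class Upsert(key: Long, rightAttr: Option[String]) extends Change
  final case class Retract(key: Long) extends Change

  def main(args: Array[String]): Unit = {
    val changelog = Seq(
      Upsert(1L, None),      // +I: left row arrives first, right side still empty (NULL-padded)
      Retract(1L),           // -U: the match arrives, the unmatched result is withdrawn
      Upsert(1L, Some("y"))  // +U: the matched result is re-emitted
    )
    // Fold the changelog into the final table contents.
    val finalTable = changelog.foldLeft(Map.empty[Long, Option[String]]) {
      case (table, Upsert(k, v)) => table + (k -> v)
      case (table, Retract(k))   => table - k
    }
    println(s"emissions seen: ${changelog.size}, net rows in the final table: ${finalTable.size}")
  }
}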

Maybe you could try to analyse how the results differ between different runs?

Best,
Piotrek

Wed, 14 Apr 2021 at 16:22, Dylan Forciea <dy...@oseberg.io> wrote:
I replaced the FIRST_VALUE with MAX to ensure that the results should be 
identical even in their content, and my problem still remains – I end up with a 
nondeterministic count of records being emitted into the sink when the 
parallelism is over 1, and that count is about 20-25% short (and not 
consistent) of what comes out consistently when parallelism is set to 1.

Dylan

From: Dylan Forciea <dy...@oseberg.io>
Date: Wednesday, April 14, 2021 at 9:08 AM
To: Piotr Nowojski <pnowoj...@apache.org>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Piotrek,

I was actually originally using a group function that WAS deterministic (but 
was a custom UDF I made), but chose something here built in. By 
non-deterministic, I mean that the number of records coming out is not 
consistent. Since the FIRST_VALUE here is on an attribute that is not part of 
the key, that shouldn’t affect the number of records coming out I wouldn’t 
think.

Dylan

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, April 14, 2021 at 9:06 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi,

Yes, it looks like your query is non deterministic because of `FIRST_VALUE` 
used inside `GROUP BY`. If you have many different parallel sources, each time 
you run your query your first value might be different. If that's the case, you 
could try to confirm it with even smaller query:

   SELECT
  id2,
  FIRST_VALUE(attr) AS attr
    FROM table2
GROUP BY id2
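
A hedged variant of that probe query (my addition), with a deterministic aggregate in place of FIRST_VALUE, which is what Dylan switches to later in the thread; streamTableEnv is the table environment from the job code quoted below.

val deterministicProbe = streamTableEnv.sqlQuery("""
  SELECT
    id2,
    MAX(attr) AS attr
  FROM table2
  GROUP BY id2""")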

Best,
Piotrek

Wed, 14 Apr 2021 at 14:45, Dylan Forciea <dy...@oseberg.io> wrote:
I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink 
SQL job to see what happened. However, once I did that, my results became 
nondeterministic. This happens whether I set the 
table.exec.resource.default-parallelism config option or I set the default 
local parallelism to something higher than 1. I would end up with less records 
in the end, and each time I ran the output record count would come out 
differently.

I managed to distill an example, as pasted below (with attribute names changed 
to protect company proprietary info), that causes the issue. I feel like I 
managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the 
distilled version wasn’t giving me wrong results with that. Maybe it has to do 
with joining to a table that was formed using a GROUP BY? Can somebody tell if 
I’m doing something that is known not to work, or if I have run across a bug?

Regards,
Dylan Forciea


object Job {
  def main(args: Array[String]): Unit = {
StreamExecutionEnvironment.setDefaultLocalParallelism(1)

val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
I replaced the FIRST_VALUE with MAX to ensure that the results should be 
identical even in their content, and my problem still remains – I end up with a 
nondeterministic count of records being emitted into the sink when the 
parallelism is over 1, and that count is about 20-25% short (and not 
consistent) of what comes out consistently when parallelism is set to 1.

Dylan

From: Dylan Forciea 
Date: Wednesday, April 14, 2021 at 9:08 AM
To: Piotr Nowojski 
Cc: "user@flink.apache.org" 
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Piotrek,

I was actually originally using a group function that WAS deterministic (but 
was a custom UDF I made), but chose something here built in. By 
non-deterministic, I mean that the number of records coming out is not 
consistent. Since the FIRST_VALUE here is on an attribute that is not part of 
the key, that shouldn’t affect the number of records coming out I wouldn’t 
think.

Dylan

From: Piotr Nowojski 
Date: Wednesday, April 14, 2021 at 9:06 AM
To: Dylan Forciea 
Cc: "user@flink.apache.org" 
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi,

Yes, it looks like your query is non deterministic because of `FIRST_VALUE` 
used inside `GROUP BY`. If you have many different parallel sources, each time 
you run your query your first value might be different. If that's the case, you 
could try to confirm it with even smaller query:

   SELECT
  id2,
  FIRST_VALUE(attr) AS attr
FROM table2
GROUP BY id2

Best,
Piotrek

Wed, 14 Apr 2021 at 14:45, Dylan Forciea <dy...@oseberg.io> wrote:
I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink 
SQL job to see what happened. However, once I did that, my results became 
nondeterministic. This happens whether I set the 
table.exec.resource.default-parallelism config option or I set the default 
local parallelism to something higher than 1. I would end up with less records 
in the end, and each time I ran the output record count would come out 
differently.

I managed to distill an example, as pasted below (with attribute names changed 
to protect company proprietary info), that causes the issue. I feel like I 
managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the 
distilled version wasn’t giving me wrong results with that. Maybe it has to do 
with joining to a table that was formed using a GROUP BY? Can somebody tell if 
I’m doing something that is known not to work, or if I have run across a bug?

Regards,
Dylan Forciea


object Job {
  def main(args: Array[String]): Unit = {
StreamExecutionEnvironment.setDefaultLocalParallelism(1)

val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

val configuration = streamTableEnv.getConfig().getConfiguration()
configuration.setInteger("table.exec.resource.default-parallelism", 16)

streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

streamTableEnv.executeSql(
  """
  CREATE TABLE table1 (
id1 STRING PRIMARY KEY NOT ENFORCED,
attr STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table1',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table2 (
attr STRING,
id2 STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table2',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table3 (
attr STRING PRIMARY KEY NOT ENFORCED,
attr_mapped STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table3',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql("""
  CREATE TABLE sink (
id STRING PRIMARY KEY NOT ENFORCED,
attr STRING,
attr_mapped STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'sink',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

val view =
  streamTableEnv.sqlQuery("""
  S

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
Piotrek,

I was actually originally using a group function that WAS deterministic (but 
was a custom UDF I made), but chose something here built in. By 
non-deterministic, I mean that the number of records coming out is not 
consistent. Since the FIRST_VALUE here is on an attribute that is not part of 
the key, that shouldn’t affect the number of records coming out I wouldn’t 
think.

Dylan

From: Piotr Nowojski 
Date: Wednesday, April 14, 2021 at 9:06 AM
To: Dylan Forciea 
Cc: "user@flink.apache.org" 
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi,

Yes, it looks like your query is non deterministic because of `FIRST_VALUE` 
used inside `GROUP BY`. If you have many different parallel sources, each time 
you run your query your first value might be different. If that's the case, you 
could try to confirm it with even smaller query:

   SELECT
  id2,
  FIRST_VALUE(attr) AS attr
FROM table2
GROUP BY id2

Best,
Piotrek

Wed, 14 Apr 2021 at 14:45, Dylan Forciea <dy...@oseberg.io> wrote:
I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink 
SQL job to see what happened. However, once I did that, my results became 
nondeterministic. This happens whether I set the 
table.exec.resource.default-parallelism config option or I set the default 
local parallelism to something higher than 1. I would end up with less records 
in the end, and each time I ran the output record count would come out 
differently.

I managed to distill an example, as pasted below (with attribute names changed 
to protect company proprietary info), that causes the issue. I feel like I 
managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the 
distilled version wasn’t giving me wrong results with that. Maybe it has to do 
with joining to a table that was formed using a GROUP BY? Can somebody tell if 
I’m doing something that is known not to work, or if I have run across a bug?

Regards,
Dylan Forciea


object Job {
  def main(args: Array[String]): Unit = {
StreamExecutionEnvironment.setDefaultLocalParallelism(1)

val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

val configuration = streamTableEnv.getConfig().getConfiguration()
configuration.setInteger("table.exec.resource.default-parallelism", 16)

streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

streamTableEnv.executeSql(
  """
  CREATE TABLE table1 (
id1 STRING PRIMARY KEY NOT ENFORCED,
attr STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table1',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table2 (
attr STRING,
id2 STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table2',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table3 (
attr STRING PRIMARY KEY NOT ENFORCED,
attr_mapped STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table3',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql("""
  CREATE TABLE sink (
id STRING PRIMARY KEY NOT ENFORCED,
attr STRING,
attr_mapped STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'sink',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

val view =
  streamTableEnv.sqlQuery("""
  SELECT
COALESCE(t1.id1, t2.id2) AS id,
COALESCE(t2.attr, t1.attr) AS attr,
COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
  FROM table1 t1
  FULL JOIN (
SELECT
  id2,
  FIRST_VALUE(attr) AS attr
FROM table2
GROUP BY id2
  ) t2
   ON (t1.id1 = t2.id2)
  LEFT JOIN table3 t3
ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
streamTableEnv.createTemporaryView("view", view)

val statementSet = streamTableEnv.createStatementSet()
statementSet.addInsertSql("""
  INSERT INTO sink SELECT * FROM view
""")

statementSet.execute().await()
  }
}




Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink 
SQL job to see what happened. However, once I did that, my results became 
nondeterministic. This happens whether I set the 
table.exec.resource.default-parallelism config option or I set the default 
local parallelism to something higher than 1. I would end up with less records 
in the end, and each time I ran the output record count would come out 
differently.

I managed to distill an example, as pasted below (with attribute names changed 
to protect company proprietary info), that causes the issue. I feel like I 
managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the 
distilled version wasn’t giving me wrong results with that. Maybe it has to do 
with joining to a table that was formed using a GROUP BY? Can somebody tell if 
I’m doing something that is known not to work, or if I have run across a bug?

Regards,
Dylan Forciea


object Job {
  def main(args: Array[String]): Unit = {
StreamExecutionEnvironment.setDefaultLocalParallelism(1)

val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

val configuration = streamTableEnv.getConfig().getConfiguration()
configuration.setInteger("table.exec.resource.default-parallelism", 16)

streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

streamTableEnv.executeSql(
  """
  CREATE TABLE table1 (
id1 STRING PRIMARY KEY NOT ENFORCED,
attr STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table1',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table2 (
attr STRING,
id2 STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table2',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table3 (
attr STRING PRIMARY KEY NOT ENFORCED,
attr_mapped STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table3',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql("""
  CREATE TABLE sink (
id STRING PRIMARY KEY NOT ENFORCED,
attr STRING,
attr_mapped STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'sink',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

val view =
  streamTableEnv.sqlQuery("""
  SELECT
COALESCE(t1.id1, t2.id2) AS id,
COALESCE(t2.attr, t1.attr) AS attr,
COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
  FROM table1 t1
  FULL JOIN (
SELECT
  id2,
  FIRST_VALUE(attr) AS attr
FROM table2
GROUP BY id2
  ) t2
   ON (t1.id1 = t2.id2)
  LEFT JOIN table3 t3
ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
streamTableEnv.createTemporaryView("view", view)

val statementSet = streamTableEnv.createStatementSet()
statementSet.addInsertSql("""
  INSERT INTO sink SELECT * FROM view
""")

statementSet.execute().await()
  }
}




Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Dylan Forciea
I wanted to report that I tried out your PR, and it does solve my issue. I am 
able to create a generic LatestNonNull and it appears to do what is expected.

Thanks,
Dylan Forciea

On 1/21/21, 8:50 AM, "Timo Walther"  wrote:

I opened a PR. Feel free to try it out.

https://github.com/apache/flink/pull/14720

Btw:

 >> env.createTemporarySystemFunction("LatestNonNullLong",
 >> classOf[LatestNonNull[Long]])
 >>
 >> env.createTemporarySystemFunction("LatestNonNullString",
 >> classOf[LatestNonNull[String]])

don't make a difference. The generics will be type erased in bytecode 
and only the class name matters.
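
A tiny sketch (my addition) of that erasure point: both classOf expressions yield the same runtime Class object, so the two registrations hand Flink identical information. It assumes the LatestNonNull class from the messages below is on the classpath.

object ErasureSketch extends App {
  val forLong   = classOf[LatestNonNull[Long]]
  val forString = classOf[LatestNonNull[String]]
  // Generic parameters are erased; only LatestNonNull.class remains at runtime.
  println(forLong == forString) // prints true
}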

Thanks,
Timo

On 21.01.21 11:36, Timo Walther wrote:
> Hi Dylan,
> 
> thanks for the investigation. I can now also reproduce it in my code. Yes, 
> this is a bug. I opened
> 
> https://issues.apache.org/jira/browse/FLINK-21070
> 
> and will try to fix this asap.
> 
> Thanks,
> Timo
> 
> On 20.01.21 17:52, Dylan Forciea wrote:
>> Timo,
>>
>> I converted what I had to Java, and ended up with the exact same issue 
>> as before where it will work if I only ever use it on 1 type, but not 
>> if I use it on multiple. Maybe this is a bug?
>>
>> Dylan
>>
>> On 1/20/21, 10:06 AM, "Dylan Forciea"  wrote:
>>
>>  Oh, I think I might have a clue as to what is going on. I notice 
>> that it will work properly when I only call it on Long. I think that 
>> it is using the same generated code for the Converter for whatever was 
>> called first.
>>
>>  Since in Scala I can't declare an object as static within the 
>> class itself, I wonder if it won't generate appropriate Converter code 
>> per subtype. I tried creating a subclass that is specific to the type 
>> within my class and returning that as the accumulator, but that didn't 
>> help. And, I can't refer to that class in the TypeInference since it 
>> isn't static and I get an error from Flink because of that. I'm going 
>> to see if I just write this UDF in Java with an embedded public static 
>> class like you have if it will solve my problems. I'll report back to 
>> let you know what I find. If that works, I'm not quite sure how to 
>> make it work in Scala.
>>
>>  Regards,
>>  Dylan Forciea
>>
>>  On 1/20/21, 9:34 AM, "Dylan Forciea"  wrote:
>>
>>  As a side note, I also just tried to unify into a single 
>> function registration and used _ as the type parameter in the classOf 
>> calls there and within the TypeInference definition for the 
>> accumulator and still ended up with the exact same stack trace.
>>
>>  Dylan
>>
>>  On 1/20/21, 9:22 AM, "Dylan Forciea"  wrote:
>>
>>  Timo,
>>
>>  I appreciate it! I am using Flink 1.12.0 right now with 
>> the Blink planner. What you proposed is roughly what I had come up 
>> with the first time around that resulted in the stack trace with the 
>> ClassCastException I had originally included. I saw that you had used 
>> a Row instead of just the value in our example, but changing it that 
>> way didn't seem to help, which makes sense since the problem seems to 
>> be in the code generated for the accumulator Converter and not the 
>> output.
>>
>>  Here is the exact code that caused that error (while 
>> calling LatestNonNullLong):
>>
>>  The registration of the below:
>>  
>> env.createTemporarySystemFunction("LatestNonNullLong", 
>> classOf[LatestNonNull[Long]])
>>  
>> env.createTemporarySystemFunction("LatestNonNullString", 
>> classOf[LatestNonNull[String]])
>>
>>
>>  The class itself:
>>
>>  import java.time.LocalDate
>>  import java.util.Optional
>>  import org.apache.flink.table.api.DataTypes
>>  import org.apache.flink.table.catalog.DataTypeFactory
>>  import org.apache.flink.table.functions.AggregateFunction
>>  import 
>> org.apache.flink.table.types.inference.{InputTypeStrategies, 
>> TypeInference}
>>
>>

Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Dylan Forciea
Timo,

Will do! I have been patching in a change locally that I have a PR [1] out for, 
so if this will end up in the next 1.12 patch release, I may add this in with 
it once it has been approved and merged.

On a side note, that PR has been out since the end of October (looks like I 
need to do a rebase to accommodate the code reformatting change that occurred 
since). Is there a process for getting somebody to review it? Not sure if, with 
the New Year and the 1.12 release and follow-up, it just got lost in the 
commotion.

Regards,
Dylan Forciea

[1] https://github.com/apache/flink/pull/13787

On 1/21/21, 8:50 AM, "Timo Walther"  wrote:

I opened a PR. Feel free to try it out.

https://github.com/apache/flink/pull/14720

Btw:

 >> env.createTemporarySystemFunction("LatestNonNullLong",
 >> classOf[LatestNonNull[Long]])
 >>
 >> env.createTemporarySystemFunction("LatestNonNullString",
 >> classOf[LatestNonNull[String]])

don't make a difference. The generics will be type erased in bytecode 
and only the class name matters.

Thanks,
Timo

On 21.01.21 11:36, Timo Walther wrote:
> Hi Dylan,
> 
> thanks for the investigation. I can now also reproduce it in my code. Yes, 
> this is a bug. I opened
> 
> https://issues.apache.org/jira/browse/FLINK-21070
> 
> and will try to fix this asap.
> 
> Thanks,
> Timo
> 
> On 20.01.21 17:52, Dylan Forciea wrote:
>> Timo,
>>
>> I converted what I had to Java, and ended up with the exact same issue 
>> as before where it will work if I only ever use it on 1 type, but not 
>> if I use it on multiple. Maybe this is a bug?
>>
>> Dylan
>>
>> On 1/20/21, 10:06 AM, "Dylan Forciea"  wrote:
>>
>>  Oh, I think I might have a clue as to what is going on. I notice 
>> that it will work properly when I only call it on Long. I think that 
>> it is using the same generated code for the Converter for whatever was 
>> called first.
>>
>>  Since in Scala I can't declare an object as static within the 
>> class itself, I wonder if it won't generate appropriate Converter code 
>> per subtype. I tried creating a subclass that is specific to the type 
>> within my class and returning that as the accumulator, but that didn't 
>> help. And, I can't refer to that class in the TypeInference since it 
>> isn't static and I get an error from Flink because of that. I'm going 
>> to see if I just write this UDF in Java with an embedded public static 
>> class like you have if it will solve my problems. I'll report back to 
>> let you know what I find. If that works, I'm not quite sure how to 
>> make it work in Scala.
>>
>>  Regards,
>>  Dylan Forciea
>>
>>  On 1/20/21, 9:34 AM, "Dylan Forciea"  wrote:
>>
>>  As a side note, I also just tried to unify into a single 
>> function registration and used _ as the type parameter in the classOf 
>> calls there and within the TypeInference definition for the 
>> accumulator and still ended up with the exact same stack trace.
>>
>>  Dylan
>>
>>  On 1/20/21, 9:22 AM, "Dylan Forciea"  wrote:
>>
>>  Timo,
>>
>>  I appreciate it! I am using Flink 1.12.0 right now with 
>> the Blink planner. What you proposed is roughly what I had come up 
>> with the first time around that resulted in the stack trace with the 
>> ClassCastException I had originally included. I saw that you had used 
>> a Row instead of just the value in our example, but changing it that 
>> way didn't seem to help, which makes sense since the problem seems to 
>> be in the code generated for the accumulator Converter and not the 
>> output.
>>
>>  Here is the exact code that caused that error (while 
>> calling LatestNonNullLong):
>>
>>  The registration of the below:
>>  
>> env.createTemporarySystemFunction("LatestNonNullLong", 
>> classOf[LatestNonNull[Long]])
>>  
>> env.createTemporarySystemFunction("LatestNonNullString", 
>> classOf[LatestNonNull[String]])
>>
>>
>>  The class itself:
>>
>>  import java.time.LocalDate
>>  

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
Timo,

I converted what I had to Java, and ended up with the exact same issue as 
before where it will work if I only ever use it on 1 type, but not if I use it 
on multiple. Maybe this is a bug?

Dylan

On 1/20/21, 10:06 AM, "Dylan Forciea"  wrote:

Oh, I think I might have a clue as to what is going on. I notice that it 
will work properly when I only call it on Long. I think that it is using the 
same generated code for the Converter for whatever was called first.

Since in Scala I can't declare an object as static within the class itself, 
I wonder if it won't generate appropriate Converter code per subtype. I tried 
creating a subclass that is specific to the type within my class and returning 
that as the accumulator, but that didn't help. And, I can't refer to that class 
in the TypeInference since it isn't static and I get an error from Flink 
because of that. I'm going to see if I just write this UDF in Java with an 
embedded public static class like you have if it will solve my problems. I'll 
report back to let you know what I find. If that works, I'm not quite sure how 
to make it work in Scala.

Regards,
Dylan Forciea

On 1/20/21, 9:34 AM, "Dylan Forciea"  wrote:

As a side note, I also just tried to unify into a single function 
registration and used _ as the type parameter in the classOf calls there and 
within the TypeInference definition for the accumulator and still ended up with 
the exact same stack trace.

Dylan

On 1/20/21, 9:22 AM, "Dylan Forciea"  wrote:

Timo,

I appreciate it! I am using Flink 1.12.0 right now with the Blink 
planner. What you proposed is roughly what I had come up with the first time 
around that resulted in the stack trace with the ClassCastException I had 
originally included. I saw that you had used a Row instead of just the value in 
our example, but changing it that way didn't seem to help, which makes sense 
since the problem seems to be in the code generated for the accumulator 
Converter and not the output. 

Here is the exact code that caused that error (while calling 
LatestNonNullLong):

The registration of the below:
env.createTemporarySystemFunction("LatestNonNullLong", 
classOf[LatestNonNull[Long]])
env.createTemporarySystemFunction("LatestNonNullString", 
classOf[LatestNonNull[String]])


The class itself:

import java.time.LocalDate
import java.util.Optional
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.types.inference.{InputTypeStrategies, 
TypeInference}

case class LatestNonNullAccumulator[T](
var value: T = null.asInstanceOf[T],
var date: LocalDate = null)

class LatestNonNull[T] extends AggregateFunction[T, 
LatestNonNullAccumulator[T]] {

  override def createAccumulator(): LatestNonNullAccumulator[T] = {
LatestNonNullAccumulator[T]()
  }

  override def getValue(acc: LatestNonNullAccumulator[T]): T = {
acc.value
  }

  def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: 
LocalDate): Unit = {
if (value != null) {
  Option(acc.date).fold {
acc.value = value
acc.date = date
  } { accDate =>
if (date != null && date.isAfter(accDate)) {
  acc.value = value
  acc.date = date
}
  }
}
  }

  def merge(
  acc: LatestNonNullAccumulator[T],
  it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
val iter = it.iterator()
while (iter.hasNext) {
  val a = iter.next()
  if (a.value != null) {
Option(acc.date).fold {
  acc.value = a.value
  acc.date = a.date
} { accDate =>
  Option(a.date).map { curDate =>
if (curDate.isAfter(accDate)) {
  acc.value = a.value
  acc.date = a.date
}
  }
}
  }
}
  }

  def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
acc.value = null.asInstanceOf[T]
acc.date = null
  }

  override def getTypeInference(typeFactory: DataTypeFactory): 
TypeInfere

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
Oh, I think I might have a clue as to what is going on. I notice that it will 
work properly when I only call it on Long. I think that it is using the same 
generated code for the Converter for whatever was called first.

Since in Scala I can't declare an object as static within the class itself, I 
wonder if it won't generate appropriate Converter code per subtype. I tried 
creating a subclass that is specific to the type within my class and returning 
that as the accumulator, but that didn't help. And, I can't refer to that class 
in the TypeInference since it isn't static and I get an error from Flink 
because of that. I'm going to see if I just write this UDF in Java with an 
embedded public static class like you have if it will solve my problems. I'll 
report back to let you know what I find. If that works, I'm not quite sure how 
to make it work in Scala.

Regards,
Dylan Forciea

On 1/20/21, 9:34 AM, "Dylan Forciea"  wrote:

As a side note, I also just tried to unify into a single function 
registration and used _ as the type parameter in the classOf calls there and 
within the TypeInference definition for the accumulator and still ended up with 
the exact same stack trace.

Dylan

On 1/20/21, 9:22 AM, "Dylan Forciea"  wrote:

Timo,

I appreciate it! I am using Flink 1.12.0 right now with the Blink 
planner. What you proposed is roughly what I had come up with the first time 
around that resulted in the stack trace with the ClassCastException I had 
originally included. I saw that you had used a Row instead of just the value in 
our example, but changing it that way didn't seem to help, which makes sense 
since the problem seems to be in the code generated for the accumulator 
Converter and not the output. 

Here is the exact code that caused that error (while calling 
LatestNonNullLong):

The registration of the below:
env.createTemporarySystemFunction("LatestNonNullLong", 
classOf[LatestNonNull[Long]])
env.createTemporarySystemFunction("LatestNonNullString", 
classOf[LatestNonNull[String]])


The class itself:

import java.time.LocalDate
import java.util.Optional
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.types.inference.{InputTypeStrategies, 
TypeInference}

case class LatestNonNullAccumulator[T](
var value: T = null.asInstanceOf[T],
var date: LocalDate = null)

class LatestNonNull[T] extends AggregateFunction[T, 
LatestNonNullAccumulator[T]] {

  override def createAccumulator(): LatestNonNullAccumulator[T] = {
LatestNonNullAccumulator[T]()
  }

  override def getValue(acc: LatestNonNullAccumulator[T]): T = {
acc.value
  }

  def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: 
LocalDate): Unit = {
if (value != null) {
  Option(acc.date).fold {
acc.value = value
acc.date = date
  } { accDate =>
if (date != null && date.isAfter(accDate)) {
  acc.value = value
  acc.date = date
}
  }
}
  }

  def merge(
  acc: LatestNonNullAccumulator[T],
  it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
val iter = it.iterator()
while (iter.hasNext) {
  val a = iter.next()
  if (a.value != null) {
Option(acc.date).fold {
  acc.value = a.value
  acc.date = a.date
} { accDate =>
  Option(a.date).map { curDate =>
if (curDate.isAfter(accDate)) {
  acc.value = a.value
  acc.date = a.date
}
  }
}
  }
}
  }

  def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
acc.value = null.asInstanceOf[T]
acc.date = null
  }

  override def getTypeInference(typeFactory: DataTypeFactory): 
TypeInference = {
TypeInference
  .newBuilder()
  .inputTypeStrategy(InputTypeStrategies
.sequence(InputTypeStrategies.ANY, 
InputTypeStrategies.explicit(DataTypes.DATE())))
  .accumulatorTypeStrategy { callContext =>
val accDataType = DataTypes.STRUCTURED(
  classOf[LatestNonNullAccumulator[T]],
  DataTypes.FIELD("value", 
callContext.getArgumentDataTypes.get(0)),
  DataTypes.FIELD("date", DataTypes.DATE()))

  

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
As a side note, I also just tried to unify into a single function registration 
and used _ as the type parameter in the classOf calls there and within the 
TypeInference definition for the accumulator and still ended up with the exact 
same stack trace.

Dylan

On 1/20/21, 9:22 AM, "Dylan Forciea"  wrote:

Timo,

I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. 
What you proposed is roughly what I had come up with the first time around that 
resulted in the stack trace with the ClassCastException I had originally 
included. I saw that you had used a Row instead of just the value in our 
example, but changing it that way didn't seem to help, which makes sense since 
the problem seems to be in the code generated for the accumulator Converter and 
not the output. 

Here is the exact code that caused that error (while calling 
LatestNonNullLong):

The registration of the below:
env.createTemporarySystemFunction("LatestNonNullLong", 
classOf[LatestNonNull[Long]])
env.createTemporarySystemFunction("LatestNonNullString", 
classOf[LatestNonNull[String]])


The class itself:

import java.time.LocalDate
import java.util.Optional
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.types.inference.{InputTypeStrategies, 
TypeInference}

case class LatestNonNullAccumulator[T](
var value: T = null.asInstanceOf[T],
var date: LocalDate = null)

class LatestNonNull[T] extends AggregateFunction[T, 
LatestNonNullAccumulator[T]] {

  override def createAccumulator(): LatestNonNullAccumulator[T] = {
LatestNonNullAccumulator[T]()
  }

  override def getValue(acc: LatestNonNullAccumulator[T]): T = {
acc.value
  }

  def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: 
LocalDate): Unit = {
if (value != null) {
  Option(acc.date).fold {
acc.value = value
acc.date = date
  } { accDate =>
if (date != null && date.isAfter(accDate)) {
  acc.value = value
  acc.date = date
}
  }
}
  }

  def merge(
  acc: LatestNonNullAccumulator[T],
  it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
val iter = it.iterator()
while (iter.hasNext) {
  val a = iter.next()
  if (a.value != null) {
Option(acc.date).fold {
  acc.value = a.value
  acc.date = a.date
} { accDate =>
  Option(a.date).map { curDate =>
if (curDate.isAfter(accDate)) {
  acc.value = a.value
  acc.date = a.date
}
  }
}
  }
}
  }

  def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
acc.value = null.asInstanceOf[T]
acc.date = null
  }

  override def getTypeInference(typeFactory: DataTypeFactory): 
TypeInference = {
TypeInference
  .newBuilder()
  .inputTypeStrategy(InputTypeStrategies
.sequence(InputTypeStrategies.ANY, 
InputTypeStrategies.explicit(DataTypes.DATE())))
  .accumulatorTypeStrategy { callContext =>
val accDataType = DataTypes.STRUCTURED(
  classOf[LatestNonNullAccumulator[T]],
  DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
  DataTypes.FIELD("date", DataTypes.DATE()))

Optional.of(accDataType)
  }
  .outputTypeStrategy { callContext =>
val outputDataType = callContext.getArgumentDataTypes().get(0);
Optional.of(outputDataType);
      }
  .build()
  }
}

Regards,
Dylan Forciea

On 1/20/21, 2:37 AM, "Timo Walther"  wrote:

Hi Dylan,

I'm assuming you are using Flink 1.12 and the Blink planner?

Beginning from 1.12 you can use the "new" aggregate functions with a 
better type inference. So TypeInformation will not be used in this 
stack.

I tried to come up with an example that should explain the rough 
design. 
I will include this example into the Flink code base. I hope this helps:



import org.apache.flink.table.types.inference.InputTypeStrategies;

public static class LastIfNotNull<T>
        extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {

    public static class Accumulator<T> {
        public T value;
        public LocalDate date;
    }

    public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
        if (input != null) {
            acc.value = input;
 ac

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
Timo,

I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. What 
you proposed is roughly what I had come up with the first time around that 
resulted in the stack trace with the ClassCastException I had originally 
included. I saw that you had used a Row instead of just the value in our 
example, but changing it that way didn't seem to help, which makes sense since 
the problem seems to be in the code generated for the accumulator Converter and 
not the output. 

Here is the exact code that caused that error (while calling LatestNonNullLong):

The registration of the below:
env.createTemporarySystemFunction("LatestNonNullLong", 
classOf[LatestNonNull[Long]])
env.createTemporarySystemFunction("LatestNonNullString", 
classOf[LatestNonNull[String]])


The class itself:

import java.time.LocalDate
import java.util.Optional
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.types.inference.{InputTypeStrategies, 
TypeInference}

case class LatestNonNullAccumulator[T](
var value: T = null.asInstanceOf[T],
var date: LocalDate = null)

class LatestNonNull[T] extends AggregateFunction[T, 
LatestNonNullAccumulator[T]] {

  override def createAccumulator(): LatestNonNullAccumulator[T] = {
LatestNonNullAccumulator[T]()
  }

  override def getValue(acc: LatestNonNullAccumulator[T]): T = {
acc.value
  }

  def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): 
Unit = {
if (value != null) {
  Option(acc.date).fold {
acc.value = value
acc.date = date
  } { accDate =>
if (date != null && date.isAfter(accDate)) {
  acc.value = value
  acc.date = date
}
  }
}
  }

  def merge(
  acc: LatestNonNullAccumulator[T],
  it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
val iter = it.iterator()
while (iter.hasNext) {
  val a = iter.next()
  if (a.value != null) {
Option(acc.date).fold {
  acc.value = a.value
  acc.date = a.date
} { accDate =>
  Option(a.date).map { curDate =>
if (curDate.isAfter(accDate)) {
  acc.value = a.value
  acc.date = a.date
}
  }
}
  }
}
  }

  def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
acc.value = null.asInstanceOf[T]
acc.date = null
  }

  override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
TypeInference
  .newBuilder()
  .inputTypeStrategy(InputTypeStrategies
.sequence(InputTypeStrategies.ANY, 
InputTypeStrategies.explicit(DataTypes.DATE())))
  .accumulatorTypeStrategy { callContext =>
val accDataType = DataTypes.STRUCTURED(
  classOf[LatestNonNullAccumulator[T]],
  DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
  DataTypes.FIELD("date", DataTypes.DATE()))

Optional.of(accDataType)
  }
  .outputTypeStrategy { callContext =>
val outputDataType = callContext.getArgumentDataTypes().get(0);
Optional.of(outputDataType);
  }
  .build()
  }
}

Regards,
Dylan Forciea

On 1/20/21, 2:37 AM, "Timo Walther"  wrote:

Hi Dylan,

I'm assuming you are using Flink 1.12 and the Blink planner?

Beginning from 1.12 you can use the "new" aggregate functions with a 
better type inference. So TypeInformation will not be used in this stack.

I tried to come up with an example that should explain the rough design. 
I will include this example into the Flink code base. I hope this helps:



import org.apache.flink.table.types.inference.InputTypeStrategies;

public static class LastIfNotNull<T>
        extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {

    public static class Accumulator<T> {
        public T value;
        public LocalDate date;
    }

    public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
        if (input != null) {
            acc.value = input;
            acc.date = date;
        }
    }

    @Override
    public Row getValue(Accumulator<T> acc) {
        return Row.of(acc.value, acc.date);
    }

    @Override
    public Accumulator<T> createAccumulator() {
        return new Accumulator<>();
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .inputTypeStrategy(
                        InputTypeStrategies.sequence(
                                InputTypeStrategies.ANY,
                                InputTypeStrategies.explicit(DataTypes.DATE())))
                .accumulatorTypeStrategy(
   

Trying to create a generic aggregate UDF

2021-01-19 Thread Dylan Forciea
I am attempting to create an aggregate UDF that takes a generic parameter T, 
but for the life of me, I can’t seem to get it to work.

The UDF I’m trying to implement takes two input arguments, a value that is 
generic, and a date. It will choose the non-null value with the latest 
associated date. I had originally done this with separate Top 1 queries 
connected with a left join, but the memory usage seems far higher than doing 
this with a custom aggregate function.

As a first attempt, I tried to use custom type inference to have it validate 
that the first argument type is the output type and have a single function, and 
also used DataTypes.STRUCTURED to try to define the shape of my accumulator. 
However, that resulted in an exception like this whenever I tried to use a 
non-string value as the first argument:

[error] Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast 
to java.lang.String
[error]   at 
io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown 
Source)
[error]   at 
org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
[error]   at 
org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
[error]   at 
org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
[error]   at GroupAggsHandler$777.getAccumulators(Unknown Source)
[error]   at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
[error]   at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
[error]   at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
[error]   at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
[error]   at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
[error]   at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
[error]   at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
[error]   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
[error]   at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
[error]   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
[error]   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
[error]   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
[error]   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
[error]   at java.lang.Thread.run(Thread.java:748)

Figuring that I can’t do something of that sort, I tried to follow the general 
approach in the Sum accumulator[1] in the Flink source code where separate 
classes are derived from a base class, and each advertises its accumulator 
shape, but ended up with the exact same stack trace as above when I tried to 
create and use a function specifically for a non-string type like Long.

Is there something I’m missing as far as how this is supposed to be done? 
Everything I try either results in a stack trace like the above, or type 
erasure issues when trying to get type information for the accumulator. If I 
just copy the generic code multiple times and directly use Long or String 
rather than using subclassing, then it works just fine. I appreciate any help I 
can get on this!

Regards,
Dylan Forciea

[1] 
https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala





Re: Batch loading into postgres database

2020-12-08 Thread Dylan Forciea
After a bit more playing around with it today, I figured out that what I needed 
to call was:

statementSet.execute().getJobClient().get().getJobExecutionResult(getClass().getClassLoader()).get()

The fact that getJobExecutionResult required a classloader is what threw me 
off. Since I’m using an application cluster, I didn’t really think about the 
fact that I would need to supply one. It might be nice to have a flavor of this 
that just uses the default system class loader.

However, I’m still interested in if there is a better way to handle reloading a 
table or tables in active use than the series of steps that I went through, if 
anybody has any suggestions!
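
For anyone finding this thread later, a minimal sketch of the full pattern 
described above, assuming Flink 1.11; the JDBC connection details and the 
staging/final table names below are placeholders rather than the actual job:

import java.sql.DriverManager

// 1. Run the INSERT INTO ... staging statements and block until the batch job finishes.
statementSet.execute()
  .getJobClient().get()
  .getJobExecutionResult(getClass().getClassLoader())
  .get()

// 2. Swap the staging contents into the final table in a single JDBC transaction.
val conn = DriverManager.getConnection(
  "jdbc:postgresql://host.domain.com/db1", "user", "password") // placeholder connection
try {
  conn.setAutoCommit(false)
  val stmt = conn.createStatement()
  stmt.executeUpdate("DELETE FROM table1_final") // placeholder final table
  stmt.executeUpdate("INSERT INTO table1_final SELECT * FROM table1_staging")
  conn.commit()
} finally {
  conn.close()
}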

Thanks,
Dylan Forciea

From: Dylan Forciea 
Date: Monday, December 7, 2020 at 5:33 PM
To: "user@flink.apache.org" 
Subject: Re: Batch loading into postgres database

As a follow up – I’m trying to follow the approach I outlined below, and I’m 
having trouble figuring out how to perform the step of doing the delete/insert 
after the job is complete.

I’ve tried adding a job listener, like so, but that doesn’t seem to ever get 
fired off:

val statementSet = streamTableEnv.createStatementSet()
statementSet.addInsertSql("""
  INSERT INTO table1_staging
SELECT * FROM table
""")

statementSet.addInsertSql("""
  INSERT INTO table2_staging
SELECT * FROM table
""")

statementSet.addInsertSql("""
  INSERT INTO table3_staging
SELECT * FROM table3
""")

streamEnv.registerJobListener(new JobListener() {
  override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): 
Unit = {}

  override def onJobExecuted(result: JobExecutionResult, throwable: 
Throwable): Unit = {
val time = Option(result).map(_.getNetRuntime())
if (throwable == null) {
  Log.info(s"Completed job successfully in $time milliseconds")
} else {
  Log.error(s"Unable to execute job successfully", throwable)
}
  }
})
statementSet.execute()

I tried the above with the execute before and after the register, but it 
doesn’t seem to fire in any case.

I also tried this:

Try(statementSet.execute().getJobClient().get().getJobStatus().join())
  .map { _ =>
Log.info(s"Completed job successfully")
  }
  .recover {
case t => {
  Log.error(s"Unable to execute job successfully", t)
}
  }

And this seems to have fired WAY before the job actually finished flowing all 
the data through. I tried both join and get on the job status CompletableFuture.

Is there anything I’m missing as far as being able to tell when the job is 
complete? Again, this is Flink 1.11.2 that I’m running.

Thanks,
Dylan Forciea

From: Dylan Forciea 
Date: Monday, December 7, 2020 at 8:04 AM
To: "user@flink.apache.org" 
Subject: Batch loading into postgres database

I am setting up a Flink job that will reload a table in a postgres database 
using the Flink SQL functionality. I just wanted to make sure that given the 
current feature set I am going about this the correct way. I am currently using 
version 1.11.2, but plan on upgrading to 1.12 soon whenever it is finalized.

I have setup a staging table and a final table in a postgres database. My plan 
is to have a Flink application that will truncate the contents of the staging 
table before the job begins using JDBC, run the job to completion, and then 
with JDBC delete/insert into the final table from the staging table in a 
transaction after the job completes.

Is this the expected way to interact with postgres in a batch job like this? Or 
is there some functionality or method that I am missing?

Regards,
Dylan Forciea


Re: Batch loading into postgres database

2020-12-07 Thread Dylan Forciea
As a follow up – I’m trying to follow the approach I outlined below, and I’m 
having trouble figuring out how to perform the step of doing the delete/insert 
after the job is complete.

I’ve tried adding a job listener, like so, but that doesn’t seem to ever get 
fired off:

val statementSet = streamTableEnv.createStatementSet()
statementSet.addInsertSql("""
  INSERT INTO table1_staging
SELECT * FROM table
""")

statementSet.addInsertSql("""
  INSERT INTO table2_staging
SELECT * FROM table
""")

statementSet.addInsertSql("""
  INSERT INTO table3_staging
SELECT * FROM table3
""")

streamEnv.registerJobListener(new JobListener() {
  override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): 
Unit = {}

  override def onJobExecuted(result: JobExecutionResult, throwable: 
Throwable): Unit = {
val time = Option(result).map(_.getNetRuntime())
if (throwable == null) {
  Log.info(s"Completed job successfully in $time milliseconds")
} else {
  Log.error(s"Unable to execute job successfully", throwable)
}
  }
})
statementSet.execute()

I tried the above with the execute before and after the register, but it 
doesn’t seem to fire in any case.

I also tried this:

Try(statementSet.execute().getJobClient().get().getJobStatus().join())
  .map { _ =>
Log.info(s"Completed job successfully")
  }
  .recover {
case t => {
  Log.error(s"Unable to execute job successfully", t)
}
  }

And this seems to have fired WAY before the job actually finished flowing all 
the data through. I tried both join and get on the job status CompletableFuture.

Is there anything I’m missing as far as being able to tell when the job is 
complete? Again, this is Flink 1.11.2 that I’m running.

Thanks,
Dylan Forciea

From: Dylan Forciea 
Date: Monday, December 7, 2020 at 8:04 AM
To: "user@flink.apache.org" 
Subject: Batch loading into postgres database

I am setting up a Flink job that will reload a table in a postgres database 
using the Flink SQL functionality. I just wanted to make sure that given the 
current feature set I am going about this the correct way. I am currently using 
version 1.11.2, but plan on upgrading to 1.12 soon whenever it is finalized.

I have setup a staging table and a final table in a postgres database. My plan 
is to have a Flink application that will truncate the contents of the staging 
table before the job begins using JDBC, run the job to completion, and then 
with JDBC delete/insert into the final table from the staging table in a 
transaction after the job completes.

Is this the expected way to interact with postgres in a batch job like this? Or 
is there some functionality or method that I am missing?

Regards,
Dylan Forciea


Batch loading into postgres database

2020-12-07 Thread Dylan Forciea
I am setting up a Flink job that will reload a table in a postgres database 
using the Flink SQL functionality. I just wanted to make sure that given the 
current feature set I am going about this the correct way. I am currently using 
version 1.11.2, but plan on upgrading to 1.12 soon whenever it is finalized.

I have setup a staging table and a final table in a postgres database. My plan 
is to have a Flink application that will truncate the contents of the staging 
table before the job begins using JDBC, run the job to completion, and then 
with JDBC delete/insert into the final table from the staging table in a 
transaction after the job completes.

Is this the expected way to interact with postgres in a batch job like this? Or 
is there some functionality or method that I am missing?

Regards,
Dylan Forciea


Re: Lateral join not finding correlate variable

2020-11-21 Thread Dylan Forciea
Godfrey,

Glad I could help! I suspected that was what the problem was. I have made a 
view in my postgres database to perform the inner lateral join, so that should 
let me work around this for the time being.

Thanks,
Dylan

From: godfrey he 
Date: Friday, November 20, 2020 at 1:09 AM
To: Dylan Forciea 
Cc: "user@flink.apache.org" 
Subject: Re: Lateral join not finding correlate variable

Hi Dylan,

I have reproduced your issue based on your code.
Currently, Flink does not support such a nested correlate pattern query.
I have created an issue to track this [1].
Thanks for reporting it and for your help.

[1] https://issues.apache.org/jira/browse/FLINK-20255

Best,
Godfrey

Dylan Forciea <dy...@oseberg.io> wrote on Thursday, November 19, 2020 at 12:10 PM:
Godfrey,

I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack trace 
running exactly this code:

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.TableFunction


@FunctionHint(output = new DataTypeHint("ROW<s STRING>"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}
object Job {

  def main(args: Array[String]): Unit = {
val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

streamTableEnv.createTemporarySystemFunction(
  "SplitStringToRows",
  classOf[SplitStringToRows]
) // Class defined in previous email

streamTableEnv.executeSql(
  """
  CREATE TABLE table1 (
id_source BIGINT PRIMARY KEY,
attr1_source STRING,
attr2 STRING
  ) WITH (
   'connector' = 'jdbc',
   'url' = 
'jdbc:postgresql://host.domain.com/db1?ssl=true',
   'table-name' = '',
   'username' = '',
   'password' = '',
   'scan.fetch-size' = '500',
   'scan.auto-commit' = 'false')
""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table2 (
attr1_source STRING,
attr2 STRING,
attr3 DECIMAL,
attr4 DATE
  ) WITH (
   'connector' = 'jdbc',
   'url' = 
'jdbc:postgresql://host.domain.com/db1?ssl=true',
   'table-name' = '',
   'username' = '',
   'password' = '',
   'scan.fetch-size' = '500',
   'scan.auto-commit' = 'false')
""")

val q1 = streamTableEnv.sqlQuery("""
  SELECT
id_source AS id,
attr1_source AS attr1,
attr2
  FROM table1
""")
streamTableEnv.createTemporaryView("view1", q1)

val q2 = streamTableEnv.sqlQuery(
  """
SELECT
  a.attr1 AS attr1,
  attr2,
  attr3,
  attr4
FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1_source, ';')) AS 
a(attr1)
""")
streamTableEnv.createTemporaryView("view2", q2)

val q3 = streamTableEnv.sqlQuery("""
SELECT
  w.attr1,
  p.attr3
FROM view1 w
LEFT JOIN LATERAL (
  SELECT
attr1,
attr3
  FROM (
SELECT
  attr1,
  attr3,
  ROW_NUMBER() OVER (
PARTITION BY attr1
ORDER BY
  attr4 DESC NULLS LAST,
  w.attr2 = attr2 DESC NULLS LAST
  ) AS row_num
  FROM view2)
  WHERE row_num = 1) p
ON (w.attr1 = p.attr1)
""")
streamTableEnv.createTemporaryView("view3", q3)

val view3 = streamTableEnv.from("view3")

view3
  .toRetractStream[Row]
  .writeAsCsv("./view3.csv", WriteMode.OVERWRITE)
  .setParallelism(1)

streamEnv.execute()
  }
}

Thanks,
Dylan Forciea

From: godfrey he <godfre...@gmail.com>
Date: Wednesday, November 18, 2020 at 8:29 PM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Lateral join not finding correlate variable

Dylan,

Thanks for your feedback. If the planner encounters an
"unexpected correlate variable $cor2 in the plan" exce

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Dylan Forciea
Ah yes, missed the kafka part and just saw the array part. FLINK-19771 
definitely was solely in the postgres-specific code.

Dylan

From: Jark Wu 
Date: Thursday, November 19, 2020 at 9:12 AM
To: Dylan Forciea 
Cc: Danny Chan , Rex Fenley , Flink 
ML 
Subject: Re: Filter Null in Array in SQL Connector

Hi Dylan,

I think Rex encountered another issue, because he is using Kafka with Debezium 
format.

Hi Rex,

If you can share the json data and the exception stack, that would be helpful!

Besides, you can try to enable 'debezium-json.ignore-parse-errors' option [1] 
to skip the dirty data.

Best,
Jark

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
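
For reference, a rough sketch of where that option goes in the DDL; the table 
name, topic, schema, and broker address below are placeholders rather than 
Rex's actual setup, and tableEnv stands for whatever table environment is in 
scope:

tableEnv.executeSql("""
  CREATE TABLE events (
    id BIGINT,
    tags ARRAY<STRING>
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-json',
    'debezium-json.ignore-parse-errors' = 'true'
  )
""")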

On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <dy...@oseberg.io> wrote:
Do you mean that the array contains values that are null, or that the entire 
array itself is null? If it’s the latter, I have an issue written, along with a 
PR to fix it that has been pending review [1].

Regards,
Dylan Forciea

[1] https://issues.apache.org/jira/browse/FLINK-19771

From: Danny Chan <danny0...@apache.org>
Date: Thursday, November 19, 2020 at 2:24 AM
To: Rex Fenley <r...@remind101.com>
Cc: Flink ML <user@flink.apache.org>
Subject: Re: Filter Null in Array in SQL Connector

Hi, Fenley ~

You are right, parsing NULLs in an ARRAY field is not supported now. I have 
logged an issue [1] and will fix it soon ~

[1] https://issues.apache.org/jira/browse/FLINK-20234

Rex Fenley <r...@remind101.com> wrote on Thursday, November 19, 2020 at 2:51 PM:
Hi,

I recently discovered some of our data has NULL values arriving in an 
ARRAY column. This column is being consumed by Flink via the Kafka 
connector Debezium format. We seem to be receiving NullPointerExceptions 
when these NULL values in the arrays arrive, which restarts the source operator 
in a loop.

Is there any way to not throw or to possibly filter out NULLs in an Array of 
Strings in Flink?

We're somewhat stuck on how to solve this problem, we'd like to be defensive 
about this on Flink's side.

Thanks!

(P.S. The exception was not that informative, there may be room for improvement 
in terms of a richer error message when this happens.)

--

Rex Fenley  |  Software Engineer - Mobile and Backend



Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW 
US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>


Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Dylan Forciea
Do you mean that the array contains values that are null, or that the entire 
array itself is null? If it’s the latter, I have an issue written, along with a 
PR to fix it that has been pending review [1].

Regards,
Dylan Forciea

[1] https://issues.apache.org/jira/browse/FLINK-19771

From: Danny Chan 
Date: Thursday, November 19, 2020 at 2:24 AM
To: Rex Fenley 
Cc: Flink ML 
Subject: Re: Filter Null in Array in SQL Connector

Hi, Fenley ~

You are right, parsing NULLs in an ARRAY field is not supported now. I have 
logged an issue [1] and will fix it soon ~

[1] https://issues.apache.org/jira/browse/FLINK-20234

Rex Fenley <r...@remind101.com> wrote on Thursday, November 19, 2020 at 2:51 PM:
Hi,

I recently discovered some of our data has NULL values arriving in an 
ARRAY column. This column is being consumed by Flink via the Kafka 
connector Debezium format. We seem to be receiving NullPointerExceptions 
when these NULL values in the arrays arrive, which restarts the source operator 
in a loop.

Is there any way to not throw or to possibly filter out NULLs in an Array of 
Strings in Flink?

We're somewhat stuck on how to solve this problem, we'd like to be defensive 
about this on Flink's side.

Thanks!

(P.S. The exception was not that informative, there may be room for improvement 
in terms of a richer error message when this happens.)

--

Rex Fenley  |  Software Engineer - Mobile and Backend



Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW 
US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>


Re: Lateral join not finding correlate variable

2020-11-18 Thread Dylan Forciea
Godfrey,

I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack trace 
running exactly this code:

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.TableFunction


@FunctionHint(output = new DataTypeHint("ROW<s STRING>"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}
object Job {

  def main(args: Array[String]): Unit = {
val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

streamTableEnv.createTemporarySystemFunction(
  "SplitStringToRows",
  classOf[SplitStringToRows]
) // Class defined in previous email

streamTableEnv.executeSql(
  """
  CREATE TABLE table1 (
id_source BIGINT PRIMARY KEY,
attr1_source STRING,
attr2 STRING
  ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
   'table-name' = '',
   'username' = '',
   'password' = '',
   'scan.fetch-size' = '500',
   'scan.auto-commit' = 'false')
""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table2 (
attr1_source STRING,
attr2 STRING,
attr3 DECIMAL,
attr4 DATE
  ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
   'table-name' = '',
   'username' = '',
   'password' = '',
   'scan.fetch-size' = '500',
   'scan.auto-commit' = 'false')
""")

val q1 = streamTableEnv.sqlQuery("""
  SELECT
id_source AS id,
attr1_source AS attr1,
attr2
  FROM table1
""")
streamTableEnv.createTemporaryView("view1", q1)

val q2 = streamTableEnv.sqlQuery(
  """
SELECT
  a.attr1 AS attr1,
  attr2,
  attr3,
  attr4
FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1_source, ';')) AS 
a(attr1)
""")
streamTableEnv.createTemporaryView("view2", q2)

val q3 = streamTableEnv.sqlQuery("""
SELECT
  w.attr1,
  p.attr3
FROM view1 w
LEFT JOIN LATERAL (
  SELECT
attr1,
attr3
  FROM (
SELECT
  attr1,
  attr3,
  ROW_NUMBER() OVER (
PARTITION BY attr1
ORDER BY
  attr4 DESC NULLS LAST,
  w.attr2 = attr2 DESC NULLS LAST
  ) AS row_num
  FROM view2)
  WHERE row_num = 1) p
ON (w.attr1 = p.attr1)
""")
streamTableEnv.createTemporaryView("view3", q3)

    val view3 = streamTableEnv.from("view3")

view3
  .toRetractStream[Row]
  .writeAsCsv("./view3.csv", WriteMode.OVERWRITE)
  .setParallelism(1)

streamEnv.execute()
  }
}

Thanks,
Dylan Forciea

From: godfrey he 
Date: Wednesday, November 18, 2020 at 8:29 PM
To: Dylan Forciea 
Cc: "user@flink.apache.org" 
Subject: Re: Lateral join not finding correlate variable

Dylan,

Thanks for your feedback. If the planner encounters an
"unexpected correlate variable $cor2 in the plan" exception,
there is a high probability that FlinkDecorrelateProgram has a bug
or that the query pattern is not supported yet. I tried to use the JDBC 
Connector as the input tables,
but I still cannot reproduce the exception. Could you provide your full code, 
including the DDL, queries, etc.?
Thanks so much.

Best,
Godfrey



Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, November 18, 2020 at 10:09 PM:
Godfrey,

I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am 
still having the same issue. Note that I am using the JDBC Connector for the 
input tables, and table1 and table2 are actually created from queries on those 
connector tables and not directly.

Since you indicated what I did should work, I played around a bit more, and 
determined it’s something inside of the table2 query that is triggering the 
error. The id field there is generated by a table function. Removing that piece 
made the plan start working. Table 2 is formulated as follows:

SELECT
  T.id,
  attr2,

Re: Lateral join not finding correlate variable

2020-11-18 Thread Dylan Forciea
Godfrey,

I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am 
still having the same issue. Note that I am using the JDBC Connector for the 
input tables, and table1 and table2 are actually created from queries on those 
connector tables and not directly.

Since you indicated what I did should work, I played around a bit more, and 
determined it’s something inside of the table2 query that is triggering the 
error. The id field there is generated by a table function. Removing that piece 
made the plan start working. Table 2 is formulated as follows:

SELECT
  T.id,
  attr2,
  attr3,
  attr4
FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)

Where SplitStringToRows is defined as:

@FunctionHint(output = new DataTypeHint("ROW<s STRING>"))
class SplitStringToRows extends TableFunction[Row] {

  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}

Removing the lateral table bit in that first table made the original query plan 
work correctly.

I greatly appreciate your assistance!

Regards,
Dylan Forciea

From: godfrey he 
Date: Wednesday, November 18, 2020 at 7:33 AM
To: Dylan Forciea 
Cc: "user@flink.apache.org" 
Subject: Re: Lateral join not finding correlate variable

Hi Dylan,

Could you tell me which Flink version you found the problem with?
I tested the above query on master, and I get the plan; no errors occur.
Here is my test case:

@Test
def testLateralJoin(): Unit = {
  util.addTableSource[(String, String, String, String, String)]("table1", 'id, 
'attr1, 'attr2, 'attr3, 'attr4)
  util.addTableSource[(String, String, String, String, String)]("table2", 'id, 
'attr1, 'attr2, 'attr3, 'attr4)
  val query =
"""
  |SELECT
  |  t1.id,
  |  t1.attr1,
  |  t2.attr2
  |FROM table1 t1
  |LEFT JOIN LATERAL (
  |  SELECT
  |id,
  |attr2
  |  FROM (
  |SELECT
  |  id,
  |  attr2,
  |  ROW_NUMBER() OVER (
  |PARTITION BY id
  |ORDER BY
  |  attr3 DESC,
  |  t1.attr4 = attr4 DESC
  |  ) AS row_num
  |FROM table2)
  |WHERE row_num = 1) t2
  |ON t1.id = t2.id
  |""".stripMargin
  util.verifyPlan(query)
}
Best,
Godfrey

Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, November 18, 2020 at 7:44 AM:
This may be due to not understanding  lateral joins in Flink – perhaps you can 
only do so on temporal variables – but I figured I’d ask since the error 
message isn’t intuitive.

I am trying to do a combination of a lateral join and a top N query. Part of my 
ordering is based upon whether a value on the left side of the query 
matches up. I’m trying to do this in the general form of:

SELECT
  t1.id,
  t1.attr1,
  t2.attr2
FROM table1 t1
LEFT JOIN LATERAL (
  SELECT
id,
attr2
  FROM (
SELECT
  id,
  attr2,
  ROW_NUMBER() OVER (
PARTITION BY id
ORDER BY
  attr3 DESC,
  t1.attr4 = attr4 DESC
  ) AS row_num
FROM table2
WHERE row_num = 1) t2
ON (t1.id = t2.id)

I am getting an error that looks like:

Exception in thread "main" org.apache.flink.table.api.TableException: 
unexpected correlate variable $cor2 in the plan
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
 at 
scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
 at 
scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
 at scala.collection.Iterator.foreach(Iterator.scala:943)
 at scala.collection.Iterator.foreach$(Iterator.scala:943)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
 at scala.collection.IterableLike.foreach(IterableLike.scala:74)
 at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
 at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
 at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
 at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamComm

Lateral join not finding correlate variable

2020-11-17 Thread Dylan Forciea
This may be due to not understanding  lateral joins in Flink – perhaps you can 
only do so on temporal variables – but I figured I’d ask since the error 
message isn’t intuitive.

I am trying to do a combination of a lateral join and a top N query. Part of my 
ordering is based upon whether a value on the left side of the query 
matches up. I’m trying to do this in the general form of:

SELECT
  t1.id,
  t1.attr1,
  t2.attr2
FROM table1 t1
LEFT JOIN LATERAL (
  SELECT
id,
attr2
  FROM (
SELECT
  id,
  attr2,
  ROW_NUMBER() OVER (
PARTITION BY id
ORDER BY
  attr3 DESC,
  t1.attr4 = attr4 DESC
  ) AS row_num
FROM table2
WHERE row_num = 1) t2
ON (t1.id = t2.id)

I am getting an error that looks like:

Exception in thread "main" org.apache.flink.table.api.TableException: 
unexpected correlate variable $cor2 in the plan
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
 at 
scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
 at 
scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
 at scala.collection.Iterator.foreach(Iterator.scala:943)
 at scala.collection.Iterator.foreach$(Iterator.scala:943)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
 at scala.collection.IterableLike.foreach(IterableLike.scala:74)
 at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
 at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
 at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
 at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
 at 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
 at 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
 at 
org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
 at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
 at io.oseberg.flink.well.ok.Job.main(Job.scala)

The only other thing I can think of doing is creating a Table Aggregate 
function to pull this off. But, I wanted to check to make sure I wasn’t doing 
something wrong in the above first, or if there is something I’m not thinking 
of doing.

Regards,
Dylan Forciea


Re: NullPointerException when trying to read null array in Postgres using JDBC Connector

2020-10-22 Thread Dylan Forciea
Danny,

Thanks! I have created a new JIRA issue [1]. I’ll look into how hard it is to 
get a patch and unit test myself, although I may need a hand on the process of 
making a change to both the master branch and a release branch if it is desired 
to get a fix into 1.11.

Regards,
Dylan Forciea

[1] https://issues.apache.org/jira/browse/FLINK-19771

From: Danny Chan 
Date: Thursday, October 22, 2020 at 4:34 AM
To: Dylan Forciea 
Cc: Flink ML 
Subject: Re: NullPointerException when trying to read null array in Postgres 
using JDBC Connector

Yes, the current code throws directly for NULLs. Can you log an issue for it?

Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, October 21, 2020 at 4:30 AM:
I believe I am getting an error because I have a nullable postgres array of 
text that is set to NULL that I’m reading using the JDBC SQL Connector. Is this 
something that should be allowed? Looking at the source code line below, it 
doesn’t look like the case of an array being null would be handled.

[error] Caused by: java.io.IOException: Couldn't access resultSet
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:266)
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:57)
[error]   at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
[error]   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
[error]   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
[error]   at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
[error] Caused by: java.lang.NullPointerException
[error]   at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$c06ce9f4$2(PostgresRowConverter.java:97)
[error]   at 
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:79)
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:259)
[error]   ... 5 more

Thanks,
Dylan Forciea


NullPointerException when trying to read null array in Postgres using JDBC Connector

2020-10-20 Thread Dylan Forciea
I believe I am getting an error because I have a nullable postgres array of 
text that is set to NULL that I’m reading using the JDBC SQL Connector. Is this 
something that should be allowed? Looking at the source code line below, it 
doesn’t look like the case of an array being null would be handled.

[error] Caused by: java.io.IOException: Couldn't access resultSet
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:266)
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:57)
[error]   at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
[error]   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
[error]   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
[error]   at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
[error] Caused by: java.lang.NullPointerException
[error]   at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$c06ce9f4$2(PostgresRowConverter.java:97)
[error]   at 
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:79)
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:259)
[error]   ... 5 more
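
A possible interim workaround until a fix lands, purely as a sketch with 
placeholder table/column names, is to point the Flink JDBC table at a postgres 
view that never returns a NULL array:

import java.sql.DriverManager

// Create the defensive view once on the postgres side.
val conn = DriverManager.getConnection(
  "jdbc:postgresql://host.domain.com/db1", "user", "password") // placeholder connection
try {
  conn.createStatement().execute(
    """CREATE OR REPLACE VIEW my_table_safe AS
      |SELECT id, COALESCE(tags, ARRAY[]::text[]) AS tags
      |FROM my_table""".stripMargin)
} finally {
  conn.close()
}
// Then use 'table-name' = 'my_table_safe' in the Flink JDBC DDL instead of 'my_table'.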

Thanks,
Dylan Forciea


Re: Setting JDBC connector options using JdbcCatalog

2020-10-20 Thread Dylan Forciea
(Re-sending, so it also goes to the list serve)

Leonard,

That does seem to work for me, thank you! I was looking in the Table API, so 
that’s why I didn’t find that option.

Is there a way to specify the sql hints using the Table API? I had tried 
appending them onto a call to .from on the table environment, but it wasn’t 
happy with that.

Thanks,
Dylan Forciea


From: Leonard Xu 
Date: Wednesday, October 14, 2020 at 10:20 PM
To: Dylan Forciea 
Cc: "user@flink.apache.org" 
Subject: Re: Setting JDBC connector options using JdbcCatalog

Hi, Dylan

The table in JdbcCatalog only contains the basic options, so it’s normal that a 
table from JdbcCatalog does not carry some options.
Flink provides the SQL hints feature to specify or override table options [1]; 
you can give it a try.

Best,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/hints.html#dynamic-table-options
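
For example, a sketch of such a hint; my_table is a placeholder for a table 
coming from the JdbcCatalog, and note that dynamic table options are disabled 
by default and have to be switched on first:

// Dynamic table options ("hints") are off by default in 1.11/1.12.
streamTableEnv.getConfig.getConfiguration
  .setBoolean("table.dynamic-table-options.enabled", true)

// Override scan options at query time instead of in the catalog definition.
val t = streamTableEnv.sqlQuery(
  "SELECT * FROM my_table /*+ OPTIONS('scan.fetch-size'='500') */")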



On October 15, 2020, at 10:20, Dylan Forciea <dy...@oseberg.io> wrote:

 scan.fetch-size



Setting JDBC connector options using JdbcCatalog

2020-10-14 Thread Dylan Forciea
I was experimenting with the JdbcCatalog, and I see that the options match some 
of the SQL WITH options. I looked at the source code, and even see that it 
directly references those options from JdbcDynamicTableFactory. However, I 
didn’t see any obvious way to set scan.fetch-size or any way to generically set 
those properties that would go in the WITH clause of the SQL interface beyond 
the JdbcCatalog constructor arguments. Is there any way to set those after the 
catalog is created/registered?

Thanks,
Dylan Forciea


Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-09 Thread Dylan Forciea
Jark,

Thank you! I had actually mistyped the JIRA issue; autoCommit needs to be set 
to false for streaming to work. The default on the driver is true when the 
option isn’t specified. I’ve updated the issue accordingly.

Setting this to false automatically on the read path would fix my issue. 
However, I’m only certain that this is proper for Postgres. I’m not sure if 
this should be done for other drivers, although my gut would say it should be 
fine if it’s only done for reading. My patch as it is will set the builder to 
not specify whether to set autoCommit if the option is not specified, which 
means it would then be left at the default of true. That would conflict with 
the 1.11 patch you suggested. Let me know if you think I should make the 
default true in the SQL API.

https://github.com/apache/flink/pull/13570
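
For completeness, this is the shape the DDL takes once the option is exposed, 
mirroring the WITH clauses used elsewhere in these threads; the table name and 
connection details are placeholders:

streamTableEnv.executeSql("""
  CREATE TABLE big_table (
    id BIGINT,
    attr STRING
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://host.domain.com/db1',
    'table-name' = 'big_table',
    'username' = '',
    'password' = '',
    'scan.fetch-size' = '500',
    'scan.auto-commit' = 'false'
  )
""")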

Regards,
Dylan

From: Jark Wu 
Date: Thursday, October 8, 2020 at 10:15 PM
To: Dylan Forciea 
Cc: Till Rohrmann , dev , Shengkai 
Fang , "user@flink.apache.org" , 
Leonard Xu 
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Hi Dylan,

Sorry for the late reply. We've just come back from a long holiday.

Thanks for reporting this problem. First, I think it is a bug that 
`autoCommit` is false by default (JdbcRowDataInputFormat.Builder).
We can fix the default to true in the 1.11 series, and I think this can solve 
your problem in the short term?
Besides, we should expose a connector option to set auto-commit, and this can 
be another issue to be implemented in master.
I'm glad to review the code.

What do you think?

Regarding to the failed JMXReporterFactoryTest, I think this is a known issue, 
see FLINK-19539 [1]

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-19539

On Fri, 9 Oct 2020 at 01:29, Dylan Forciea <dy...@oseberg.io> wrote:
I’ve updated the unit tests and documentation, and I was running the azure test 
pipeline as described in the instructions. However, it appears that what seems 
to be an unrelated test for the JMX code failed. Is this a matter of me not 
setting things up correctly? I wanted to ensure everything looked good before I 
submitted the PR.

[ERROR] Failures:
[ERROR]   JMXReporterFactoryTest.testPortRangeArgument:46
Expected: (a value equal to or greater than <9000> and a value less than or 
equal to <9010>)
 but: a value less than or equal to <9010> <9040> was greater than <9010>
[ERROR]   JMXReporterFactoryTest.testWithoutArgument:60
[INFO]
[ERROR] Tests run: 10, Failures: 2, Errors: 0, Skipped: 0

Thanks,
Dylan Forciea

From: Till Rohrmann <trohrm...@apache.org>
Date: Thursday, October 8, 2020 at 2:29 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: dev <d...@flink.apache.org>, Shengkai Fang <fskm...@gmail.com>, 
"user@flink.apache.org" <user@flink.apache.org>, "j...@apache.org" 
<j...@apache.org>, Leonard Xu <xbjt...@gmail.com>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

This sounds good. Maybe there are others in the community who can help with the 
review before Jark and Leonard are back.

Cheers,
Till

On Wed, Oct 7, 2020 at 7:33 PM Dylan Forciea <dy...@oseberg.io> wrote:
Actually…. It looks like what I did covers both cases. I’ll see about getting 
some unit tests and documentation updated.

Dylan

From: Dylan Forciea <dy...@oseberg.io>
Date: Wednesday, October 7, 2020 at 11:47 AM
To: Till Rohrmann <trohrm...@apache.org>, dev <d...@flink.apache.org>
Cc: Shengkai Fang <fskm...@gmail.com>, "user@flink.apache.org" 
<user@flink.apache.org>, "j...@apache.org" <j...@apache.org>, Leonard Xu 
<xbjt...@gmail.com>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Ok, I have created FLINK-19522 describing the issue. I have the code I made so 
far checked in at 
https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but this 
only fixes the SQL API. It sounds like there may be another change needed for 
the Table API… I’ll look into that and see if I can figure it out on my own 
while they’re out. I will also need to add some unit tests and update some 
documentation to get this ready for a PR.

Thanks,
Dylan

From: Till Rohrmann <trohrm...@apache.org>
Date: Wednesday, October 7, 2020 at 10:55 AM
To: dev <d...@flink.apache.org>
Cc: Shengkai Fang <fskm...@gmail.com>, "user@flink.apache.org" 
<user@flink.apache.org>, "j...@apache.org" <j...@apache.org>, Leonard Xu 
<xbjt...@gmail.com>

Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-08 Thread Dylan Forciea
I’ve updated the unit tests and documentation, and I was running the azure test 
pipeline as described in the instructions. However, it appears that what seems 
to be an unrelated test for the JMX code failed. Is this a matter of me not 
setting things up correctly? I wanted to ensure everything looked good before I 
submitted the PR.

[ERROR] Failures:
[ERROR]   JMXReporterFactoryTest.testPortRangeArgument:46
Expected: (a value equal to or greater than <9000> and a value less than or 
equal to <9010>)
 but: a value less than or equal to <9010> <9040> was greater than <9010>
[ERROR]   JMXReporterFactoryTest.testWithoutArgument:60
[INFO]
[ERROR] Tests run: 10, Failures: 2, Errors: 0, Skipped: 0

Thanks,
Dylan Forciea

From: Till Rohrmann 
Date: Thursday, October 8, 2020 at 2:29 AM
To: Dylan Forciea 
Cc: dev , Shengkai Fang , 
"user@flink.apache.org" , "j...@apache.org" 
, Leonard Xu 
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

This sounds good. Maybe there are others in the community who can help with the 
review before Jark and Leonard are back.

Cheers,
Till

On Wed, Oct 7, 2020 at 7:33 PM Dylan Forciea <dy...@oseberg.io> wrote:
Actually…. It looks like what I did covers both cases. I’ll see about getting 
some unit tests and documentation updated.

Dylan

From: Dylan Forciea <dy...@oseberg.io>
Date: Wednesday, October 7, 2020 at 11:47 AM
To: Till Rohrmann <trohrm...@apache.org>, dev <d...@flink.apache.org>
Cc: Shengkai Fang <fskm...@gmail.com>, "user@flink.apache.org" 
<user@flink.apache.org>, "j...@apache.org" <j...@apache.org>, Leonard Xu 
<xbjt...@gmail.com>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Ok, I have created FLINK-19522 describing the issue. I have the code I made so 
far checked in at 
https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but this 
only fixes the SQL API. It sounds like there may be another change needed for 
the Table API… I’ll look into that and see if I can figure it out on my own 
while they’re out. I will also need to add some unit tests and update some 
documentation to get this ready for a PR.

Thanks,
Dylan

From: Till Rohrmann <trohrm...@apache.org>
Date: Wednesday, October 7, 2020 at 10:55 AM
To: dev <d...@flink.apache.org>
Cc: Shengkai Fang <fskm...@gmail.com>, "user@flink.apache.org" 
<user@flink.apache.org>, "j...@apache.org" <j...@apache.org>, Leonard Xu 
<xbjt...@gmail.com>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Hi Dylan,

thanks for reaching out to the Flink community and excuse our late response. I 
am not an expert for the Table API and its JDBC connector but what you describe 
sounds like a missing feature. Also given that FLINK-12198 enabled this feature 
for the JDBCInputFormat indicates that we might simply need to make it 
configurable from the JdbcTableSource. I am pulling in Jark and Leonard who 
worked on the JdbcTableSource and might help you to get this feature into 
Flink. Their response could take a week because they are currently on vacation 
if I am not mistaken.

What you could already do is to open an issue linking FLINK-12198 and 
describing the problem and your solution proposal.

[1] https://issues.apache.org/jira/browse/FLINK-12198
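
For context, a rough sketch of what FLINK-12198 exposed on the legacy 
DataSet-API input format, as opposed to the Table/SQL path discussed here; the 
driver, query, and row type below are placeholders, and the builder method 
names are as I understand them from that ticket:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo

val rowTypeInfo = new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
  .setDrivername("org.postgresql.Driver")
  .setDBUrl("jdbc:postgresql://host.domain.com/db1")
  .setQuery("SELECT id, attr FROM big_table")
  .setRowTypeInfo(rowTypeInfo)
  .setFetchSize(500)
  .setAutoCommit(false) // the knob FLINK-12198 added
  .finish()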

Cheers,
Till

On Wed, Oct 7, 2020 at 5:00 PM Dylan Forciea <dy...@oseberg.io> wrote:
I appreciate it! Let me know if you want me to submit a PR against the issue 
after it is created. It wasn’t a huge amount of code, so it’s probably not a 
big deal if you wanted to redo it.

Thanks,
Dylan

From: Shengkai Fang <fskm...@gmail.com>
Date: Wednesday, October 7, 2020 at 9:06 AM
To: Dylan Forciea <dy...@oseberg.io>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Sorry for the late response. +1 to support it. I will open a JIRA about it later.

Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, October 7, 2020 at 9:53 PM:

I hadn’t heard a response on this, so I’m going to expand this to the dev email 
list.

If this is indeed an issue and not my misunderstanding, I have most of a patch 
already coded up. Please let me know, and I can create a JIRA issue and send 
out a PR.

Regards,
Dylan Forciea
Oseberg

From: Dylan Forciea <dy...@oseberg.io>
Date: Thursday, October 1, 2020 at 5:14 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-07 Thread Dylan Forciea
Actually…. It looks like what I did covers both cases. I’ll see about getting 
some unit tests and documentation updated.

Dylan

From: Dylan Forciea 
Date: Wednesday, October 7, 2020 at 11:47 AM
To: Till Rohrmann , dev 
Cc: Shengkai Fang , "user@flink.apache.org" 
, "j...@apache.org" , Leonard Xu 

Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Ok, I have created FLINK-19522 describing the issue. I have the code I made so 
far checked in at 
https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but this 
only fixes the SQL API. It sounds like there may be another change needed for 
the Table API… I’ll look into that and see if I can figure it out on my own 
while they’re out. I will also need to add some unit tests and update some 
documentation to get this ready for a PR.

Thanks,
Dylan

From: Till Rohrmann 
Date: Wednesday, October 7, 2020 at 10:55 AM
To: dev 
Cc: Shengkai Fang , "user@flink.apache.org" 
, "j...@apache.org" , Leonard Xu 

Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Hi Dylan,

thanks for reaching out to the Flink community and excuse our late response. I 
am not an expert for the Table API and its JDBC connector but what you describe 
sounds like a missing feature. Also given that FLINK-12198 enabled this feature 
for the JDBCInputFormat indicates that we might simply need to make it 
configurable from the JdbcTableSource. I am pulling in Jark and Leonard who 
worked on the JdbcTableSource and might help you to get this feature into 
Flink. Their response could take a week because they are currently on vacation 
if I am not mistaken.

What you could already do is to open an issue linking FLINK-12198 and 
describing the problem and your solution proposal.

[1] https://issues.apache.org/jira/browse/FLINK-12198

Cheers,
Till

On Wed, Oct 7, 2020 at 5:00 PM Dylan Forciea <dy...@oseberg.io> wrote:
I appreciate it! Let me know if you want me to submit a PR against the issue 
after it is created. It wasn’t a huge amount of code, so it’s probably not a 
big deal if you wanted to redo it.

Thanks,
Dylan

From: Shengkai Fang <fskm...@gmail.com>
Date: Wednesday, October 7, 2020 at 9:06 AM
To: Dylan Forciea <dy...@oseberg.io>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Sorry for the late response. +1 to support it. I will open a JIRA about it later.

Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, October 7, 2020 at 9:53 PM:

I hadn’t heard a response on this, so I’m going to expand this to the dev email 
list.

If this is indeed an issue and not my misunderstanding, I have most of a patch 
already coded up. Please let me know, and I can create a JIRA issue and send 
out a PR.

Regards,
Dylan Forciea
Oseberg

From: Dylan Forciea <dy...@oseberg.io>
Date: Thursday, October 1, 2020 at 5:14 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: autoCommit for postgres jdbc streaming in Table/SQL API

Hi! I’ve just recently started evaluating Flink for our ETL needs, and I ran 
across an issue with streaming postgres data via the Table/SQL API.



I see that the API has the scan.fetch-size option, but not scan.auto-commit per



https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html
 . I had attempted to load a large table in, but it completely slurped it into 
memory before starting the streaming. I modified the flink source code to add a 
scan.auto-commit

option, and I was then able to immediately start streaming and cut my memory 
usage way down.



I see in this thread that there was a similar issue resolved for 
JDBCInputFormat in this thread:



http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html
 , but I don’t see a way to utilize that in the Table/SQL API.



Am I missing something on how to pull this off?



Regards,

Dylan Forciea

Oseberg







Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-07 Thread Dylan Forciea
Ok, I have created FLINK-19522 describing the issue. I have the code I made so 
far checked in at 
https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but this 
only fixes the SQL API. It sounds like there may be another change needed for 
the Table API… I’ll look into that and see if I can figure it out on my own 
while they’re out. I will also need to add some unit tests and update some 
documentation to get this ready for a PR.

Thanks,
Dylan

From: Till Rohrmann 
Date: Wednesday, October 7, 2020 at 10:55 AM
To: dev 
Cc: Shengkai Fang , "user@flink.apache.org" 
, "j...@apache.org" , Leonard Xu 

Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Hi Dylan,

thanks for reaching out to the Flink community and excuse our late response. I 
am not an expert for the Table API and its JDBC connector but what you describe 
sounds like a missing feature. Also given that FLINK-12198 enabled this feature 
for the JDBCInputFormat indicates that we might simply need to make it 
configurable from the JdbcTableSource. I am pulling in Jark and Leonard who 
worked on the JdbcTableSource and might help you to get this feature into 
Flink. Their response could take a week because they are currently on vacation 
if I am not mistaken.

What you could already do is to open an issue linking FLINK-12198 and 
describing the problem and your solution proposal.

[1] https://issues.apache.org/jira/browse/FLINK-12198

Cheers,
Till

On Wed, Oct 7, 2020 at 5:00 PM Dylan Forciea <dy...@oseberg.io> wrote:
I appreciate it! Let me know if you want me to submit a PR against the issue 
after it is created. It wasn’t a huge amount of code, so it’s probably not a 
big deal if you wanted to redo it.

Thanks,
Dylan

From: Shengkai Fang <fskm...@gmail.com>
Date: Wednesday, October 7, 2020 at 9:06 AM
To: Dylan Forciea <dy...@oseberg.io>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Sorry for the late response. +1 to support it. I will open a JIRA about it later.

Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, October 7, 2020 at 9:53 PM:

I hadn’t heard a response on this, so I’m going to expand this to the dev email 
list.

If this is indeed an issue and not my misunderstanding, I have most of a patch 
already coded up. Please let me know, and I can create a JIRA issue and send 
out a PR.

Regards,
Dylan Forciea
Oseberg

From: Dylan Forciea <dy...@oseberg.io>
Date: Thursday, October 1, 2020 at 5:14 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: autoCommit for postgres jdbc streaming in Table/SQL API

Hi! I’ve just recently started evaluating Flink for our ETL needs, and I ran 
across an issue with streaming postgres data via the Table/SQL API.



I see that the API has the scan.fetch-size option, but not scan.auto-commit per



https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html
 . I had attempted to load a large table in, but it completely slurped it into 
memory before starting the streaming. I modified the flink source code to add a 
scan.auto-commit

option, and I was then able to immediately start streaming and cut my memory 
usage way down.



I see in this thread that there was a similar issue resolved for 
JDBCInputFormat in this thread:



http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html
 , but I don’t see a way to utilize that in the Table/SQL API.



Am I missing something on how to pull this off?



Regards,

Dylan Forciea

Oseberg






Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-07 Thread Dylan Forciea
I appreciate it! Let me know if you want me to submit a PR against the issue 
after it is created. It wasn’t a huge amount of code, so it’s probably not a 
big deal if you wanted to redo it.

Thanks,
Dylan

From: Shengkai Fang 
Date: Wednesday, October 7, 2020 at 9:06 AM
To: Dylan Forciea 
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Sorry for the late response. +1 to support it. I will open a JIRA about it later.

Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, October 7, 2020 at 9:53 PM:

I hadn’t heard a response on this, so I’m going to expand this to the dev email 
list.

If this is indeed an issue and not my misunderstanding, I have most of a patch 
already coded up. Please let me know, and I can create a JIRA issue and send 
out a PR.

Regards,
Dylan Forciea
Oseberg

From: Dylan Forciea <dy...@oseberg.io>
Date: Thursday, October 1, 2020 at 5:14 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: autoCommit for postgres jdbc streaming in Table/SQL API

Hi! I’ve just recently started evaluating Flink for our ETL needs, and I ran 
across an issue with streaming postgres data via the Table/SQL API.



I see that the API has the scan.fetch-size option, but not scan.auto-commit per



https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html
 . I had attempted to load a large table in, but it completely slurped it into 
memory before starting the streaming. I modified the flink source code to add a 
scan.auto-commit

option, and I was then able to immediately start streaming and cut my memory 
usage way down.



I see in this thread that there was a similar issue resolved for 
JDBCInputFormat in this thread:



http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html
 , but I don’t see a way to utilize that in the Table/SQL API.



Am I missing something on how to pull this off?



Regards,

Dylan Forciea

Oseberg







Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-07 Thread Dylan Forciea
I hadn’t heard a response on this, so I’m going to expand this to the dev email 
list.

If this is indeed an issue and not my misunderstanding, I have most of a patch 
already coded up. Please let me know, and I can create a JIRA issue and send 
out a PR.

Regards,
Dylan Forciea
Oseberg

From: Dylan Forciea 
Date: Thursday, October 1, 2020 at 5:14 PM
To: "user@flink.apache.org" 
Subject: autoCommit for postgres jdbc streaming in Table/SQL API

Hi! I’ve just recently started evaluating Flink for our ETL needs, and I ran 
across an issue with streaming postgres data via the Table/SQL API.

I see that the API has the scan.fetch-size option, but not scan.auto-commit per 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html
 . I had attempted to load a large table in, but it completely slurped it into 
memory before starting the streaming. I modified the flink source code to add a 
scan.auto-commit option, and I was then able to immediately start streaming and 
cut my memory usage way down.

I see in this thread that there was a similar issue resolved for 
JDBCInputFormat in this thread: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html
 , but I don’t see a way to utilize that in the Table/SQL API.

Am I missing something on how to pull this off?

Regards,
Dylan Forciea
Oseberg