[jira] [Created] (FLINK-35233) HBase lookup result is wrong when lookup cache is enabled

2024-04-24 Thread tanjialiang (Jira)
tanjialiang created FLINK-35233:
---

 Summary: HBase lookup result is wrong when lookup cache is enabled
 Key: FLINK-35233
 URL: https://issues.apache.org/jira/browse/FLINK-35233
 Project: Flink
  Issue Type: Bug
  Components: Connectors / HBase
Affects Versions: hbase-3.0.0
Reporter: tanjialiang


HBase table
||rowkey||name||age||
|1|ben|18|
|2|ken|19|
|3|mark|20|

 
Flink SQL lookup join with lookup cache
{code:java}
CREATE TABLE dim_user (
  rowkey STRING,
  info ROW<name STRING, age INT>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'zookeeper.quorum' = 'localhost:2181',
  'zookeeper.znode.parent' = '/hbase',
  'table-name' = 'default:test',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '1000',
  'lookup.partial-cache.expire-after-write' = '1h'
);

CREATE VIEW user_click AS 
SELECT user_id, proctime() AS proc_time
FROM (
  VALUES('1'), ('2'), ('3'), ('1'), ('2')
) AS t (user_id);

SELECT 
    user_id, 
    info.name, 
    info.age
FROM user_click INNER JOIN dim_user
FOR SYSTEM_TIME AS OF user_click.proc_time
ON dim_user.rowkey = user_click.user_id;{code}
 
Expected Result
||rowkey||name||age||
|1|ben|18|
|2|ken|19|
|3|mark|20|
|1|ben|18|
|2|ken|19|

 

Actual Result
||rowkey||name||age||
|1|ben|18|
|2|ken|19|
|3|mark|20|
|1|mark|20|
|2|mark|20|

 
The result is wrong when we look up user_id 1 and 2 the second time.
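
For illustration only, a minimal Java sketch of one plausible cause (an assumption, not a confirmed diagnosis): the lookup function reusing a single mutable row object while the partial cache stores the reference rather than a copy. Every cached entry then points at the last row read, which reproduces the observed "mark"/20 output exactly.
{code:java}
import java.util.HashMap;
import java.util.Map;

public class ReusedRowCacheDemo {
    static final class Row {
        String name;
        int age;
    }

    public static void main(String[] args) {
        Map<String, Row> cache = new HashMap<>();
        Row reused = new Row(); // a single instance reused across lookups

        String[][] hbaseRows = {{"1", "ben", "18"}, {"2", "ken", "19"}, {"3", "mark", "20"}};
        for (String[] r : hbaseRows) {
            reused.name = r[1]; // mutate the shared instance in place
            reused.age = Integer.parseInt(r[2]);
            cache.put(r[0], reused); // BUG: caches the reference, not a copy
        }

        // The second lookups of "1" and "2" hit the cache and see the values
        // of the row that was read last.
        System.out.println(cache.get("1").name + "/" + cache.get("1").age); // mark/20
        System.out.println(cache.get("2").name + "/" + cache.get("2").age); // mark/20
    }
}
{code}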



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29370) Protobuf in flink-sql-protobuf is not shaded

2024-02-29 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822169#comment-17822169
 ] 

tanjialiang commented on FLINK-29370:
-

[~jark] [~libenchao] [~maosuhan] I found that flink-sql-orc has the protobuf 
dependency without shading, and it conflicts with flink-sql-protobuf; my 
Flink version is 1.16.1. For now, my temporary workaround is to shade the 
protobuf dependency in both flink-sql-protobuf and my own user proto classes 
myself.
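
For reference, a minimal sketch of such a relocation with the Maven Shade Plugin, applied to one's own jar; the shaded package prefix below is my own placeholder, not the prefix Flink itself uses:
{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <!-- Move protobuf into a private namespace so it cannot clash
               with the unshaded copy pulled in by another sql jar. -->
          <relocation>
            <pattern>com.google.protobuf</pattern>
            <shadedPattern>myorg.shaded.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}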

> Protobuf in flink-sql-protobuf is not shaded
> 
>
> Key: FLINK-29370
> URL: https://issues.apache.org/jira/browse/FLINK-29370
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.16.0
>Reporter: Jark Wu
>Priority: Major
>
> The protobuf classes in flink-sql-protobuf are not shaded, which may lead to 
> class conflicts. Usually, SQL jars should shade commonly used dependencies, 
> e.g. flink-sql-avro: 
> https://github.com/apache/flink/blob/master/flink-formats/flink-sql-avro/pom.xml#L88
>  
> {code}
> ➜  Downloads jar tvf flink-sql-protobuf-1.16.0.jar | grep com.google
>  0 Tue Sep 13 20:23:44 CST 2022 com/google/
>  0 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/
>568 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/ProtobufInternalUtils.class
>  19218 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessage$Builder.class
>259 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessage$BuilderParent.class
>  10167 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/AbstractMessage.class
>   1486 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessageLite$Builder$LimitedInputStream.class
>  12399 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessageLite$Builder.class
>279 Tue Sep 13 20:23:44 CST 2022 
> com/google/protobuf/AbstractMessageLite$InternalOneOfEnu
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34089) Missing check of subscribed and assigned topics at job startup

2024-01-15 Thread tanjialiang (Jira)
tanjialiang created FLINK-34089:
---

 Summary: Missing check of subscribed and assigned topics at job 
startup
 Key: FLINK-34089
 URL: https://issues.apache.org/jira/browse/FLINK-34089
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: kafka-3.0.2
Reporter: tanjialiang


When we unsubscribe from a topic but still restore from the old state, the job 
will still read data from the unsubscribed topic. I think we should check 
whether the subscribed topic partitions match the assigned partitions, and 
throw an error so the user can handle it.
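
A minimal sketch of the proposed startup check (all names are hypothetical): compare the topics of the splits restored from state against the currently subscribed topics and fail fast instead of silently reading from a removed topic.
{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class RestoredTopicCheck {
    /** Throws if the restored state references topics that are no longer subscribed. */
    static void validate(List<String> restoredSplitTopics, Set<String> subscribedTopics) {
        Set<String> stale = new HashSet<>(restoredSplitTopics);
        stale.removeAll(subscribedTopics);
        if (!stale.isEmpty()) {
            throw new IllegalStateException(
                    "State contains splits for topics that are no longer subscribed: "
                            + stale + ". Clean up the state or re-subscribe these topics.");
        }
    }
}
{code}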



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-24533) Flink SQL Upsert To Hbase Appear data loss

2023-12-07 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794492#comment-17794492
 ] 

tanjialiang commented on FLINK-24533:
-

Hi [~licp], sorry for noticing this ticket so late; it has been fixed in 
FLINK-33304.

> Flink SQL Upsert  To Hbase  Appear  data loss
> -
>
> Key: FLINK-24533
> URL: https://issues.apache.org/jira/browse/FLINK-24533
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / HBase
>Affects Versions: 1.12.0
> Environment: Flink 1.12.0 on Yarn
> HBase1.4
>Reporter: licp
>Priority: Major
> Attachments: Check_Result.png, Specify_some_key.png
>
>
> Data flow direction is described below:
>       Source:Mysql 
>       Sink:HBase
>       Parallelism:1
> ---Code Example One  Using Left Join ---
> -- MySQL source, total records: 4829
> create table user(
>        user_id string,
>        user_name string,
>        primary key(user_id) not enforced
> )with(
>  'connector' = 'mysql-cdc',
>  'hostname' = 'localhost',
>  'port' = '3308',
>  'username' = 'user_name',
>  'password' = '**',
>  'database-name' = 'database_name',
>  'table-name' = 'table_name',
>  'debezium.event.processing.failure.handling.mode' = 'warn',
>  'debezium.snapshot.locking.mode' = 'none'
> );
> create table user_profile(
>          user_id string,
>          age int,
>          primary key(user_id) not enforced
> )with(
>  'connector' = 'mysql-cdc',
>  'hostname' = 'localhost',
>  'port' = '3308',
>  'username' = 'user_name',
>  'password' = '**',
>  'database-name' = 'database_name',
>  'table-name' = 'table_name',
>  'debezium.event.processing.failure.handling.mode' = 'warn',
>  'debezium.snapshot.locking.mode' = 'none'
> );
> -- HBase sink, total records: 4826
> create table real_dwd_user_info_to_hbase(
>      rowkey string,
>     f ROW(user_name string,age int)
> )with(
>  'connector' = 'hbase-1.4',
>  'table-name' = 'table_name',
>  'zookeeper.quorum' = 'zk',
>  'zookeeper.znode.parent' = '/hbase'
> );
> insert into real_dwd_user_info_to_hbase
> select 
>      u.user_id, 
>      row(u.user_name,up.age) as f
> from user u
> left join user_profile up
> on u.user_id = up.user_id
> where u.user_id<100
> ;
> --Code Example Two  Using Left Join  And  Specify some key-
> insert into real_dwd_user_info_to_hbase
> select 
>      u.user_id, 
>      row(u.user_name,up.age) as f
> from user u
> left join user_profile up
> on u.user_id = up.user_id
> where u.user_id=0
> ---
> I printed the results of the same code logic, and I can see that the specified 
> key has three changelog results (+I, -D, +I), but the key is still not found in HBase.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29770) hbase connector supports out-of-order data

2023-12-07 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794489#comment-17794489
 ] 

tanjialiang commented on FLINK-29770:
-

[~Bo Cui] Thanks for reporting this issue, but I think it can be closed because 
it has been supported since 
[FLINK-33208|https://issues.apache.org/jira/browse/FLINK-33208].

> hbase connector supports out-of-order data
> --
>
> Key: FLINK-29770
> URL: https://issues.apache.org/jira/browse/FLINK-29770
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / HBase
>Reporter: Bo Cui
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> The data may be out of order and have no timestamp. As a result, the data 
> written to HBase is incorrect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-19648) Support the nested projection push down for the hbase connector

2023-10-31 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781290#comment-17781290
 ] 

tanjialiang commented on FLINK-19648:
-

I tried, but I found that lookup doesn't support nested projection yet. Maybe we 
should support nested projection for lookup first, and then move on to this 
issue.

> Support the nested projection push down for the hbase connector
> ---
>
> Key: FLINK-19648
> URL: https://issues.apache.org/jira/browse/FLINK-19648
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / HBase, Table SQL / Ecosystem
>Reporter: Shengkai Fang
>Priority: Major
>
> Support nested projection push down for hbase.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-19648) Support the nested projection push down for the hbase connector

2023-10-30 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780934#comment-17780934
 ] 

tanjialiang commented on FLINK-19648:
-

Hi [~jark] [~fsk119], I want to take this ticket. Can you please assign it to me?

> Support the nested projection push down for the hbase connector
> ---
>
> Key: FLINK-19648
> URL: https://issues.apache.org/jira/browse/FLINK-19648
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / HBase, Table SQL / Ecosystem
>Reporter: Shengkai Fang
>Priority: Major
>
> Support nested projection push down for hbase.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33304) Atomicity of RowMutations would broken when Delete and Put on same columnFamily/column/row

2023-10-24 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779005#comment-17779005
 ] 

tanjialiang commented on FLINK-33304:
-

Hi [~MartijnVisser], can you help me request a reviewer?

> Atomicity of RowMutations would broken when Delete and Put on same 
> columnFamily/column/row
> --
>
> Key: FLINK-33304
> URL: https://issues.apache.org/jira/browse/FLINK-33304
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: hbase-3.0.1
>Reporter: tanjialiang
>Priority: Blocker
>  Labels: pull-request-available
>
> Currently we put the {{Mutation}} into the {{BufferedMutator}} directly. When a 
> {{Mutation}} has a {{Delete}} followed by a {{Put}} on the same column family, 
> column, or row, only the {{Delete}} happens while the {{Put}} is ignored, so the 
> atomicity of the {{Mutation}} is broken in such cases.
> See https://issues.apache.org/jira/browse/HBASE-8626.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource

2023-10-19 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1535#comment-1535
 ] 

tanjialiang commented on FLINK-33173:
-

[~martijnvisser] It's my pleasure; I will try it.

> Support currentFetchEventTimeLag metrics for KafkaSource
> 
>
> Key: FLINK-33173
> URL: https://issues.apache.org/jira/browse/FLINK-33173
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: tanjialiang
>Priority: Major
>  Labels: pull-request-available
>
> In the connector specifications of 
> [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33], 
> the {{currentFetchEventTimeLag}} metric is the time in milliseconds from the 
> record event timestamp to the timestamp when Flink fetched the record. It can 
> reflect the consumption latency before deserialization.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33304) Atomicity of RowMutations would broken when Delete and Put on same columnFamily/column/row

2023-10-18 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-33304:

Description: 
Currently we put the {{Mutation}} into the {{BufferedMutator}} directly. When a 
{{Mutation}} has a {{Delete}} followed by a {{Put}} on the same column family, 
column, or row, only the {{Delete}} happens while the {{Put}} is ignored, so the 
atomicity of the {{Mutation}} is broken in such cases.

See https://issues.apache.org/jira/browse/HBASE-8626.

  was:
Current we put the mutations into BufferedMutator directly, when RowMutations 
have a Delete followed by Put to same column family or columns or rows, only 
the Delete is happening while the Put is ignored so atomicity of RowMutations 
is broken for such cases.


 

See https://issues.apache.org/jira/browse/HBASE-8626.


> Atomicity of RowMutations would broken when Delete and Put on same 
> columnFamily/column/row
> --
>
> Key: FLINK-33304
> URL: https://issues.apache.org/jira/browse/FLINK-33304
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: hbase-3.0.1
>Reporter: tanjialiang
>Priority: Blocker
>  Labels: pull-request-available
>
> Currently we put the {{Mutation}} into the {{BufferedMutator}} directly. When a 
> {{Mutation}} has a {{Delete}} followed by a {{Put}} on the same column family, 
> column, or row, only the {{Delete}} happens while the {{Put}} is ignored, so the 
> atomicity of the {{Mutation}} is broken in such cases.
> See https://issues.apache.org/jira/browse/HBASE-8626.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33304) Atomicity of RowMutations would broken when Delete and Put on same columnFamily/column/row

2023-10-18 Thread tanjialiang (Jira)
tanjialiang created FLINK-33304:
---

 Summary: Atomicity of RowMutations would broken when Delete and 
Put on same columnFamily/column/row
 Key: FLINK-33304
 URL: https://issues.apache.org/jira/browse/FLINK-33304
 Project: Flink
  Issue Type: Bug
  Components: Connectors / HBase
Affects Versions: hbase-3.0.1
Reporter: tanjialiang


Currently we put the mutations into the BufferedMutator directly. When a 
RowMutations has a Delete followed by a Put on the same column family, column, 
or row, only the Delete happens while the Put is ignored, so the atomicity of 
RowMutations is broken in such cases.


 

See https://issues.apache.org/jira/browse/HBASE-8626.
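
For illustration, a sketch of the problematic write pattern (table and column names are made up): a Delete and a Put on the same cell sent through one BufferedMutator are not applied atomically, and the Put can be masked when both end up with the same cell timestamp, as described in HBASE-8626.
{code:java}
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

final class NonAtomicMutationDemo {
    static void write(Connection connection) throws Exception {
        try (BufferedMutator mutator =
                connection.getBufferedMutator(TableName.valueOf("default:test"))) {
            byte[] row = Bytes.toBytes("1");
            byte[] family = Bytes.toBytes("cf1");
            byte[] qualifier = Bytes.toBytes("q1");
            // A Delete immediately followed by a Put on the same cell:
            mutator.mutate(new Delete(row).addColumns(family, qualifier));
            mutator.mutate(new Put(row).addColumn(family, qualifier, Bytes.toBytes("new")));
            mutator.flush(); // the Put may be masked by the Delete after this flush
        }
    }
}
{code}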



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource

2023-10-18 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776501#comment-17776501
 ] 

tanjialiang commented on FLINK-33173:
-

We need SourceReaderBase to provide an interface for implementations to 
deliver the fetch time all the way to the place where the event time is 
extracted in SourceOutput. Then we can continue working on this.
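
For context, a tiny sketch of what the metric boils down to once the fetch time is available next to the event timestamp (class and method names are hypothetical, following the FLIP-33 definition):
{code:java}
final class FetchEventTimeLagGauge {
    private volatile long currentFetchEventTimeLag;

    /** Called per record: lag is the fetch time minus the record's event timestamp. */
    void onRecord(long fetchTimeMs, long eventTimestampMs) {
        currentFetchEventTimeLag = fetchTimeMs - eventTimestampMs;
    }

    long value() {
        return currentFetchEventTimeLag;
    }
}
{code}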

> Support currentFetchEventTimeLag metrics for KafkaSource
> 
>
> Key: FLINK-33173
> URL: https://issues.apache.org/jira/browse/FLINK-33173
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: tanjialiang
>Priority: Major
>  Labels: pull-request-available
>
> In the connector specifications of 
> [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33], 
> the {{currentFetchEventTimeLag}} metric is the time in milliseconds from the 
> record event timestamp to the timestamp when Flink fetched the record. It can 
> reflect the consumption latency before deserialization.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-10-17 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776082#comment-17776082
 ] 

tanjialiang commented on FLINK-29692:
-

Hi everyone, I notice that there is no solution yet. I want to share my 
thoughts about this feature; maybe they can help.
 
I think supporting early-fire may not be the best solution for the current 
window functions, because every window trigger is expensive, and early-fire 
is still not a real-time trigger.

For example
{code:sql}
SET table.exec.emit.early-fire.enabled = true;
SET table.exec.emit.early-fire.delay = 1min;

SELECT user_id,
   COUNT(*) AS total,
   HOP_START(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR) AS window_start,
   HOP_END(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR) AS window_end
FROM user_click
GROUP BY user_id, HOP(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR); {code}
1. Whether using a HOP/TUMBLE/CUMULATE window or enabling early-fire, there is 
a time delay, which is not real-time enough.

2. When the cardinality of user_id is large, every window trigger is very 
expensive, which makes the job unstable and can easily cause checkpoint timeouts.

3. Every early-fire triggers the windows of all user_ids, but maybe only a 
small part of the data actually changed in the early-fire trigger interval, 
which may cause write pressure on the sink.

 
In my company, I've added a window TVF function for this case, named 
HOPv2/TUMBLEv2 (maybe the names are not a good fit for the community).
{code:sql}
SELECT user_id,
   COUNT(*) AS total,
   window_start,
   window_time, -- the record rowtime
   window_end
FROM TABLE(
HOPV2(
DATA => TABLE user_click,
TIMECOL => DESCRIPTOR(rowtime),
SLIDE => INTERVAL '24' HOUR,
SIZE => INTERVAL '48' HOUR,
ALLOWED_LATENESS => true))
GROUP BY user_id, window_start, window_time, window_end; {code}
1. Similar to the OVER window, we accumulate and output the result when a record 
comes (actually on a timer trigger), which is a real-time trigger and also 
does not put a lot of write pressure on the sink.
2. The window_time is the record rowtime, which represents the current 
progress.
3. Similar to the HOP window, we fire and purge when window_end comes.
4. Support an allowedLateness option: when processing a late event, if its 
window has not been purged, allow accumulating without emitting.

 
I would like to contribute it, but it may need more discussion and help because 
I am still a novice at Flink contributions.
 

> Support early/late fires for Windowing TVFs
> ---
>
> Key: FLINK-29692
> URL: https://issues.apache.org/jira/browse/FLINK-29692
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.15.3
>Reporter: Canope Nerda
>Priority: Major
>
> I have cases where I need to 1) output data as soon as possible and 2) handle 
> late arriving data to achieve eventual correctness. In the logic, I need to 
> do window deduplication which is based on Windowing TVFs and according to 
> source code, early/late fires are not supported yet in Windowing TVFs.
> Actually 1) contradicts with 2). Without early/late fires, we had to 
> compromise, either live with fresh incorrect data or tolerate excess latency 
> for correctness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33208) Support the writable metadata timestamp for hbase connector

2023-10-08 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-33208:

Summary: Support the writable metadata timestamp for hbase connector  (was: 
Support the timestamp writable metadata for hbase connector)

> Support the writable metadata timestamp for hbase connector
> ---
>
> Key: FLINK-33208
> URL: https://issues.apache.org/jira/browse/FLINK-33208
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / HBase
>Affects Versions: hbase-3.0.1
>Reporter: tanjialiang
>Priority: Major
>
> Currently, the HBase sink does not support writing data with a `timestamp`, which 
> may cause the data to be written out of order. I suggest supporting the 
> timestamp writable metadata for the HBase connector so that we can set the 
> `timestamp` when writing.
> {code:java}
> CREATE TABLE hTable (
>  rowkey INT,
>  family1 ROW,
>  version TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
>  PRIMARY KEY (rowkey) NOT ENFORCED
> ) WITH (
>  'connector' = 'hbase-2.2',
>  'table-name' = 'mytable',
>  'zookeeper.quorum' = 'localhost:2181'
> );{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33208) Support the timestamp writable metadata for hbase connector

2023-10-08 Thread tanjialiang (Jira)
tanjialiang created FLINK-33208:
---

 Summary: Support the timestamp writable metadata for hbase 
connector
 Key: FLINK-33208
 URL: https://issues.apache.org/jira/browse/FLINK-33208
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / HBase
Affects Versions: hbase-3.0.1
Reporter: tanjialiang


Currently, the HBase sink does not support writing data with a `timestamp`, which 
may cause the data to be written out of order. I suggest supporting the 
timestamp writable metadata for the HBase connector so that we can set the 
`timestamp` when writing.
{code:java}
CREATE TABLE hTable (
 rowkey INT,
 family1 ROW,
 version TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-2.2',
 'table-name' = 'mytable',
 'zookeeper.quorum' = 'localhost:2181'
);{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33207) Scan hbase table will throw error when table is empty

2023-10-08 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-33207:

Priority: Blocker  (was: Critical)

> Scan hbase table will throw error when table is empty
> -
>
> Key: FLINK-33207
> URL: https://issues.apache.org/jira/browse/FLINK-33207
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: hbase-3.0.1
>Reporter: tanjialiang
>Priority: Blocker
>  Labels: pull-request-available
>
> When I scan an empty HBase table, it throws an error in 
> createInputSplits; we should return an empty split instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33207) Scan hbase table will throw error when table is empty

2023-10-08 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-33207:

Description: When i scan the empty hbase table, it will throw an error when 
createInputSplits, we should return empty split instead.  (was: When i scan the 
empty hbase table, it will throw an error when createInputSplits, we shuold 
return empty split instead.)

> Scan hbase table will throw error when table is empty
> -
>
> Key: FLINK-33207
> URL: https://issues.apache.org/jira/browse/FLINK-33207
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: hbase-3.0.1
>Reporter: tanjialiang
>Priority: Critical
>  Labels: pull-request-available
>
> When I scan an empty HBase table, it throws an error in 
> createInputSplits; we should return an empty split instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33207) Scan hbase table will throw error when table is empty

2023-10-08 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-33207:

Description: When i scan the empty hbase table, it will throw an error when 
createInputSplits, we shuold return empty split to instead.  (was: When i scan 
the hbase table when it is empty, it will throw an error when 
createInputSplits, we shuold return empty split to instead.)

> Scan hbase table will throw error when table is empty
> -
>
> Key: FLINK-33207
> URL: https://issues.apache.org/jira/browse/FLINK-33207
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: hbase-3.0.1
>Reporter: tanjialiang
>Priority: Critical
>  Labels: pull-request-available
>
> When I scan an empty HBase table, it throws an error in 
> createInputSplits; we should return an empty split instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33207) Scan hbase table will throw error when table is empty

2023-10-08 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-33207:

Description: When i scan the empty hbase table, it will throw an error when 
createInputSplits, we shuold return empty split instead.  (was: When i scan the 
empty hbase table, it will throw an error when createInputSplits, we shuold 
return empty split to instead.)

> Scan hbase table will throw error when table is empty
> -
>
> Key: FLINK-33207
> URL: https://issues.apache.org/jira/browse/FLINK-33207
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: hbase-3.0.1
>Reporter: tanjialiang
>Priority: Critical
>  Labels: pull-request-available
>
> When I scan an empty HBase table, it throws an error in 
> createInputSplits; we should return an empty split instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33207) Scan hbase table will throw error when table is empty

2023-10-08 Thread tanjialiang (Jira)
tanjialiang created FLINK-33207:
---

 Summary: Scan hbase table will throw error when table is empty
 Key: FLINK-33207
 URL: https://issues.apache.org/jira/browse/FLINK-33207
 Project: Flink
  Issue Type: Bug
  Components: Connectors / HBase
Affects Versions: hbase-3.0.1
Reporter: tanjialiang


When I scan an HBase table that is empty, it throws an error in 
createInputSplits; we should return an empty split instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33206) Verify the existence of hbase table before read/write

2023-10-08 Thread tanjialiang (Jira)
tanjialiang created FLINK-33206:
---

 Summary: Verify the existence of hbase table before read/write
 Key: FLINK-33206
 URL: https://issues.apache.org/jira/browse/FLINK-33206
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / HBase
Affects Versions: hbase-3.0.1
Reporter: tanjialiang
 Attachments: image-2023-10-08-16-54-05-917.png

Currently, we do not verify the existence of the HBase table before reading or 
writing, and the resulting errors can confuse the user.

The `HBaseSinkFunction` throws `TableNotFoundException` when doing a flush.
The error thrown by the `inputFormat` is not obvious enough.
!image-2023-10-08-16-54-05-917.png!

So I think we should verify the existence of the HBase table when the `open` 
function is called.
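
A minimal sketch of the suggested check (placing it in `open` is the proposal; the helper shape is my own): verify the table up front via the HBase Admin API so the user gets a clear error before any read or write starts.
{code:java}
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;

final class TableExistenceCheck {
    /** Fails fast with an explicit message if the target table is missing. */
    static void checkTableExists(Connection connection, String tableName) throws Exception {
        try (Admin admin = connection.getAdmin()) {
            if (!admin.tableExists(TableName.valueOf(tableName))) {
                throw new IllegalStateException(
                        "HBase table '" + tableName + "' does not exist; "
                                + "create it before starting the job.");
            }
        }
    }
}
{code}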



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource

2023-09-30 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-33173:

Description: In the connector specifications of 
[FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33], 
the {{currentFetchEventTimeLag}} metric is the time in milliseconds from the 
record event timestamp to the timestamp when Flink fetched the record. It can 
reflect the consumption latency before deserialization.  (was: In the connector 
specifications of 
[FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33[],], 
currentFetchEventTimeLag metrics is the time in milliseconds from the record 
event timestamp to the timestamp Flink fetched the record.)

> Support currentFetchEventTimeLag metrics for KafkaSource
> 
>
> Key: FLINK-33173
> URL: https://issues.apache.org/jira/browse/FLINK-33173
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: kafka-3.0.0
>Reporter: tanjialiang
>Priority: Major
>  Labels: pull-request-available
>
> In the connector specifications of 
> [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33], 
> the {{currentFetchEventTimeLag}} metric is the time in milliseconds from the 
> record event timestamp to the timestamp when Flink fetched the record. It can 
> reflect the consumption latency before deserialization.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource

2023-09-30 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-33173:

Component/s: Connectors / Kafka

> Support currentFetchEventTimeLag metrics for KafkaSource
> 
>
> Key: FLINK-33173
> URL: https://issues.apache.org/jira/browse/FLINK-33173
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: tanjialiang
>Priority: Major
>  Labels: pull-request-available
>
> In the connector specifications of 
> [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33], 
> the {{currentFetchEventTimeLag}} metric is the time in milliseconds from the 
> record event timestamp to the timestamp when Flink fetched the record. It can 
> reflect the consumption latency before deserialization.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource

2023-09-30 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-33173:

Description: In the connector specifications of 
[FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33[],|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33]
 currentFetchEventTimeLag metrics is the time in milliseconds from the record 
event timestamp to the timestamp Flink fetched the record.  (was: In the 
connector specifications of FLIP-33, currentFetchEventTimeLag metrics is the 
time in milliseconds from the record event timestamp to the timestamp Flink 
fetched the record.)

> Support currentFetchEventTimeLag metrics for KafkaSource
> 
>
> Key: FLINK-33173
> URL: https://issues.apache.org/jira/browse/FLINK-33173
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: kafka-3.0.0
>Reporter: tanjialiang
>Priority: Major
>
> In the connector specifications of 
> [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33[],|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33]
>  currentFetchEventTimeLag metrics is the time in milliseconds from the record 
> event timestamp to the timestamp Flink fetched the record.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource

2023-09-30 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-33173:

Description: In the connector specifications of 
[FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33[],], 
currentFetchEventTimeLag metrics is the time in milliseconds from the record 
event timestamp to the timestamp Flink fetched the record.  (was: In the 
connector specifications of 
[FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33[],|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33]
 currentFetchEventTimeLag metrics is the time in milliseconds from the record 
event timestamp to the timestamp Flink fetched the record.)

> Support currentFetchEventTimeLag metrics for KafkaSource
> 
>
> Key: FLINK-33173
> URL: https://issues.apache.org/jira/browse/FLINK-33173
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: kafka-3.0.0
>Reporter: tanjialiang
>Priority: Major
>
> In the connector specifications of 
> [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33[],], 
> currentFetchEventTimeLag metrics is the time in milliseconds from the record 
> event timestamp to the timestamp Flink fetched the record.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource

2023-09-30 Thread tanjialiang (Jira)
tanjialiang created FLINK-33173:
---

 Summary: Support currentFetchEventTimeLag metrics for KafkaSource
 Key: FLINK-33173
 URL: https://issues.apache.org/jira/browse/FLINK-33173
 Project: Flink
  Issue Type: Improvement
Affects Versions: kafka-3.0.0
Reporter: tanjialiang


In the connector specifications of FLIP-33, the currentFetchEventTimeLag metric 
is the time in milliseconds from the record event timestamp to the timestamp 
when Flink fetched the record.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33164) HBase connector support ignore null value for partial update

2023-09-27 Thread tanjialiang (Jira)
tanjialiang created FLINK-33164:
---

 Summary: HBase connector support ignore null value for partial 
update
 Key: FLINK-33164
 URL: https://issues.apache.org/jira/browse/FLINK-33164
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / HBase
Affects Versions: hbase-3.0.0
Reporter: tanjialiang


Sometimes, users want to write data and ignore null values to achieve a partial 
update. So I suggest adding an option: sink.ignore-null-value.

 
{code:java}
CREATE TABLE hTable (
 rowkey STRING,
 cf1 ROW<q1 STRING, q2 STRING>,
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-2.2',
 'table-name' = 'default:test',
 'zookeeper.quorum' = 'localhost:2181',
 'sink.ignore-null-value' = 'true' -- default is false; set to true to enable
);

INSERT INTO hTable VALUES('1', ROW('10', 'hello, world'));
INSERT INTO hTable VALUES('1', ROW('30', CAST(NULL AS STRING))); -- null value 
to cf1.q2

-- when sink.ignore-null-value is false
// after first insert
{rowkey: "1", "cf1": {q1: "10", q2: "hello, world"}} 
// after second insert, cf1.q2 is updated to null
{rowkey: "1", "cf1": {q1: "30", q2: "null"}} 


-- when sink.ignore-null-value is true
// after first insert 
{rowkey: "1", "cf1": {q1: "10", q2: "hello, world"}}
// after second insert, cf1.q2 is still the old value 
{rowkey: "1", "cf1": {q1: "30", q2: "hello, world"}} {code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-09-26 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769056#comment-17769056
 ] 

tanjialiang commented on FLINK-28303:
-

Maybe we should implement the LatestOffsetsInitializer to look up the end 
offset and pass it to the reader, instead of passing 
KafkaPartitionSplit#LATEST_OFFSET (-1).
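
A rough sketch of that idea (the helper shape is mine): resolve a concrete end offset per partition when the splits are created, instead of handing the reader the -1 sentinel, so the checkpointed state never carries -1.
{code:java}
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

final class ResolveLatestOffsets {
    /** Asks the broker for the current log-end offset of each partition. */
    static Map<TopicPartition, Long> resolve(
            KafkaConsumer<?, ?> consumer, Set<TopicPartition> partitions) {
        return consumer.endOffsets(partitions);
    }
}
{code}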

> Kafka SQL Connector loses data when restoring from a savepoint with a topic 
> with empty partitions
> -
>
> Key: FLINK-28303
> URL: https://issues.apache.org/jira/browse/FLINK-28303
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
>Reporter: Robert Metzger
>Priority: Major
>
> Steps to reproduce:
> - Set up a Kafka topic with 10 partitions
> - produce records 0-9 into the topic
> - take a savepoint and stop the job
> - produce records 10-19 into the topic
> - restore the job from the savepoint.
> The job will be missing usually 2-4 records from 10-19.
> My assumption is that if a partition never had data (which is likely with 10 
> partitions and 10 records), the savepoint will only contain offsets for 
> partitions with data. 
> While the job was offline (and we've written record 10-19 into the topic), 
> all partitions got filled. Now, when Kafka comes online again, it will use 
> the "latest" offset for those partitions, skipping some data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-09-25 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769014#comment-17769014
 ] 

tanjialiang commented on FLINK-28303:
-

[~martijnvisser] I have already checked the latest Kafka connector code; this 
problem still exists.

> Kafka SQL Connector loses data when restoring from a savepoint with a topic 
> with empty partitions
> -
>
> Key: FLINK-28303
> URL: https://issues.apache.org/jira/browse/FLINK-28303
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
>Reporter: Robert Metzger
>Priority: Major
>
> Steps to reproduce:
> - Set up a Kafka topic with 10 partitions
> - produce records 0-9 into the topic
> - take a savepoint and stop the job
> - produce records 10-19 into the topic
> - restore the job from the savepoint.
> The job will be missing usually 2-4 records from 10-19.
> My assumption is that if a partition never had data (which is likely with 10 
> partitions and 10 records), the savepoint will only contain offsets for 
> partitions with data. 
> While the job was offline (and we've written record 10-19 into the topic), 
> all partitions got filled. Now, when Kafka comes online again, it will use 
> the "latest" offset for those partitions, skipping some data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-09-25 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768637#comment-17768637
 ] 

tanjialiang commented on FLINK-28303:
-

Maybe this is the reason? 
[FLINK-33153|https://issues.apache.org/jira/browse/FLINK-33153]

> Kafka SQL Connector loses data when restoring from a savepoint with a topic 
> with empty partitions
> -
>
> Key: FLINK-28303
> URL: https://issues.apache.org/jira/browse/FLINK-28303
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
>Reporter: Robert Metzger
>Priority: Major
>
> Steps to reproduce:
> - Set up a Kafka topic with 10 partitions
> - produce records 0-9 into the topic
> - take a savepoint and stop the job
> - produce records 10-19 into the topic
> - restore the job from the savepoint.
> The job will be missing usually 2-4 records from 10-19.
> My assumption is that if a partition never had data (which is likely with 10 
> partitions and 10 records), the savepoint will only contain offsets for 
> partitions with data. 
> While the job was offline (and we've written record 10-19 into the topic), 
> all partitions got filled. Now, when Kafka comes online again, it will use 
> the "latest" offset for those partitions, skipping some data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33153) Kafka using latest-offset maybe missing data

2023-09-25 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768631#comment-17768631
 ] 

tanjialiang commented on FLINK-33153:
-

It relates to https://issues.apache.org/jira/browse/FLINK-28303, so I am closing 
this issue.

> Kafka using latest-offset maybe missing data
> 
>
> Key: FLINK-33153
> URL: https://issues.apache.org/jira/browse/FLINK-33153
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-4.1.0
>Reporter: tanjialiang
>Priority: Minor
>
> When Kafka starts with the latest-offset strategy, it does not fetch the 
> latest snapshot offset and specify it for consumption. Instead, it sets the 
> startingOffset to -1 (KafkaPartitionSplit.LATEST_OFFSET), which makes 
> currentOffset = -1 and calls the KafkaConsumer's seekToEnd API. The 
> currentOffset is only set to the consumed offset + 1 when the task consumes 
> data, and this currentOffset is stored in the state during checkpointing. If 
> there are very few messages in Kafka and a partition has not consumed any 
> data, and I stop the task with a savepoint, then write data to that 
> partition, and start the task from the savepoint, the task will resume from 
> the saved state. Because the startingOffset in the state is -1, the task 
> will miss the data that was written before the recovery point.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33153) Kafka using latest-offset maybe missing data

2023-09-25 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-33153:

External issue URL:   (was: 
https://issues.apache.org/jira/browse/FLINK-28303)

> Kafka using latest-offset maybe missing data
> 
>
> Key: FLINK-33153
> URL: https://issues.apache.org/jira/browse/FLINK-33153
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-4.1.0
>Reporter: tanjialiang
>Priority: Minor
>
> When Kafka starts with the latest-offset strategy, it does not fetch the 
> latest snapshot offset and specify it for consumption. Instead, it sets the 
> startingOffset to -1 (KafkaPartitionSplit.LATEST_OFFSET), which makes 
> currentOffset = -1 and calls the KafkaConsumer's seekToEnd API. The 
> currentOffset is only set to the consumed offset + 1 when the task consumes 
> data, and this currentOffset is stored in the state during checkpointing. If 
> there are very few messages in Kafka and a partition has not consumed any 
> data, and I stop the task with a savepoint, then write data to that 
> partition, and start the task from the savepoint, the task will resume from 
> the saved state. Because the startingOffset in the state is -1, the task 
> will miss the data that was written before the recovery point.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33153) Kafka using latest-offset maybe missing data

2023-09-25 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-33153:

External issue URL: https://issues.apache.org/jira/browse/FLINK-28303
  Release Note:   (was: realte to 
https://issues.apache.org/jira/browse/FLINK-28303)

> Kafka using latest-offset maybe missing data
> 
>
> Key: FLINK-33153
> URL: https://issues.apache.org/jira/browse/FLINK-33153
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-4.1.0
>Reporter: tanjialiang
>Priority: Minor
>
> When Kafka starts with the latest-offset strategy, it does not fetch the 
> latest snapshot offset and specify it for consumption. Instead, it sets the 
> startingOffset to -1 (KafkaPartitionSplit.LATEST_OFFSET), which makes 
> currentOffset = -1 and calls the KafkaConsumer's seekToEnd API. The 
> currentOffset is only set to the consumed offset + 1 when the task consumes 
> data, and this currentOffset is stored in the state during checkpointing. If 
> there are very few messages in Kafka and a partition has not consumed any 
> data, and I stop the task with a savepoint, then write data to that 
> partition, and start the task from the savepoint, the task will resume from 
> the saved state. Because the startingOffset in the state is -1, the task 
> will miss the data that was written before the recovery point.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33153) Kafka using latest-offset maybe missing data

2023-09-25 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang closed FLINK-33153.
---
Release Note: realte to https://issues.apache.org/jira/browse/FLINK-28303
  Resolution: Duplicate

> Kafka using latest-offset maybe missing data
> 
>
> Key: FLINK-33153
> URL: https://issues.apache.org/jira/browse/FLINK-33153
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-4.1.0
>Reporter: tanjialiang
>Priority: Minor
>
> When Kafka starts with the latest-offset strategy, it does not fetch the 
> latest snapshot offset and specify it for consumption. Instead, it sets the 
> startingOffset to -1 (KafkaPartitionSplit.LATEST_OFFSET), which makes 
> currentOffset = -1 and calls the KafkaConsumer's seekToEnd API. The 
> currentOffset is only set to the consumed offset + 1 when the task consumes 
> data, and this currentOffset is stored in the state during checkpointing. If 
> there are very few messages in Kafka and a partition has not consumed any 
> data, and I stop the task with a savepoint, then write data to that 
> partition, and start the task from the savepoint, the task will resume from 
> the saved state. Because the startingOffset in the state is -1, the task 
> will miss the data that was written before the recovery point.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33153) Kafka using latest-offset maybe missing data

2023-09-25 Thread tanjialiang (Jira)
tanjialiang created FLINK-33153:
---

 Summary: Kafka using latest-offset maybe missing data
 Key: FLINK-33153
 URL: https://issues.apache.org/jira/browse/FLINK-33153
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: kafka-4.1.0
Reporter: tanjialiang


When Kafka starts with the latest-offset strategy, it does not fetch the latest 
snapshot offset and specify it for consumption. Instead, it sets the 
startingOffset to -1 (KafkaPartitionSplit.LATEST_OFFSET), which makes 
currentOffset = -1 and calls the KafkaConsumer's seekToEnd API. The 
currentOffset is only set to the consumed offset + 1 when the task consumes 
data, and this currentOffset is stored in the state during checkpointing. If 
there are very few messages in Kafka and a partition has not consumed any data, 
and I stop the task with a savepoint, then write data to that partition, and 
start the task from the savepoint, the task will resume from the saved state. 
Because the startingOffset in the state is -1, the task will miss the data 
that was written before the recovery point.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32252) SELECT COUNT(*) will return nothing when source no data return

2023-06-05 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang closed FLINK-32252.
---
Resolution: Duplicate

> SELECT COUNT(*) will return nothing when source no data return
> --
>
> Key: FLINK-32252
> URL: https://issues.apache.org/jira/browse/FLINK-32252
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: jdbc-3.1.0
>Reporter: tanjialiang
>Priority: Major
>
>  
> mysql source
> {code:java}
> CREATE TABLE student(
> id int primary key auto_increment,
> name varchar(32),
> age int
> );
> INSERT INTO student(name, age) VALUES 
> ('tanjl',18),('jark',20),('mike',16),('rose',21);{code}
>  
> Flink SQL
> {code:java}
> CREATE TABLE student (
> `id` INT PRIMARY KEY,
> `name` STRING,
> `age` INT
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost/test?serverTimezone=UTC',
>   'username' = 'root',
>   'password' = 'root',
>   'table-name' = 'student'
> ); 
> SELECT count(*) FROM student WHERE age < 15;{code}
> Flink will return nothing because the JDBC connector pushes the filter down 
> (after flink-connector-jdbc-3.1.0), which makes the source return no data.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32252) SELECT COUNT(*) will return nothing when source no data return

2023-06-05 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17729567#comment-17729567
 ] 

tanjialiang commented on FLINK-32252:
-

[~jark] Got it, thanks.

> SELECT COUNT(*) will return nothing when source no data return
> --
>
> Key: FLINK-32252
> URL: https://issues.apache.org/jira/browse/FLINK-32252
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: jdbc-3.1.0
>Reporter: tanjialiang
>Priority: Major
>
>  
> mysql source
> {code:java}
> CREATE TABLE student(
> id int primary key auto_increment,
> name varchar(32),
> age int
> );
> INSERT INTO student(name, age) VALUES 
> ('tanjl',18),('jark',20),('mike',16),('rose',21);{code}
>  
> Flink SQL
> {code:java}
> CREATE TABLE student (
> `id` INT PRIMARY KEY,
> `name` STRING,
> `age` INT
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost/test?serverTimezone=UTC',
>   'username' = 'root',
>   'password' = 'root',
>   'table-name' = 'student'
> ); 
> SELECT count(*) FROM student WHERE age < 15;{code}
> Flink will return nothing because the JDBC connector pushes the filter down 
> (after flink-connector-jdbc-3.1.0), which makes the source return no data.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32252) SELECT COUNT(*) will return nothing when source no data return

2023-06-05 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17729556#comment-17729556
 ] 

tanjialiang edited comment on FLINK-32252 at 6/6/23 2:39 AM:
-

[~jark] Shouldn't the final results be consistent whether in BATCH mode or 
STREAMING mode? So I think maybe something is wrong in STREAMING mode.


was (Author: JIRAUSER279823):
[~jark] Whether in BATCH mode or STREAMING mode the reuslts should be 
consitent? So i think maybe something wrong in STREAMING mode.

> SELECT COUNT(*) will return nothing when source no data return
> --
>
> Key: FLINK-32252
> URL: https://issues.apache.org/jira/browse/FLINK-32252
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: jdbc-3.1.0
>Reporter: tanjialiang
>Priority: Major
>
>  
> mysql source
> {code:java}
> CREATE TABLE student(
> id int primary key auto_increment,
> name varchar(32),
> age int
> );
> INSERT INTO student(name, age) VALUES 
> ('tanjl',18),('jark',20),('mike',16),('rose',21);{code}
>  
> Flink SQL
> {code:java}
> CREATE TABLE student (
> `id` INT PRIMARY KEY,
> `name` STRING,
> `age` INT
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost/test?serverTimezone=UTC',
>   'username' = 'root',
>   'password' = 'root',
>   'table-name' = 'student'
> ); 
> SELECT count(*) FROM student WHERE age < 15;{code}
> Flink will return nothing because the JDBC connector pushes the filter down 
> (after flink-connector-jdbc-3.1.0), which makes the source return no data.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32252) SELECT COUNT(*) will return nothing when source no data return

2023-06-05 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17729556#comment-17729556
 ] 

tanjialiang commented on FLINK-32252:
-

[~jark] Shouldn't the results be consistent whether in BATCH mode or STREAMING 
mode? So I think maybe something is wrong in STREAMING mode.

> SELECT COUNT(*) will return nothing when source no data return
> --
>
> Key: FLINK-32252
> URL: https://issues.apache.org/jira/browse/FLINK-32252
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: jdbc-3.1.0
>Reporter: tanjialiang
>Priority: Major
>
>  
> mysql source
> {code:java}
> CREATE TABLE student(
> id int primary key auto_increment,
> name varchar(32),
> age int
> );
> INSERT INTO student(name, age) VALUES 
> ('tanjl',18),('jark',20),('mike',16),('rose',21);{code}
>  
> Flink SQL
> {code:java}
> CREATE TABLE student (
> `id` INT PRIMARY KEY,
> `name` STRING,
> `age` INT
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost/test?serverTimezone=UTC',
>   'username' = 'root',
>   'password' = 'root',
>   'table-name' = 'student'
> ); 
> SELECT count(*) FROM student WHERE age < 15;{code}
> Flink will return nothing because the JDBC connector pushes the filter down 
> (after flink-connector-jdbc-3.1.0), which makes the source return no data.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32252) SELECT COUNT(*) will return nothing when source no data return

2023-06-05 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17729552#comment-17729552
 ] 

tanjialiang commented on FLINK-32252:
-

[~jark] I tried: STREAMING mode returns nothing and BATCH mode returns 0.

> SELECT COUNT(*) will return nothing when source no data return
> --
>
> Key: FLINK-32252
> URL: https://issues.apache.org/jira/browse/FLINK-32252
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: jdbc-3.1.0
>Reporter: tanjialiang
>Priority: Major
>
>  
> mysql source
> {code:java}
> CREATE TABLE student(
> id int primary key auto_increment,
> name varchar(32),
> age int
> );
> INSERT INTO student(name, age) VALUES 
> ('tanjl',18),('jark',20),('mike',16),('rose',21);{code}
>  
> Flink SQL
> {code:java}
> CREATE TABLE student (
> `id` INT PRIMARY KEY,
> `name` STRING,
> `age` INT
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost/test?serverTimezone=UTC',
>   'username' = 'root',
>   'password' = 'root',
>   'table-name' = 'student'
> ); 
> SELECT count(*) FROM student WHERE age < 15;{code}
> Flink will return nothing because the JDBC connector pushes the filter down 
> (after flink-connector-jdbc-3.1.0), which makes the source return no data.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32252) SELECT COUNT(*) will return nothing when source no data return

2023-06-05 Thread tanjialiang (Jira)
tanjialiang created FLINK-32252:
---

 Summary: SELECT COUNT(*) will return nothing when source no data 
return
 Key: FLINK-32252
 URL: https://issues.apache.org/jira/browse/FLINK-32252
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.1
Reporter: tanjialiang


 

mysql source
{code:java}
CREATE TABLE student(
id int primary key auto_increment,
name varchar(32),
age int
);

INSERT INTO student(name, age) VALUES 
('tanjl',18),('jark',20),('mike',16),('rose',21);{code}
 

Flink SQL
{code:java}
CREATE TABLE student (
`id` INT PRIMARY KEY,
`name` STRING,
`age` INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost/test?serverTimezone=UTC',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'student'
); 

SELECT count(*) FROM student WHERE age < 15;{code}
Flink will return nothing because the JDBC connector pushes the filter down 
(after flink-connector-jdbc-3.1.0), which makes the source return no data.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31308) JobManager's metaspace out-of-memory when submit a flinksessionjobs

2023-04-17 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-31308:

  Component/s: Table SQL / Planner
   Table SQL / Runtime
   (was: Kubernetes Operator)
   (was: Table SQL / API)
Affects Version/s: (was: kubernetes-operator-1.4.0)

> JobManager's metaspace out-of-memory when submit a flinksessionjobs
> ---
>
> Key: FLINK-31308
> URL: https://issues.apache.org/jira/browse/FLINK-31308
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-03-03-10-34-46-681.png
>
>
> Hello team, when I repeatedly submit FlinkSessionJobs through the Flink 
> operator, the JobManager's metaspace runs out of memory. My job contains some 
> Flink SQL logic; is the user classloader not being closed? Or might it be 
> caused by Flink SQL's codegen? By the way, the issue does not appear when I 
> submit through flink-sql-gateway.
>  
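
A possible stop-gap while the leak is being investigated is to enlarge the 
JobManager's metaspace. This is only a mitigation sketch, not a fix for the 
underlying classloader leak, and the 512m value is an illustrative assumption:
{code:java}
# flink-conf.yaml: more headroom before repeated submissions exhaust metaspace
jobmanager.memory.jvm-metaspace.size: 512m{code}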



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31686) Filesystem connector should replace the shallow copy with deep copy

2023-04-01 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707522#comment-17707522
 ] 

tanjialiang commented on FLINK-31686:
-

Maybe I can take this ticket; I would like to try it.

> Filesystem connector should replace the shallow copy with deep copy
> ---
>
> Key: FLINK-31686
> URL: https://issues.apache.org/jira/browse/FLINK-31686
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-04-01-16-18-48-762.png, 
> image-2023-04-01-16-18-56-075.png
>
>
> Hi team, when I use the following SQL
> {code:java}
> CREATE TABLE student (
>     `id` STRING,
>     `name` STRING,
>     `age` INT
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = '...',
>   'format' = 'orc'
> );
> select
>     t1.total,
>     t2.total
> from
>     (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>         where name = 'tanjialiang'
>     ) t1
>     LEFT JOIN (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>     ) t2
>     ON t1.join_key = t2.join_key; {code}
>  
> it will throw an error
> !image-2023-04-01-16-18-48-762.png!
>  
> I tried to solve it, and I found that the filesystem connector's copy function 
> uses a shallow copy instead of a deep copy. This makes every query on the same 
> table source reuse the same bulkWriterFormat; my query has a filter condition 
> that is pushed down into the bulkWriterFormat, so the filter condition may be 
> reused.
> The comments on the DynamicTableSource and DynamicTableSink copy functions say 
> they should be implemented with a deep copy, but I found that every connector 
> implements them with a shallow copy. So I think the filesystem connector is 
> not the only one with this problem.
> !image-2023-04-01-16-18-56-075.png!
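
A minimal sketch of the proposed fix, using a hypothetical source 
(DeepCopySource is not a real Flink class): the point is that copy() duplicates 
the mutable pushed-down state instead of sharing it between plan instances.
{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

/** Hypothetical source sketching a deep copy of pushed-down filter state. */
public class DeepCopySource implements ScanTableSource, SupportsFilterPushDown {

    private List<ResolvedExpression> filters = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        this.filters = new ArrayList<>(filters);
        // accept nothing here for simplicity, so Flink still filters itself
        return Result.of(new ArrayList<>(), filters);
    }

    @Override
    public DynamicTableSource copy() {
        DeepCopySource copy = new DeepCopySource();
        // deep-copy the mutable state so two plan nodes never share filters
        copy.filters = new ArrayList<>(this.filters);
        return copy;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        throw new UnsupportedOperationException("reading is omitted in this sketch");
    }

    @Override
    public String asSummaryString() {
        return "DeepCopySource";
    }
}{code}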



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31686) Filesystem connector should replace the shallow copy with deep copy

2023-04-01 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-31686:

Summary: Filesystem connector should replace the shallow copy with deep 
copy  (was: Filesystem Connector should replace the shallow copy with deep copy)

> Filesystem connector should replace the shallow copy with deep copy
> ---
>
> Key: FLINK-31686
> URL: https://issues.apache.org/jira/browse/FLINK-31686
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-04-01-16-18-48-762.png, 
> image-2023-04-01-16-18-56-075.png
>
>
> Hi team, when I use the following SQL
> {code:java}
> CREATE TABLE student (
>     `id` STRING,
>     `name` STRING,
>     `age` INT
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = '...',
>   'format' = 'orc'
> );
> select
>     t1.total,
>     t2.total
> from
>     (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>         where name = 'tanjialiang'
>     ) t1
>     LEFT JOIN (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>     ) t2
>     ON t1.join_key = t2.join_key; {code}
>  
> it will throw an error
> !image-2023-04-01-16-18-48-762.png!
>  
> I tried to solve it, and I found that the filesystem connector's copy function 
> uses a shallow copy instead of a deep copy. This makes every query on the same 
> table source reuse the same bulkWriterFormat; my query has a filter condition 
> that is pushed down into the bulkWriterFormat, so the filter condition may be 
> reused.
> The comments on the DynamicTableSource and DynamicTableSink copy functions say 
> they should be implemented with a deep copy, but I found that every connector 
> implements them with a shallow copy. So I think the filesystem connector is 
> not the only one with this problem.
> !image-2023-04-01-16-18-56-075.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31686) Filesystem Connector should replace the shallow copy with deep copy

2023-04-01 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-31686:

Summary: Filesystem Connector should replace the shallow copy with deep 
copy  (was: Connector should replace the shallow copy with deep copy)

> Filesystem Connector should replace the shallow copy with deep copy
> ---
>
> Key: FLINK-31686
> URL: https://issues.apache.org/jira/browse/FLINK-31686
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-04-01-16-18-48-762.png, 
> image-2023-04-01-16-18-56-075.png
>
>
> Hi team, when I use the following SQL
> {code:java}
> CREATE TABLE student (
>     `id` STRING,
>     `name` STRING,
>     `age` INT
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = '...',
>   'format' = 'orc'
> );
> select
>     t1.total,
>     t2.total
> from
>     (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>         where name = 'tanjialiang'
>     ) t1
>     LEFT JOIN (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>     ) t2
>     ON t1.join_key = t2.join_key; {code}
>  
> it will throw an error
> !image-2023-04-01-16-18-48-762.png!
>  
> I tried to solve it, and I found that the filesystem connector's copy function 
> uses a shallow copy instead of a deep copy. This makes every query on the same 
> table source reuse the same bulkWriterFormat; my query has a filter condition 
> that is pushed down into the bulkWriterFormat, so the filter condition may be 
> reused.
> The comments on the DynamicTableSource and DynamicTableSink copy functions say 
> they should be implemented with a deep copy, but I found that every connector 
> implements them with a shallow copy. So I think the filesystem connector is 
> not the only one with this problem.
> !image-2023-04-01-16-18-56-075.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31686) Connector should replace the shallow copy with deep copy

2023-04-01 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-31686:

Summary: Connector should replace the shallow copy with deep copy  (was: 
All of connector should replace the shallow copy with deep copy)

> Connector should replace the shallow copy with deep copy
> 
>
> Key: FLINK-31686
> URL: https://issues.apache.org/jira/browse/FLINK-31686
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-04-01-16-18-48-762.png, 
> image-2023-04-01-16-18-56-075.png
>
>
> Hi team, when I use the following SQL
> {code:java}
> CREATE TABLE student (
>     `id` STRING,
>     `name` STRING,
>     `age` INT
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = '...',
>   'format' = 'orc'
> );
> select
>     t1.total,
>     t2.total
> from
>     (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>         where name = 'tanjialiang'
>     ) t1
>     LEFT JOIN (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>     ) t2
>     ON t1.join_key = t2.join_key; {code}
>  
> it will throw an error
> !image-2023-04-01-16-18-48-762.png!
>  
> I tried to solve it, and I found that the filesystem connector's copy function 
> uses a shallow copy instead of a deep copy. This makes every query on the same 
> table source reuse the same bulkWriterFormat; my query has a filter condition 
> that is pushed down into the bulkWriterFormat, so the filter condition may be 
> reused.
> The comments on the DynamicTableSource and DynamicTableSink copy functions say 
> they should be implemented with a deep copy, but I found that every connector 
> implements them with a shallow copy. So I think the filesystem connector is 
> not the only one with this problem.
> !image-2023-04-01-16-18-56-075.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31686) All of connector should replace the shallow copy with deep copy

2023-04-01 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-31686:

Summary: All of connector should replace the shallow copy with deep copy  
(was: All of connector should replace the shallow copy with a deep copy)

> All of connector should replace the shallow copy with deep copy
> ---
>
> Key: FLINK-31686
> URL: https://issues.apache.org/jira/browse/FLINK-31686
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-04-01-16-18-48-762.png, 
> image-2023-04-01-16-18-56-075.png
>
>
> Hi team, when I use the following SQL
> {code:java}
> CREATE TABLE student (
>     `id` STRING,
>     `name` STRING,
>     `age` INT
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = '...',
>   'format' = 'orc'
> );
> select
>     t1.total,
>     t2.total
> from
>     (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>         where name = 'tanjialiang'
>     ) t1
>     LEFT JOIN (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>     ) t2
>     ON t1.join_key = t2.join_key; {code}
>  
> it will throw an error
> !image-2023-04-01-16-18-48-762.png!
>  
> I tried to solve it, and I found that the filesystem connector's copy function 
> uses a shallow copy instead of a deep copy. This makes every query on the same 
> table source reuse the same bulkWriterFormat; my query has a filter condition 
> that is pushed down into the bulkWriterFormat, so the filter condition may be 
> reused.
> The comments on the DynamicTableSource and DynamicTableSink copy functions say 
> they should be implemented with a deep copy, but I found that every connector 
> implements them with a shallow copy. So I think the filesystem connector is 
> not the only one with this problem.
> !image-2023-04-01-16-18-56-075.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31686) All of connector should replace the shallow copy with a deep copy

2023-04-01 Thread tanjialiang (Jira)
tanjialiang created FLINK-31686:
---

 Summary: All of connector should replace the shallow copy with a 
deep copy
 Key: FLINK-31686
 URL: https://issues.apache.org/jira/browse/FLINK-31686
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.16.1
Reporter: tanjialiang
 Attachments: image-2023-04-01-16-18-48-762.png, 
image-2023-04-01-16-18-56-075.png

Hi team, when I use the following SQL
{code:java}
CREATE TABLE student (
    `id` STRING,
    `name` STRING,
    `age` INT
) WITH (
  'connector' = 'filesystem',
  'path' = '...',
  'format' = 'orc'
);

select
    t1.total,
    t2.total
from
    (
        select
            count(*) as total,
            1 as join_key
        from student
        where name = 'tanjialiang'
    ) t1
    LEFT JOIN (
        select
            count(*) as total,
            1 as join_key
        from student
    ) t2
    ON t1.join_key = t2.join_key; {code}
 

it will throw an error

!image-2023-04-01-15-53-40-060.png!

 

I tried to solve it, and I found that the filesystem connector's copy function 
uses a shallow copy instead of a deep copy. This makes every query on the same 
table source reuse the same bulkWriterFormat; my query has a filter condition 
that is pushed down into the bulkWriterFormat, so the filter condition may be 
reused.

The comments on the DynamicTableSource and DynamicTableSink copy functions say 
they should be implemented with a deep copy, but I found that every connector 
implements them with a shallow copy. So I think the filesystem connector is not 
the only one with this problem.

!image-2023-04-01-16-12-29-927.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31686) All of connector should replace the shallow copy with a deep copy

2023-04-01 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-31686:

Attachment: image-2023-04-01-16-18-48-762.png

> All of connector should replace the shallow copy with a deep copy
> -
>
> Key: FLINK-31686
> URL: https://issues.apache.org/jira/browse/FLINK-31686
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-04-01-16-18-48-762.png, 
> image-2023-04-01-16-18-56-075.png
>
>
> Hi team, when I use the following SQL
> {code:java}
> CREATE TABLE student (
>     `id` STRING,
>     `name` STRING,
>     `age` INT
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = '...',
>   'format' = 'orc'
> );
> select
>     t1.total,
>     t2.total
> from
>     (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>         where name = 'tanjialiang'
>     ) t1
>     LEFT JOIN (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>     ) t2
>     ON t1.join_key = t2.join_key; {code}
>  
> it will throw an error
> !image-2023-04-01-15-53-40-060.png!
>  
> I tried to solve it, and I found that the filesystem connector's copy function 
> uses a shallow copy instead of a deep copy. This makes every query on the same 
> table source reuse the same bulkWriterFormat; my query has a filter condition 
> that is pushed down into the bulkWriterFormat, so the filter condition may be 
> reused.
> The comments on the DynamicTableSource and DynamicTableSink copy functions say 
> they should be implemented with a deep copy, but I found that every connector 
> implements them with a shallow copy. So I think the filesystem connector is 
> not the only one with this problem.
> !image-2023-04-01-16-12-29-927.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31686) All of connector should replace the shallow copy with a deep copy

2023-04-01 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-31686:

Description: 
Hi team, when I use the following SQL
{code:java}
CREATE TABLE student (
    `id` STRING,
    `name` STRING,
    `age` INT
) WITH (
  'connector' = 'filesystem',
  'path' = '...',
  'format' = 'orc'
);

select
    t1.total,
    t2.total
from
    (
        select
            count(*) as total,
            1 as join_key
        from student
        where name = 'tanjialiang'
    ) t1
    LEFT JOIN (
        select
            count(*) as total,
            1 as join_key
        from student
    ) t2
    ON t1.join_key = t2.join_key; {code}
 

it will throw an error

!image-2023-04-01-16-18-48-762.png!

 

I tried to solve it, and I found that the filesystem connector's copy function 
uses a shallow copy instead of a deep copy. This makes every query on the same 
table source reuse the same bulkWriterFormat; my query has a filter condition 
that is pushed down into the bulkWriterFormat, so the filter condition may be 
reused.

The comments on the DynamicTableSource and DynamicTableSink copy functions say 
they should be implemented with a deep copy, but I found that every connector 
implements them with a shallow copy. So I think the filesystem connector is not 
the only one with this problem.

!image-2023-04-01-16-18-56-075.png!

  was:
Hi team, when I use the following SQL
{code:java}
CREATE TABLE student (
    `id` STRING,
    `name` STRING,
    `age` INT
) WITH (
  'connector' = 'filesystem',
  'path' = '...',
  'format' = 'orc'
);

select
    t1.total,
    t2.total
from
    (
        select
            count(*) as total,
            1 as join_key
        from student
        where name = 'tanjialiang'
    ) t1
    LEFT JOIN (
        select
            count(*) as total,
            1 as join_key
        from student
    ) t2
    ON t1.join_key = t2.join_key; {code}
 

it will throw an error

!image-2023-04-01-15-53-40-060.png!

 

I tried to solve it, and I found that the filesystem connector's copy function 
uses a shallow copy instead of a deep copy. This makes every query on the same 
table source reuse the same bulkWriterFormat; my query has a filter condition 
that is pushed down into the bulkWriterFormat, so the filter condition may be 
reused.

The comments on the DynamicTableSource and DynamicTableSink copy functions say 
they should be implemented with a deep copy, but I found that every connector 
implements them with a shallow copy. So I think the filesystem connector is not 
the only one with this problem.

!image-2023-04-01-16-12-29-927.png!


> All of connector should replace the shallow copy with a deep copy
> -
>
> Key: FLINK-31686
> URL: https://issues.apache.org/jira/browse/FLINK-31686
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-04-01-16-18-48-762.png, 
> image-2023-04-01-16-18-56-075.png
>
>
> Hi team, when I use the following SQL
> {code:java}
> CREATE TABLE student (
>     `id` STRING,
>     `name` STRING,
>     `age` INT
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = '...',
>   'format' = 'orc'
> );
> select
>     t1.total,
>     t2.total
> from
>     (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>         where name = 'tanjialiang'
>     ) t1
>     LEFT JOIN (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>     ) t2
>     ON t1.join_key = t2.join_key; {code}
>  
> it will throw an error
> !image-2023-04-01-16-18-48-762.png!
>  
> I tried to solve it, and I found that the filesystem connector's copy function 
> uses a shallow copy instead of a deep copy. This makes every query on the same 
> table source reuse the same bulkWriterFormat; my query has a filter condition 
> that is pushed down into the bulkWriterFormat, so the filter condition may be 
> reused.
> The comments on the DynamicTableSource and DynamicTableSink copy functions say 
> they should be implemented with a deep copy, but I found that every connector 
> implements them with a shallow copy. So I think the filesystem connector is 
> not the only one with this problem.
> !image-2023-04-01-16-18-56-075.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31686) All of connector should replace the shallow copy with a deep copy

2023-04-01 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-31686:

Attachment: image-2023-04-01-16-18-56-075.png

> All of connector should replace the shallow copy with a deep copy
> -
>
> Key: FLINK-31686
> URL: https://issues.apache.org/jira/browse/FLINK-31686
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-04-01-16-18-48-762.png, 
> image-2023-04-01-16-18-56-075.png
>
>
> Hi team, when I use the following SQL
> {code:java}
> CREATE TABLE student (
>     `id` STRING,
>     `name` STRING,
>     `age` INT
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = '...',
>   'format' = 'orc'
> );
> select
>     t1.total,
>     t2.total
> from
>     (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>         where name = 'tanjialiang'
>     ) t1
>     LEFT JOIN (
>         select
>             count(*) as total,
>             1 as join_key
>         from student
>     ) t2
>     ON t1.join_key = t2.join_key; {code}
>  
> it will throw an error
> !image-2023-04-01-15-53-40-060.png!
>  
> I tried to solve it, and I found that the filesystem connector's copy function 
> uses a shallow copy instead of a deep copy. This makes every query on the same 
> table source reuse the same bulkWriterFormat; my query has a filter condition 
> that is pushed down into the bulkWriterFormat, so the filter condition may be 
> reused.
> The comments on the DynamicTableSource and DynamicTableSink copy functions say 
> they should be implemented with a deep copy, but I found that every connector 
> implements them with a shallow copy. So I think the filesystem connector is 
> not the only one with this problem.
> !image-2023-04-01-16-12-29-927.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31541) Get metrics in Flink WEB UI error

2023-03-21 Thread tanjialiang (Jira)
tanjialiang created FLINK-31541:
---

 Summary: Get metrics in Flink WEB UI error
 Key: FLINK-31541
 URL: https://issues.apache.org/jira/browse/FLINK-31541
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics, Runtime / Web Frontend
Affects Versions: 1.16.1
Reporter: tanjialiang
 Attachments: image-2023-03-21-20-28-56-348.png

When I request a metric from an operator whose name contains '[' or ']', the 
REST response returns 400.

The reason is that we cannot send a GET request whose query params contain a 
raw '[' or ']'; those characters are invalid in a REST URL.

 

!image-2023-03-21-20-28-56-348.png!
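
As a client-side workaround sketch, the brackets can be percent-encoded before 
the request is sent; the job id, vertex id, and metric name below are 
placeholders, not values from this report:
{code:java}
# '[' and ']' must be percent-encoded as %5B and %5D (RFC 3986)
curl "http://localhost:8081/jobs/<job-id>/vertices/<vertex-id>/metrics?get=Map%5B1%5D.numRecordsIn"{code}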



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31367) Support filter's function push down

2023-03-08 Thread tanjialiang (Jira)
tanjialiang created FLINK-31367:
---

 Summary: Support filter's function push down
 Key: FLINK-31367
 URL: https://issues.apache.org/jira/browse/FLINK-31367
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC, Table SQL / API
Affects Versions: 1.16.1
Reporter: tanjialiang


Hi team, as far as I know, the source ability supports simple filter pushdown; 
it may only push down constant values, like this:
{code:java}
CREATE TABLE student (
   id int,
   birthday string
) WITH (
   ...
);

# the filter can be pushed down if the connector implements SupportsFilterPushDown
# both id and birthday will be pushed down
SELECT * FROM student WHERE id = 1 AND birthday = '1997-04-13';{code}
But it will not be pushed down in a case like this:
{code:java}
CREATE TABLE student (
   id int,
   birthday string
) WITH (
   ...
);

# the filter will not be pushed down even though the connector implements
# SupportsFilterPushDown; id and birthday will be filtered in the Flink task
SELECT * FROM student WHERE id = 1 AND birthday = TO_DATE('1997-04-13 
00:00:00');{code}
 

Could we expose the Flink function to the connector in SupportsFilterPushDown, 
so that every connector can adapt it?

For example, I could map the Flink function TO_DATE to MySQL's function 
STR_TO_DATE.
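
To make the proposal concrete, a hypothetical connector-side mapper could 
translate a pushed-down call by its function name; the date format pattern and 
the pre-rendered operand are assumptions for illustration only:
{code:java}
import org.apache.flink.table.expressions.CallExpression;

/** Hypothetical dialect mapper: Flink function name -> MySQL function. */
public final class MySqlFunctionMapper {

    /** renderedOperand is the already-rendered SQL text of the argument. */
    public String translate(CallExpression call, String renderedOperand) {
        if ("TO_DATE".equals(call.getFunctionName())) {
            return "STR_TO_DATE(" + renderedOperand + ", '%Y-%m-%d %H:%i:%s')";
        }
        // unknown function: return null so the filter stays in Flink
        return null;
    }
}{code}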



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31308) JobManager's metaspace out-of-memory when submit a flinksessionjobs

2023-03-02 Thread tanjialiang (Jira)
tanjialiang created FLINK-31308:
---

 Summary: JobManager's metaspace out-of-memory when submit a 
flinksessionjobs
 Key: FLINK-31308
 URL: https://issues.apache.org/jira/browse/FLINK-31308
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator, Table SQL / API
Affects Versions: kubernetes-operator-1.4.0, 1.16.1
Reporter: tanjialiang
 Attachments: image-2023-03-03-10-34-46-681.png

Hello team, when I repeatedly submit FlinkSessionJobs through the Flink 
operator, the JobManager's metaspace runs out of memory. My job contains some 
Flink SQL logic; is the user classloader not being closed? Or might it be 
caused by Flink SQL's codegen? By the way, the issue does not appear when I 
submit through flink-sql-gateway.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30551) Add open method to PartitionCommitPolicy

2023-02-20 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691406#comment-17691406
 ] 

tanjialiang commented on FLINK-30551:
-

I think it is necessary for a custom partition commit policy to get the Flink 
configuration from the open method. I could set some custom options in the 
Flink conf and use them in my custom commit policy. WDYT? [~gaoyunhaii] 
[~lzljs3620320] 
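
A sketch of what that could look like, assuming the proposed open hook is added 
to the interface (it is not part of PartitionCommitPolicy today); the option 
name and endpoint are illustrative assumptions:
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.table.PartitionCommitPolicy;

/** Hypothetical custom policy relying on the proposed open() hook. */
public class NotifyingCommitPolicy implements PartitionCommitPolicy {

    private String endpoint;

    /** Proposed hook: receives the Flink configuration before any commit. */
    public void open(Configuration conf) {
        // read a user-defined option placed into the Flink configuration
        this.endpoint = conf.getString("custom.notify.endpoint", "http://localhost:8080");
    }

    @Override
    public void commit(Context context) throws Exception {
        // notify an external system that the partition is ready (omitted)
    }
}{code}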

> Add open method to PartitionCommitPolicy
> 
>
> Key: FLINK-30551
> URL: https://issues.apache.org/jira/browse/FLINK-30551
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the {{PartitionCommitPolicy}} does not have an open hook, so a 
> custom partition commit policy has no appropriate entry point for its init 
> work.
> So I propose to add an {{open}} method to make this work.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)

2023-02-10 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30687:

Description: 
When I try Flink SQL like this, I find that 'FILTER(WHERE class_id = 1)' has 
no effect.
{code:java}
CREATE TABLE student
(
id INT NOT NULL,
name STRING,
class_id INT NOT NULL
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'student',
'username' = 'root',
'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;

or

SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code}
 

But when I tried Flink SQL like this, it worked.
{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;

or 

SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
 

By the way, the MySQL connector has a bug that was fixed in 
https://issues.apache.org/jira/browse/FLINK-29558. If you try this demo, you 
may need to cherry-pick that PR first.

  was:
When I try Flink SQL like this, I find that 'FILTER(WHERE class_id = 1)' has 
no effect.
{code:java}
CREATE TABLE student
(
id INT NOT NULL,
name STRING,
class_id INT NOT NULL
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'student',
'username' = 'root',
'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;

or

SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code}
 

But when I tried Flink SQL like this, it worked.
{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;

or 

SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
 

By the way, the MySQL connector has a bug that was fixed in FLINK-27268. If 
you try this demo, you may need to cherry-pick that PR first.


> FILTER not effect in count(*)
> -
>
> Key: FLINK-30687
> URL: https://issues.apache.org/jira/browse/FLINK-30687
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-01-16-10-54-04-673.png
>
>
> When I try Flink SQL like this, I find that 'FILTER(WHERE class_id = 1)' has 
> no effect.
> {code:java}
> CREATE TABLE student
> (
> id INT NOT NULL,
> name STRING,
> class_id INT NOT NULL
> )
> WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3306/test',
> 'table-name' = 'student',
> 'username' = 'root',
> 'password' = '12345678'
> );
> SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
> or
> SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> But when I tried Flink SQL like this, it worked.
> {code:java}
> SELECT COUNT(*) FROM student WHERE class_id = 1;
> or 
> SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> By the way, the MySQL connector has a bug that was fixed in 
> https://issues.apache.org/jira/browse/FLINK-29558. If you try this demo, you 
> may need to cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30922) SQL validate fail in parsing writable metadata

2023-02-10 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686968#comment-17686968
 ] 

tanjialiang edited comment on FLINK-30922 at 2/10/23 9:04 AM:
--

Great! Thanks for [~csq]'s contribution. Hope it gets merged soon.


was (Author: JIRAUSER279823):
Great! Thanks for [~csq]'s contribution. Hope it gets merged 
soon.[|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=godfreyhe]

> SQL validate fail in parsing writable metadata
> --
>
> Key: FLINK-30922
> URL: https://issues.apache.org/jira/browse/FLINK-30922
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
>  Labels: pull-request-available
>
> When I tried a simple demo SQL that writes metadata to Kafka in the Flink SQL 
> client
> {code:java}
> CREATE TABLE KafkaTable (
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING,
>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> )
> INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code}
>  
> it throws an error
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse 
> statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP;
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown 
> Source) ~[?:?]
>         at 
> org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
> [flink-sql-client-1.16.1.jar:1.16.1]
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
> failed. From line 1, column 33 to line 1, column 34: Unknown target column 
> 'ts'
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>  ~[?:?]
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         ... 13 more
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 33 to line 1, column 34: Unknown target column 'ts'
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method) ~[?:1.8.0_41]
>         at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  ~[?:1.8.0_41]
>         at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  ~[?:1.8.0_41]
>         at 

[jira] [Commented] (FLINK-30922) SQL validate fail in parsing writable metadata

2023-02-10 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686968#comment-17686968
 ] 

tanjialiang commented on FLINK-30922:
-

Great! Thanks for [~csq]'s contribution. Hope it gets merged 
soon.[|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=godfreyhe]

> SQL validate fail in parsing writable metadata
> --
>
> Key: FLINK-30922
> URL: https://issues.apache.org/jira/browse/FLINK-30922
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
>  Labels: pull-request-available
>
> When I tried a simple demo SQL that writes metadata to Kafka in the Flink SQL 
> client
> {code:java}
> CREATE TABLE KafkaTable (
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING,
>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> )
> INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code}
>  
> it throws an error
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse 
> statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP;
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown 
> Source) ~[?:?]
>         at 
> org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
> [flink-sql-client-1.16.1.jar:1.16.1]
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
> failed. From line 1, column 33 to line 1, column 34: Unknown target column 
> 'ts'
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>  ~[?:?]
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         ... 13 more
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 33 to line 1, column 34: Unknown target column 'ts'
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method) ~[?:1.8.0_41]
>         at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  ~[?:1.8.0_41]
>         at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  ~[?:1.8.0_41]
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:422) 
> ~[?:1.8.0_41]
>         at 
> 

[jira] [Updated] (FLINK-30922) SQL validate fail in parsing writable metadata

2023-02-07 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30922:

Summary: SQL validate fail in parsing writable metadata  (was: SQL validate 
fail to parse writing metadata)

> SQL validate fail in parsing writable metadata
> --
>
> Key: FLINK-30922
> URL: https://issues.apache.org/jira/browse/FLINK-30922
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
>
> When I tried a simple demo SQL that writes metadata to Kafka in the Flink SQL 
> client
> {code:java}
> CREATE TABLE KafkaTable (
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING,
>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> )
> INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code}
>  
> it throws an error
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse 
> statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP;
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown 
> Source) ~[?:?]
>         at 
> org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) 
> ~[flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228)
>  [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
> [flink-sql-client-1.16.1.jar:1.16.1]
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
> [flink-sql-client-1.16.1.jar:1.16.1]
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
> failed. From line 1, column 33 to line 1, column 34: Unknown target column 
> 'ts'
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
>  ~[?:?]
>         at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>  ~[?:?]
>         at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
>  ~[flink-sql-client-1.16.1.jar:1.16.1]
>         ... 13 more
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 33 to line 1, column 34: Unknown target column 'ts'
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method) ~[?:1.8.0_41]
>         at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  ~[?:1.8.0_41]
>         at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  ~[?:1.8.0_41]
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:422) 
> ~[?:1.8.0_41]
>         at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) 
> ~[?:?]
>         at 
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883) 

[jira] [Created] (FLINK-30922) SQL validate fail to parse writing metadata

2023-02-06 Thread tanjialiang (Jira)
tanjialiang created FLINK-30922:
---

 Summary: SQL validate fail to parse writing metadata
 Key: FLINK-30922
 URL: https://issues.apache.org/jira/browse/FLINK-30922
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.1
Reporter: tanjialiang


When I tried a simple demo SQL that writes metadata to Kafka in the Flink SQL 
client
{code:java}
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code}
 

it throws an error
{code:java}
org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse 
statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP;
        at 
org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174)
 ~[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45)
 ~[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71)
 ~[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) 
~[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown 
Source) ~[?:?]
        at 
org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) 
~[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) 
~[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295)
 [flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280)
 [flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228)
 [flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) 
[flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
[flink-sql-client-1.16.1.jar:1.16.1]
        at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
[flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
[flink-sql-client-1.16.1.jar:1.16.1]
Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
failed. From line 1, column 33 to line 1, column 34: Unknown target column 'ts'
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
 ~[?:?]
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
 ~[?:?]
        at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
 ~[?:?]
        at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) 
~[?:?]
        at 
org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
 ~[flink-sql-client-1.16.1.jar:1.16.1]
        ... 13 more
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
column 33 to line 1, column 34: Unknown target column 'ts'
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
Method) ~[?:1.8.0_41]
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 ~[?:1.8.0_41]
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[?:1.8.0_41]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422) 
~[?:1.8.0_41]
        at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) 
~[?:?]
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883) 
~[?:?]
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868) 
~[?:?]
        at 
org.apache.flink.table.planner.calcite.PreValidateReWriter$.newValidationError(PreValidateReWriter.scala:401)
 ~[?:?]
        at 
org.apache.flink.table.planner.calcite.PreValidateReWriter$.validateField(PreValidateReWriter.scala:389)
 ~[?:?]
        at 
org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:172)
 ~[?:?]
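{code}

A workaround sketch until the validation is fixed: avoid naming the metadata 
column in the insert column list and supply every column positionally instead. 
The casts, and the assumption that a positional INSERT covers the persisted 
metadata column, are for illustration only:
{code:java}
INSERT INTO KafkaTable
SELECT CAST(1 AS BIGINT),                       -- user_id
       CAST(NULL AS BIGINT),                    -- item_id
       CAST(NULL AS STRING),                    -- behavior
       CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)); -- ts (writable metadata)
{code}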
    

[jira] [Comment Edited] (FLINK-30687) FILTER not effect in count(*)

2023-01-15 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677122#comment-17677122
 ] 

tanjialiang edited comment on FLINK-30687 at 1/16/23 2:58 AM:
--

Trying to find out how to fix this, I found that in 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator#createFilterExpression
 the filterArg must be > 0, but the filterArg of ‘SELECT COUNT( * ) FILTER 
(WHERE cluster_id = 1) FROM ${table}’ is 0. I changed the condition to 
'filterArg >= 0', and it works.

!image-2023-01-16-10-54-04-673.png!

 

So I want to ask: why can filterArg not be 0? Is there some scenario where that 
cannot work? After all, 'inputFieldTypes' is a 'scala.collection.Seq', whose 
indexing starts from 0.


was (Author: JIRAUSER279823):
Trying to find out how to fix this, I found that in 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator#createFilterExpression
 the filterArg must be > 0, but the filterArg of ‘SELECT COUNT(*) FILTER 
(WHERE cluster_id = 1) FROM ${table}’ is 0. I changed the condition to 
'filterArg >= 0', and it works.

!image-2023-01-16-10-54-04-673.png!

 

So I want to ask: why can filterArg not be 0? Is there some scenario where that 
cannot work? After all, 'inputFieldTypes' is a 'scala.collection.Seq', whose 
indexing starts from 0.

> FILTER not effect in count(*)
> -
>
> Key: FLINK-30687
> URL: https://issues.apache.org/jira/browse/FLINK-30687
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-01-16-10-54-04-673.png
>
>
> When I try Flink SQL like this, I find that 'FILTER(WHERE class_id = 1)' has 
> no effect.
> {code:java}
> CREATE TABLE student
> (
> id INT NOT NULL,
> name STRING,
> class_id INT NOT NULL
> )
> WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3306/test',
> 'table-name' = 'student',
> 'username' = 'root',
> 'password' = '12345678'
> );
> SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
> or
> SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> But when I tried Flink SQL like this, it worked.
> {code:java}
> SELECT COUNT(*) FROM student WHERE class_id = 1;
> or 
> SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> By the way, the MySQL connector has a bug that was fixed in FLINK-27268. If 
> you try this demo, you may need to cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30687) FILTER not effect in count(*)

2023-01-15 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677122#comment-17677122
 ] 

tanjialiang commented on FLINK-30687:
-

Trying to find out how to fix this, I found that in 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator#createFilterExpression
 the filterArg must be > 0, but the filterArg of ‘SELECT COUNT(*) FILTER 
(WHERE cluster_id = 1) FROM ${table}’ is 0. I changed the condition to 
'filterArg >= 0', and it works.

!image-2023-01-16-10-54-04-673.png!

 

So I want to ask: why can filterArg not be 0? Is there some scenario where that 
cannot work? After all, 'inputFieldTypes' is a 'scala.collection.Seq', whose 
indexing starts from 0.
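
In other words, a sketch of the suspected off-by-one (in Calcite's 
AggregateCall, filterArg == -1 conventionally means "no filter", so 0 is a 
valid column index):
{code:java}
public class FilterArgCheck {
    public static void main(String[] args) {
        // filterArg is a 0-based input index; -1 means "no filter"
        int filterArg = 0;                  // FILTER on the first input column
        System.out.println(filterArg >= 0); // true  -> filter honored (fix)
        System.out.println(filterArg > 0);  // false -> filter dropped (bug)
    }
}{code}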

> FILTER not effect in count(*)
> -
>
> Key: FLINK-30687
> URL: https://issues.apache.org/jira/browse/FLINK-30687
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-01-16-10-54-04-673.png
>
>
> When I try Flink SQL like this, I find that 'FILTER(WHERE class_id = 1)' has 
> no effect.
> {code:java}
> CREATE TABLE student
> (
> id INT NOT NULL,
> name STRING,
> class_id INT NOT NULL
> )
> WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3306/test',
> 'table-name' = 'student',
> 'username' = 'root',
> 'password' = '12345678'
> );
> SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
> or
> SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> But when I tried Flink SQL like this, it worked.
> {code:java}
> SELECT COUNT(*) FROM student WHERE class_id = 1;
> or 
> SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> By the way, the MySQL connector has a bug that was fixed in FLINK-27268. If 
> you try this demo, you may need to cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)

2023-01-15 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30687:

Attachment: image-2023-01-16-10-54-04-673.png

> FILTER not effect in count(*)
> -
>
> Key: FLINK-30687
> URL: https://issues.apache.org/jira/browse/FLINK-30687
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2023-01-16-10-54-04-673.png
>
>
> When I try Flink SQL like this, I find that 'FILTER(WHERE class_id = 1)' has 
> no effect.
> {code:java}
> CREATE TABLE student
> (
> id INT NOT NULL,
> name STRING,
> class_id INT NOT NULL
> )
> WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3306/test',
> 'table-name' = 'student',
> 'username' = 'root',
> 'password' = '12345678'
> );
> SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
> or
> SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> But when I tried Flink SQL like this, it worked.
> {code:java}
> SELECT COUNT(*) FROM student WHERE class_id = 1;
> or 
> SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> By the way, the MySQL connector has a bug that was fixed in FLINK-27268. If 
> you try this demo, you may need to cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)

2023-01-13 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30687:

Description: 
When I try Flink SQL like this, I find that 'FILTER(WHERE class_id = 1)' has 
no effect.
{code:java}
CREATE TABLE student
(
id INT NOT NULL,
name STRING,
class_id INT NOT NULL
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'student',
'username' = 'root',
'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;

or

SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code}
 

But when I tried Flink SQL like this, it worked.
{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;

or 

SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
 

By the way, the MySQL connector has a bug that was fixed in FLINK-27268. If 
you try this demo, you may need to cherry-pick that PR first.

  was:
When I try Flink SQL like this, I find that 'FILTER(WHERE class_id = 1)' has 
no effect.
{code:java}
CREATE TABLE student
(
id INT NOT NULL,
name STRING,
class_id INT NOT NULL
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'student',
'username' = 'root',
'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
{code}
 

But when I tried Flink SQL like this, it worked.
{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;

or 

SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
 

By the way, the MySQL connector has a bug that was fixed in 
[FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you try 
this demo, you may need to cherry-pick that PR first.


> FILTER not effect in count(*)
> -
>
> Key: FLINK-30687
> URL: https://issues.apache.org/jira/browse/FLINK-30687
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: tanjialiang
>Priority: Major
>
> When I try using Flink SQL like this, I find that 'FILTER (WHERE class_id = 1)'
> has no effect.
> {code:java}
> CREATE TABLE student
> (
> id INT NOT NULL,
> name STRING,
> class_id INT NOT NULL
> )
> WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3306/test',
> 'table-name' = 'student',
> 'username' = 'root',
> 'password' = '12345678'
> );
> SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
> -- or
> SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> But when I tried Flink SQL like this, it worked.
> {code:java}
> SELECT COUNT(*) FROM student WHERE class_id = 1;
> -- or
> SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> By the way, the MySQL connector has a bug that was fixed in FLINK-27268. If you
> want to try this demo, you should cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)

2023-01-13 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30687:

Description: 
When I try using Flink SQL like this, I find that 'FILTER (WHERE class_id = 1)'
has no effect.
{code:java}
CREATE TABLE student
(
id INT NOT NULL,
name STRING,
class_id INT NOT NULL
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'student',
'username' = 'root',
'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
{code}

But when I tried Flink SQL like this, it worked.
{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;

-- or

SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}

By the way, the MySQL connector has a bug that was fixed in
[FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you want to
try this demo, you should cherry-pick that PR first.

  was:
When I try using Flink SQL like this, I find that 'FILTER (WHERE class_id = 1)'
has no effect.
{code:java}
CREATE TABLE student
(
id INT NOT NULL,
name STRING,
class_id INT NOT NULL
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'student',
'username' = 'root',
'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
{code}

But when I tried Flink SQL like this, it worked.
{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;

-- or

SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}

By the way, the MySQL connector has a bug that was fixed in
[FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you want to
try this demo, you should cherry-pick that PR first.


> FILTER not effect in count(*)
> -
>
> Key: FLINK-30687
> URL: https://issues.apache.org/jira/browse/FLINK-30687
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: tanjialiang
>Priority: Major
>
> When I try using Flink SQL like this, I find that 'FILTER (WHERE class_id = 1)'
> has no effect.
> {code:java}
> CREATE TABLE student
> (
> id INT NOT NULL,
> name STRING,
> class_id INT NOT NULL
> )
> WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3306/test',
> 'table-name' = 'student',
> 'username' = 'root',
> 'password' = '12345678'
> );
> SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
> {code}
>  
> But when I tried Flink SQL like this, it worked.
> {code:java}
> SELECT COUNT(*) FROM student WHERE class_id = 1;
> -- or
> SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> By the way, the MySQL connector has a bug that was fixed in
> [FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you want
> to try this demo, you should cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)

2023-01-13 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30687:

Description: 
When I try using Flink SQL like this, I find that 'FILTER (WHERE class_id = 1)'
has no effect.
{code:java}
CREATE TABLE student
(
id INT NOT NULL,
name STRING,
class_id INT NOT NULL
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'student',
'username' = 'root',
'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
{code}

But when I tried Flink SQL like this, it worked.
{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;

-- or

SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}

By the way, the MySQL connector has a bug that was fixed in
[FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you want to
try this demo, you should cherry-pick that PR first.

  was:
When I try using Flink SQL like this:
{code:java}
CREATE TABLE student
(
id INT NOT NULL,
name STRING,
class_id INT NOT NULL
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'student',
'username' = 'root',
'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
{code}

I found that 'FILTER (WHERE class_id = 1)' has no effect.

But when I tried Flink SQL like this, it worked.
{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;

-- or

SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}

By the way, the MySQL connector has a bug that was fixed in
[FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you want to
try this demo, you should cherry-pick that PR first.


> FILTER not effect in count(*)
> -
>
> Key: FLINK-30687
> URL: https://issues.apache.org/jira/browse/FLINK-30687
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: tanjialiang
>Priority: Major
>
> When I try using Flink SQL like this, I find that 'FILTER (WHERE class_id = 1)'
> has no effect.
> {code:java}
> CREATE TABLE student
> (
> id INT NOT NULL,
> name STRING,
> class_id INT NOT NULL
> )
> WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3306/test',
> 'table-name' = 'student',
> 'username' = 'root',
> 'password' = '12345678'
> );
> SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
> {code}
>  
> But when I tried Flink SQL like this, it worked.
> {code:java}
> SELECT COUNT(*) FROM student WHERE class_id = 1;
> -- or
> SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> By the way, the MySQL connector has a bug that was fixed in
> [FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you want
> to try this demo, you should cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)

2023-01-13 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30687:

Description: 
When I try using Flink SQL like this:
{code:java}
CREATE TABLE student
(
id INT NOT NULL,
name STRING,
class_id INT NOT NULL
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'student',
'username' = 'root',
'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
{code}

I found that 'FILTER (WHERE class_id = 1)' has no effect.

But when I tried Flink SQL like this, it worked.
{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;

-- or

SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}

By the way, the MySQL connector has a bug that was fixed in
[FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you want to
try this demo, you should cherry-pick that PR first.

  was:
When I try using Flink SQL like this:
{code:java}
CREATE TABLE student
(
id INT NOT NULL,
name STRING,
class_id INT NOT NULL
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'student',
'username' = 'root',
'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
{code}

I found that 'FILTER (WHERE class_id = 1)' has no effect.

But when I tried Flink SQL like this, it worked.
{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;

-- or

SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}

By the way, the MySQL connector has a bug that was fixed in
[FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you want to
try this demo, you should cherry-pick that PR first.


> FILTER not effect in count(*)
> -
>
> Key: FLINK-30687
> URL: https://issues.apache.org/jira/browse/FLINK-30687
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: tanjialiang
>Priority: Major
>
> When I try using Flink SQL like this:
> {code:java}
> CREATE TABLE student
> (
> id INT NOT NULL,
> name STRING,
> class_id INT NOT NULL
> )
> WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3306/test',
> 'table-name' = 'student',
> 'username' = 'root',
> 'password' = '12345678'
> );
> SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
> {code}
>  
> I found that 'FILTER (WHERE class_id = 1)' has no effect.
> But when I tried Flink SQL like this, it worked.
> {code:java}
> SELECT COUNT(*) FROM student WHERE class_id = 1;
> -- or
> SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}
>  
> By the way, the MySQL connector has a bug that was fixed in
> [FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you want
> to try this demo, you should cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30687) FILTER not effect in count(*)

2023-01-13 Thread tanjialiang (Jira)
tanjialiang created FLINK-30687:
---

 Summary: FILTER not effect in count(*)
 Key: FLINK-30687
 URL: https://issues.apache.org/jira/browse/FLINK-30687
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.0
Reporter: tanjialiang


When I try using Flink SQL like this:
{code:java}
CREATE TABLE student
(
id INT NOT NULL,
name STRING,
class_id INT NOT NULL
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'student',
'username' = 'root',
'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
{code}

I found that 'FILTER (WHERE class_id = 1)' has no effect.

But when I tried Flink SQL like this, it worked.
{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;

-- or

SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code}

By the way, the MySQL connector has a bug that was fixed in
[FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you want to
try this demo, you should cherry-pick that PR first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29558) Use select count(*) from xxx; and get SQL syntax

2023-01-13 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17676845#comment-17676845
 ] 

tanjialiang commented on FLINK-29558:
-

[~StarBoy1005] It has been fixed in
[FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268], but I found a
problem: when using Flink SQL such as "SELECT COUNT(*) FROM table" or "SELECT
COUNT(1) FROM table", the projection always returns 'ROW<> NOT NULL', which
made the MySQL connector generate "SELECT FROM table". After
[FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268], it generates
"SELECT '' FROM table" instead. But is this a bug in the projection? When the
projection cannot match anything, it returns 'ROW<> NOT NULL'. I think the
MySQL connector may not be the only connector with this bug. cc
[~martijnvisser] [~jark] 
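
A tiny illustration of the rendered queries (my simplification of the behavior
described above, not the connector's actual code; the table name is borrowed
from this ticket):
{code:java}
public class EmptyProjectionDemo {
    public static void main(String[] args) {
        // With an empty projection ROW<>, no column list is rendered.
        String beforeFix = "SELECT  FROM mysql_MyUserTable";  // before FLINK-27268: invalid SQL, syntax error
        String afterFix = "SELECT '' FROM mysql_MyUserTable"; // after FLINK-27268: valid, one dummy column per row
        System.out.println(beforeFix);
        System.out.println(afterFix);
    }
}
{code}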

> Use select count(*) from xxx; and get SQL syntax
> 
>
> Key: FLINK-29558
> URL: https://issues.apache.org/jira/browse/FLINK-29558
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.15.3
> Environment: flink 1.15.2
> CentOS Linux release 7.9.2009 (Core)
> 5.7.32-log MySQL Community Server (GPL)
>Reporter: StarBoy1005
>Priority: Major
> Attachments: image-2022-10-10-15-31-34-341.png, 
> image-2022-10-19-17-55-14-700.png, image-2022-11-03-18-16-12-127.png, 
> screenshot-1.png, screenshot-2.png
>
>
> Hi, I use Flink SQL to write Kafka records to MySQL,
> so I created these two tables in Flink SQL. Here is the MySQL one; I created
> the table in MySQL before running the insert in Flink SQL.
>   CREATE TABLE mysql_MyUserTable (
>   id STRING,
>   name STRING,
>   age STRING,
>   status STRING,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>'connector' = 'jdbc',
>'url' = 'jdbc:mysql://10.19.29.170:3306/fromflink152',
>'table-name' = 'users',
>'username' = 'root',
>'password' = '**'
> );
> In MySQL, I created the database "fromflink152" and then created the table
> like this:
>  CREATE TABLE `users` (
>   `id` varchar(64) NOT NULL DEFAULT '',
>   `name` varchar(255) DEFAULT NULL,
>   `age` varchar(255) DEFAULT NULL,
>   `status` varchar(255) DEFAULT NULL,
>   PRIMARY KEY (`id`)
> )  
> After executing the insert SQL, I found that 'select * from mysql_MyUserTable'
> returns the correct result, but with 'select count(*) from mysql_MyUserTable'
> or 'select count(id) from mysql_MyUserTable' the collect job in the Flink app
> keeps restarting again and again. The exception is:
>  !image-2022-10-10-15-31-34-341.png! 
> So I wonder which config I missed on the Flink or the MySQL side :(



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29558) Use select count(*) from xxx; and get SQL syntax

2023-01-13 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17676845#comment-17676845
 ] 

tanjialiang edited comment on FLINK-29558 at 1/14/23 5:48 AM:
--

[~StarBoy1005] It has been fixed in FLINK-27268, but I found a problem: when
using Flink SQL such as "SELECT COUNT(*) FROM table" or "SELECT COUNT(1) FROM
table", the projection always returns 'ROW<> NOT NULL', which made the MySQL
connector generate "SELECT FROM table". After FLINK-27268, it generates
"SELECT '' FROM table" instead. But is this a bug in the projection? When the
projection cannot match anything, it returns 'ROW<> NOT NULL'. I think the
MySQL connector may not be the only connector with this bug. cc
[~martijnvisser] [~jark] 


was (Author: JIRAUSER279823):
[~StarBoy1005] It has been fixed in
[FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268], but I found a
problem: when using Flink SQL such as "SELECT COUNT(*) FROM table" or "SELECT
COUNT(1) FROM table", the projection always returns 'ROW<> NOT NULL', which
made the MySQL connector generate "SELECT FROM table". After
[FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268], it generates
"SELECT '' FROM table" instead. But is this a bug in the projection? When the
projection cannot match anything, it returns 'ROW<> NOT NULL'. I think the
MySQL connector may not be the only connector with this bug. cc
[~martijnvisser] [~jark] 

> Use select count(*) from xxx; and get SQL syntax
> 
>
> Key: FLINK-29558
> URL: https://issues.apache.org/jira/browse/FLINK-29558
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.15.3
> Environment: flink 1.15.2
> CentOS Linux release 7.9.2009 (Core)
> 5.7.32-log MySQL Community Server (GPL)
>Reporter: StarBoy1005
>Priority: Major
> Attachments: image-2022-10-10-15-31-34-341.png, 
> image-2022-10-19-17-55-14-700.png, image-2022-11-03-18-16-12-127.png, 
> screenshot-1.png, screenshot-2.png
>
>
> Hi, I use Flink SQL to write Kafka records to MySQL,
> so I created these two tables in Flink SQL. Here is the MySQL one; I created
> the table in MySQL before running the insert in Flink SQL.
>   CREATE TABLE mysql_MyUserTable (
>   id STRING,
>   name STRING,
>   age STRING,
>   status STRING,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>'connector' = 'jdbc',
>'url' = 'jdbc:mysql://10.19.29.170:3306/fromflink152',
>'table-name' = 'users',
>'username' = 'root',
>'password' = '**'
> );
> In MySQL, I created the database "fromflink152" and then created the table
> like this:
>  CREATE TABLE `users` (
>   `id` varchar(64) NOT NULL DEFAULT '',
>   `name` varchar(255) DEFAULT NULL,
>   `age` varchar(255) DEFAULT NULL,
>   `status` varchar(255) DEFAULT NULL,
>   PRIMARY KEY (`id`)
> )  
> After executing the insert SQL, I found that 'select * from mysql_MyUserTable'
> returns the correct result, but with 'select count(*) from mysql_MyUserTable'
> or 'select count(id) from mysql_MyUserTable' the collect job in the Flink app
> keeps restarting again and again. The exception is:
>  !image-2022-10-10-15-31-34-341.png! 
> So I wonder which config I missed on the Flink or the MySQL side :(



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29558) Use select count(*) from xxx; and get SQL syntax

2023-01-13 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17676845#comment-17676845
 ] 

tanjialiang edited comment on FLINK-29558 at 1/14/23 5:48 AM:
--

[~StarBoy1005] It has been fixed in FLINK-27268, but I found a problem: when
using Flink SQL such as "SELECT COUNT( * ) FROM table" or "SELECT COUNT(1)
FROM table", the projection always returns 'ROW<> NOT NULL', which made the
MySQL connector generate "SELECT FROM table". After FLINK-27268, it generates
"SELECT '' FROM table" instead. But is this a bug in the projection? When the
projection cannot match anything, it returns 'ROW<> NOT NULL'. I think the
MySQL connector may not be the only connector with this bug. cc
[~martijnvisser] [~jark] 


was (Author: JIRAUSER279823):
[~StarBoy1005] It has been fixed in FLINK-27268, but I found a problem: when
using Flink SQL such as "SELECT COUNT(*) FROM table" or "SELECT COUNT(1) FROM
table", the projection always returns 'ROW<> NOT NULL', which made the MySQL
connector generate "SELECT FROM table". After FLINK-27268, it generates
"SELECT '' FROM table" instead. But is this a bug in the projection? When the
projection cannot match anything, it returns 'ROW<> NOT NULL'. I think the
MySQL connector may not be the only connector with this bug. cc
[~martijnvisser] [~jark] 

> Use select count(*) from xxx; and get SQL syntax
> 
>
> Key: FLINK-29558
> URL: https://issues.apache.org/jira/browse/FLINK-29558
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.15.3
> Environment: flink 1.15.2
> CentOS Linux release 7.9.2009 (Core)
> 5.7.32-log MySQL Community Server (GPL)
>Reporter: StarBoy1005
>Priority: Major
> Attachments: image-2022-10-10-15-31-34-341.png, 
> image-2022-10-19-17-55-14-700.png, image-2022-11-03-18-16-12-127.png, 
> screenshot-1.png, screenshot-2.png
>
>
> Hi, I use Flink SQL to write Kafka records to MySQL,
> so I created these two tables in Flink SQL. Here is the MySQL one; I created
> the table in MySQL before running the insert in Flink SQL.
>   CREATE TABLE mysql_MyUserTable (
>   id STRING,
>   name STRING,
>   age STRING,
>   status STRING,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>'connector' = 'jdbc',
>'url' = 'jdbc:mysql://10.19.29.170:3306/fromflink152',
>'table-name' = 'users',
>'username' = 'root',
>'password' = '**'
> );
> In MySQL, I created the database "fromflink152" and then created the table
> like this:
>  CREATE TABLE `users` (
>   `id` varchar(64) NOT NULL DEFAULT '',
>   `name` varchar(255) DEFAULT NULL,
>   `age` varchar(255) DEFAULT NULL,
>   `status` varchar(255) DEFAULT NULL,
>   PRIMARY KEY (`id`)
> )  
> After executing the insert SQL, I found that 'select * from mysql_MyUserTable'
> returns the correct result, but with 'select count(*) from mysql_MyUserTable'
> or 'select count(id) from mysql_MyUserTable' the collect job in the Flink app
> keeps restarting again and again. The exception is:
>  !image-2022-10-10-15-31-34-341.png! 
> So I wonder which config I missed on the Flink or the MySQL side :(



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30520) Arguments contains with '#' will error split in loadYAMLResource

2022-12-29 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652797#comment-17652797
 ] 

tanjialiang commented on FLINK-30520:
-

[~xtsong], thanks for your reply. Introducing proper standard YAML parsing is
the best way to fix this; I hope it lands soon.

> Arguments contains with '#' will error split in loadYAMLResource
> 
>
> Key: FLINK-30520
> URL: https://issues.apache.org/jira/browse/FLINK-30520
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.16.0, 1.15.3
>Reporter: tanjialiang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-12-28-17-13-25-351.png, 
> image-2022-12-28-17-13-38-337.png, image-2022-12-28-18-03-40-806.png
>
>
> When I submit a Flink jar job in Kubernetes application mode whose main args
> contain '#', the arguments are split incorrectly by
> org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource.
>  
> For example, I use the flink-kubernetes-operator to submit a job in
> Kubernetes application mode:
>  
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: word-count
> spec:
>   image: apache/flink:1.16.0-scala_2.12-java8
>   flinkVersion: v1_16
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "1"
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   serviceAccount: flink
>   job:
> jarURI: local:///opt/flink/examples/streaming/WordCount.jar
> args:
>   - --output
>   - /tmp/1#.txt
> parallelism: 2
> upgradeMode: stateless
> {code}
>  
>  
> The arguments are split incorrectly when flink-conf.yaml is loaded:
> !image-2022-12-28-17-13-25-351.png!
>  
> And when I enter the JobManager's pod, I can see that the program-args in
> flink-conf.yaml are correct:
> !image-2022-12-28-17-13-38-337.png!
>  
> Maybe we should validate YAML comments more strictly? This is the YAML file
> opened in vim; we can see that a comment should either start with '#' at the
> beginning of a line or be preceded by a space (' #'):
> !image-2022-12-28-18-03-40-806.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30520) Arguments contains with '#' will error split in loadYAMLResource

2022-12-28 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30520:

Description: 
When I submit a Flink jar job in Kubernetes application mode whose main args
contain '#', the arguments are split incorrectly by
org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource.

For example, I use the flink-kubernetes-operator to submit a job in Kubernetes
application mode:
{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: word-count
spec:
  image: apache/flink:1.16.0-scala_2.12-java8
  flinkVersion: v1_16
  flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
  jobManager:
resource:
  memory: "2048m"
  cpu: 1
  taskManager:
resource:
  memory: "2048m"
  cpu: 1
  serviceAccount: flink
  job:
jarURI: local:///opt/flink/examples/streaming/WordCount.jar
args:
  - --output
  - /tmp/1#.txt
parallelism: 2
upgradeMode: stateless
{code}
 

 

The arguments are split incorrectly when flink-conf.yaml is loaded:

!image-2022-12-28-17-13-25-351.png!

And when I enter the JobManager's pod, I can see that the program-args in
flink-conf.yaml are correct:

!image-2022-12-28-17-13-38-337.png!

Maybe we should validate YAML comments more strictly? This is the YAML file
opened in vim; we can see that a comment should either start with '#' at the
beginning of a line or be preceded by a space (' #'):

!image-2022-12-28-18-03-40-806.png!
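
The sketch below is my reading of the failure mode (a simplification, not the
real loadYAMLResource; the config key and the ';' separator are assumptions of
mine, not copied from the pod): splitting a line on the first '#' silently
drops the rest of the value.
{code:java}
public class HashCommentSplitDemo {
    public static void main(String[] args) {
        // Assumed rendering of the program args in flink-conf.yaml.
        String line = "$internal.application.program-args: --output;/tmp/1#.txt";
        // Naive comment handling: everything after '#' is treated as a comment.
        String[] comments = line.split("#", 2);
        String conf = comments[0].trim();
        System.out.println(conf);
        // prints: $internal.application.program-args: --output;/tmp/1
        // ".txt" (and anything else after the '#') is lost
    }
}
{code}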

  was:
When I submit a Flink jar job in Kubernetes application mode whose main args
contain '#', the arguments are split incorrectly by
org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource.

For example, I use the flink-kubernetes-operator to submit a job in Kubernetes
application mode:
{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: word-count
spec:
  image: apache/flink:1.16.0-scala_2.12-java8
  flinkVersion: v1_16
  flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
  jobManager:
resource:
  memory: "2048m"
  cpu: 1
  taskManager:
resource:
  memory: "2048m"
  cpu: 1
  serviceAccount: flink
  job:
jarURI: local:///opt/flink/examples/streaming/WordCount.jar
args:
  - --output
  - /tmp/1#.txt
parallelism: 2
upgradeMode: stateless
{code}
 

 

The arguments are split incorrectly when flink-conf.yaml is loaded:

!image-2022-12-28-16-30-30-645.png!

And when I enter the JobManager's pod, I can see that the program-args in
flink-conf.yaml are correct:

!image-2022-12-28-16-34-25-819.png!

Maybe we should validate YAML comments more strictly?

 


> Arguments contains with '#' will error split in loadYAMLResource
> 
>
> Key: FLINK-30520
> URL: https://issues.apache.org/jira/browse/FLINK-30520
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.16.0, 1.14.6, 1.15.3
>Reporter: tanjialiang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-12-28-17-13-25-351.png, 
> image-2022-12-28-17-13-38-337.png, image-2022-12-28-18-03-40-806.png
>
>
> When I submit a Flink jar job in Kubernetes application mode whose main args
> contain '#', the arguments are split incorrectly by
> org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource.
>  
> For example, I use the flink-kubernetes-operator to submit a job in
> Kubernetes application mode:
>  
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: word-count
> spec:
>   image: apache/flink:1.16.0-scala_2.12-java8
>   flinkVersion: v1_16
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "1"
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   serviceAccount: flink
>   job:
> jarURI: local:///opt/flink/examples/streaming/WordCount.jar
> args:
>   - --output
>   - /tmp/1#.txt
> parallelism: 2
> upgradeMode: stateless
> {code}
>  
>  
> The arguments are split incorrectly when flink-conf.yaml is loaded:
> !image-2022-12-28-17-13-25-351.png!
>  
> And when I enter the JobManager's pod, I can see that the program-args in
> flink-conf.yaml are correct:
> !image-2022-12-28-17-13-38-337.png!
>  
> Maybe we should validate YAML comments more strictly? This is the YAML file
> opened in vim; we can see that a comment should either start with '#' at the
> beginning of a line or be preceded by a space (' #'):
> !image-2022-12-28-18-03-40-806.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30520) Arguments contains with '#' will error split in loadYAMLResource

2022-12-28 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30520:

Attachment: image-2022-12-28-18-03-40-806.png

> Arguments contains with '#' will error split in loadYAMLResource
> 
>
> Key: FLINK-30520
> URL: https://issues.apache.org/jira/browse/FLINK-30520
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.16.0, 1.14.6, 1.15.3
>Reporter: tanjialiang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-12-28-17-13-25-351.png, 
> image-2022-12-28-17-13-38-337.png, image-2022-12-28-18-03-40-806.png
>
>
> When I submit a Flink jar job in Kubernetes application mode whose main args
> contain '#', the arguments are split incorrectly by
> org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource.
>  
> For example, I use the flink-kubernetes-operator to submit a job in
> Kubernetes application mode:
>  
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: word-count
> spec:
>   image: apache/flink:1.16.0-scala_2.12-java8
>   flinkVersion: v1_16
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "1"
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   serviceAccount: flink
>   job:
> jarURI: local:///opt/flink/examples/streaming/WordCount.jar
> args:
>   - --output
>   - /tmp/1#.txt
> parallelism: 2
> upgradeMode: stateless
> {code}
>  
>  
> The arguments are split incorrectly when flink-conf.yaml is loaded:
> !image-2022-12-28-17-13-25-351.png!
>  
> And when I enter the JobManager's pod, I can see that the program-args in
> flink-conf.yaml are correct:
> !image-2022-12-28-17-13-38-337.png!
>  
> Maybe we should validate YAML comments more strictly?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30520) Arguments contains with '#' will error split in loadYAMLResource

2022-12-28 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30520:

Attachment: image-2022-12-28-17-13-38-337.png

> Arguments contains with '#' will error split in loadYAMLResource
> 
>
> Key: FLINK-30520
> URL: https://issues.apache.org/jira/browse/FLINK-30520
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.16.0, 1.14.6, 1.15.3
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2022-12-28-17-13-25-351.png, 
> image-2022-12-28-17-13-38-337.png
>
>
> When I submit a Flink jar job in Kubernetes application mode whose main args
> contain '#', the arguments are split incorrectly by
> org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource.
>  
> For example, I use the flink-kubernetes-operator to submit a job in
> Kubernetes application mode:
>  
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: word-count
> spec:
>   image: apache/flink:1.16.0-scala_2.12-java8
>   flinkVersion: v1_16
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "1"
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   serviceAccount: flink
>   job:
> jarURI: local:///opt/flink/examples/streaming/WordCount.jar
> args:
>   - --output
>   - /tmp/1#.txt
> parallelism: 2
> upgradeMode: stateless
> {code}
>  
>  
> The arguments are split incorrectly when flink-conf.yaml is loaded:
> !image-2022-12-28-16-30-30-645.png!
>  
> And when I enter the JobManager's pod, I can see that the program-args in
> flink-conf.yaml are correct:
> !image-2022-12-28-16-34-25-819.png!
>  
> Maybe we should validate YAML comments more strictly?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30520) Arguments contains with '#' will error split in loadYAMLResource

2022-12-28 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30520:

Attachment: image-2022-12-28-17-13-25-351.png

> Arguments contains with '#' will error split in loadYAMLResource
> 
>
> Key: FLINK-30520
> URL: https://issues.apache.org/jira/browse/FLINK-30520
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.16.0, 1.14.6, 1.15.3
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2022-12-28-17-13-25-351.png, 
> image-2022-12-28-17-13-38-337.png
>
>
> When I submit a Flink jar job in Kubernetes application mode whose main args
> contain '#', the arguments are split incorrectly by
> org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource.
>  
> For example, I use the flink-kubernetes-operator to submit a job in
> Kubernetes application mode:
>  
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: word-count
> spec:
>   image: apache/flink:1.16.0-scala_2.12-java8
>   flinkVersion: v1_16
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "1"
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   serviceAccount: flink
>   job:
> jarURI: local:///opt/flink/examples/streaming/WordCount.jar
> args:
>   - --output
>   - /tmp/1#.txt
> parallelism: 2
> upgradeMode: stateless
> {code}
>  
>  
> The arguments are split incorrectly when flink-conf.yaml is loaded:
> !image-2022-12-28-16-30-30-645.png!
>  
> And when I enter the JobManager's pod, I can see that the program-args in
> flink-conf.yaml are correct:
> !image-2022-12-28-16-34-25-819.png!
>  
> Maybe we should validate YAML comments more strictly?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30520) Arguments contains with '#' will error split in loadYAMLResource

2022-12-28 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30520:

Description: 
When I submit a Flink jar job in Kubernetes application mode whose main args
contain '#', the arguments are split incorrectly by
org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource.

For example, I use the flink-kubernetes-operator to submit a job in Kubernetes
application mode:
{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: word-count
spec:
  image: apache/flink:1.16.0-scala_2.12-java8
  flinkVersion: v1_16
  flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
  jobManager:
resource:
  memory: "2048m"
  cpu: 1
  taskManager:
resource:
  memory: "2048m"
  cpu: 1
  serviceAccount: flink
  job:
jarURI: local:///opt/flink/examples/streaming/WordCount.jar
args:
  - --output
  - /tmp/1#.txt
parallelism: 2
upgradeMode: stateless
{code}
 

 

The arguments are split incorrectly when flink-conf.yaml is loaded:

!image-2022-12-28-17-13-25-351.png!

And when I enter the JobManager's pod, I can see that the program-args in
flink-conf.yaml are correct:

!image-2022-12-28-17-13-38-337.png!

Maybe we should validate YAML comments more strictly?

 

  was:
When I submit a Flink jar job in Kubernetes application mode whose main args
contain '#', the arguments are split incorrectly by
org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource.

For example, I use the flink-kubernetes-operator to submit a job in Kubernetes
application mode:
{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: word-count
spec:
  image: apache/flink:1.16.0-scala_2.12-java8
  flinkVersion: v1_16
  flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
  jobManager:
resource:
  memory: "2048m"
  cpu: 1
  taskManager:
resource:
  memory: "2048m"
  cpu: 1
  serviceAccount: flink
  job:
jarURI: local:///opt/flink/examples/streaming/WordCount.jar
args:
  - --output
  - /tmp/1#.txt
parallelism: 2
upgradeMode: stateless
{code}
 

 

The arguments are split incorrectly when flink-conf.yaml is loaded:

!image-2022-12-28-16-30-30-645.png!

And when I enter the JobManager's pod, I can see that the program-args in
flink-conf.yaml are correct:

!image-2022-12-28-16-34-25-819.png!

Maybe we should validate YAML comments more strictly?

 


> Arguments contains with '#' will error split in loadYAMLResource
> 
>
> Key: FLINK-30520
> URL: https://issues.apache.org/jira/browse/FLINK-30520
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.16.0, 1.14.6, 1.15.3
>Reporter: tanjialiang
>Priority: Major
> Attachments: image-2022-12-28-17-13-25-351.png, 
> image-2022-12-28-17-13-38-337.png
>
>
> When I submit a Flink jar job in Kubernetes application mode whose main args
> contain '#', the arguments are split incorrectly by
> org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource.
>  
> For example, I use the flink-kubernetes-operator to submit a job in
> Kubernetes application mode:
>  
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: word-count
> spec:
>   image: apache/flink:1.16.0-scala_2.12-java8
>   flinkVersion: v1_16
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "1"
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   serviceAccount: flink
>   job:
> jarURI: local:///opt/flink/examples/streaming/WordCount.jar
> args:
>   - --output
>   - /tmp/1#.txt
> parallelism: 2
> upgradeMode: stateless
> {code}
>  
>  
> The arguments are split incorrectly when flink-conf.yaml is loaded:
> !image-2022-12-28-17-13-25-351.png!
>  
> And when I enter the JobManager's pod, I can see that the program-args in
> flink-conf.yaml are correct:
> !image-2022-12-28-17-13-38-337.png!
>  
> Maybe we should validate YAML comments more strictly?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30520) Arguments contains with '#' will error split in loadYAMLResource

2022-12-28 Thread tanjialiang (Jira)
tanjialiang created FLINK-30520:
---

 Summary: Arguments contains with '#' will error split in 
loadYAMLResource
 Key: FLINK-30520
 URL: https://issues.apache.org/jira/browse/FLINK-30520
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.15.3, 1.14.6, 1.16.0
Reporter: tanjialiang


When I submit a Flink jar job in Kubernetes application mode whose main args
contain '#', the arguments are split incorrectly by
org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource.

For example, I use the flink-kubernetes-operator to submit a job in Kubernetes
application mode:
{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: word-count
spec:
  image: apache/flink:1.16.0-scala_2.12-java8
  flinkVersion: v1_16
  flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
  jobManager:
resource:
  memory: "2048m"
  cpu: 1
  taskManager:
resource:
  memory: "2048m"
  cpu: 1
  serviceAccount: flink
  job:
jarURI: local:///opt/flink/examples/streaming/WordCount.jar
args:
  - --output
  - /tmp/1#.txt
parallelism: 2
upgradeMode: stateless
{code}
 

 

The arguments are split incorrectly when flink-conf.yaml is loaded:

!image-2022-12-28-16-30-30-645.png!

And when I enter the JobManager's pod, I can see that the program-args in
flink-conf.yaml are correct:

!image-2022-12-28-16-34-25-819.png!

Maybe we should validate YAML comments more strictly?

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30498) frocksdb throw an UnsatisfiedLinkError

2022-12-25 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651955#comment-17651955
 ] 

tanjialiang commented on FLINK-30498:
-

Hi [~Yanfei Lei], I made a silly mistake: it was my team's problem, so I am
closing this issue. And yes, it was caused by a conflict with a frocksdbjni
jar, but not one shipped in the official Flink jar. At first I thought it was
a bug in the Flink Docker image, because I hit it on 1.14 but not on 1.15 or
1.16. But when I started a 1.14 cluster in Docker without any extra jars, it
ran successfully, so the cause had to be an extra jar in our 1.14 cluster. In
the end, I found it was an extra jar maintained by my own team.
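
For anyone hitting the same UnsatisfiedLinkError, a small diagnostic of my own
(not from this ticket) is to print which jar actually provides the RocksDB
classes; a location outside the flink-dist jar points at a conflicting bundled
(f)rocksdbjni:
{code:java}
public class WhichRocksJar {
    public static void main(String[] args) throws ClassNotFoundException {
        // Resolve the class the state backend uses and print its origin jar
        // (assumes the class is loaded from a jar on the classpath).
        Class<?> clazz = Class.forName("org.rocksdb.FlinkCompactionFilter");
        System.out.println(clazz.getProtectionDomain().getCodeSource().getLocation());
    }
}
{code}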

> frocksdb throw an UnsatisfiedLinkError
> --
>
> Key: FLINK-30498
> URL: https://issues.apache.org/jira/browse/FLINK-30498
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.6
>Reporter: tanjialiang
>Priority: Major
>
> Using a simple TopN SQL job with the table.exec.state.ttl option and the
> RocksDB state backend:
> {code:java}
> SET table.exec.state.ttl = 1min;
> CREATE TABLE `student` (
>   id int,
>   name string,
>   update_time bigint
> ) WITH (
>   'connector' = 'datagen',
>   'fields.id.kind' = 'random',
>   'fields.id.min' = '0',
>   'fields.id.max' = '100'
> );
> CREATE TABLE `blackhole` (
>   id int,
>   name string,
>   update_time bigint
> ) WITH (
>   'connector' = 'blackhole'
> );
> INSERT INTO blackhole
> SELECT 
>     id, 
>     name,
>     update_time
> FROM (
>   SELECT
>       id, 
>       name,
>       update_time,
>       ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS 
> row_num 
>   FROM student
> ) AS t
> WHERE t.row_num=1; {code}
> It throws the following error from frocksdb:
> {code:java}
> java.lang.UnsatisfiedLinkError: 
> org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
>     at 
> org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native
>  Method) ~[flink-dist_2.12-1.14.2.jar:?]
>     at 
> org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:14) 
> ~[flink-dist_2.12-1.14.2.jar:?]
>     at 
> org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:115)
>  ~[flink-dist_2.12-1.14.2.jar:?]
>     at 
> org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:142)
>  ~[flink-dist_2.12-1.14.2.jar:?]
>     at 
> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:157)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:134)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:148)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> 

[jira] [Closed] (FLINK-30498) frocksdb throw an UnsatisfiedLinkError

2022-12-23 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang closed FLINK-30498.
---
Resolution: Not A Problem

> frocksdb throw an UnsatisfiedLinkError
> --
>
> Key: FLINK-30498
> URL: https://issues.apache.org/jira/browse/FLINK-30498
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.6
>Reporter: tanjialiang
>Priority: Major
>
> Using a simple TopN SQL job with the table.exec.state.ttl option and the
> RocksDB state backend:
> {code:java}
> SET table.exec.state.ttl = 1min;
> CREATE TABLE `student` (
>   id int,
>   name string,
>   update_time bigint
> ) WITH (
>   'connector' = 'datagen',
>   'fields.id.kind' = 'random',
>   'fields.id.min' = '0',
>   'fields.id.max' = '100'
> );
> CREATE TABLE `blackhole` (
>   id int,
>   name string,
>   update_time bigint
> ) WITH (
>   'connector' = 'blackhole'
> );
> INSERT INTO blackhole
> SELECT 
>     id, 
>     name,
>     update_time
> FROM (
>   SELECT
>       id, 
>       name,
>       update_time,
>       ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS 
> row_num 
>   FROM student
> ) AS t
> WHERE t.row_num=1; {code}
> It throws the following error from frocksdb:
> {code:java}
> java.lang.UnsatisfiedLinkError: 
> org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
>     at 
> org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native
>  Method) ~[flink-dist_2.12-1.14.2.jar:?]
>     at 
> org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:14) 
> ~[flink-dist_2.12-1.14.2.jar:?]
>     at 
> org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:115)
>  ~[flink-dist_2.12-1.14.2.jar:?]
>     at 
> org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:142)
>  ~[flink-dist_2.12-1.14.2.jar:?]
>     at 
> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:157)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:134)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:148)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.table.runtime.operators.rank.FastTop1Function.open(FastTop1Function.java:114)
>  ~[flink-table_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>  ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>     at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
> 

[jira] [Updated] (FLINK-30498) frocksdb throw an UnsatisfiedLinkError

2022-12-23 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30498:

Description: 
Using a simple TopN SQL job with the table.exec.state.ttl option and the
RocksDB state backend:
{code:java}
SET table.exec.state.ttl = 1min;

CREATE TABLE `student` (
  id int,
  name string,
  update_time bigint
) WITH (
  'connector' = 'datagen',
  'fields.id.kind' = 'random',
  'fields.id.min' = '0',
  'fields.id.max' = '100'
);

CREATE TABLE `blackhole` (
  id int,
  name string,
  update_time bigint
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole
SELECT 
    id, 
    name,
    update_time
FROM (
  SELECT
      id, 
      name,
      update_time,
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS row_num 
  FROM student
) AS t
WHERE t.row_num=1; {code}
It throws the following error from frocksdb:
{code:java}
java.lang.UnsatisfiedLinkError: 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
    at 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native
 Method) ~[flink-dist_2.12-1.14.2.jar:?]
    at 
org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:14) 
~[flink-dist_2.12-1.14.2.jar:?]
    at 
org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:115)
 ~[flink-dist_2.12-1.14.2.jar:?]
    at 
org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:142)
 ~[flink-dist_2.12-1.14.2.jar:?]
    at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:157)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:134)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:148)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.table.runtime.operators.rank.FastTop1Function.open(FastTop1Function.java:114)
 ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:55)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
 

[jira] [Updated] (FLINK-30498) frocksdb throw an UnsatisfiedLinkError

2022-12-23 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30498:

Description: 
Using a simple TopN SQL job with the table.exec.state.ttl option and the
RocksDB state backend:
{code:java}
SET table.exec.state.ttl = 1min;

CREATE TABLE `student` (
  id int,
  name string,
  update_time bigint
) WITH (
  'connector' = 'datagen',
  'fields.id.kind' = 'random',
  'fields.id.min' = '0',
  'fields.id.max' = '100'
);

CREATE TABLE `blackhole` (
  id int,
  name string,
  update_time bigint
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole
SELECT 
    id, 
    name,
    update_time
FROM (
  SELECT
      id, 
      name,
      update_time,
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS row_num 
  FROM student
) AS t
WHERE t.row_num=1; {code}
It throws the following error from frocksdb:
{code:java}
java.lang.UnsatisfiedLinkError: 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
    at 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native
 Method) ~[flink-dist_2.12-1.14.2.jar:?]
    at 
org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:14) 
~[flink-dist_2.12-1.14.2.jar:?]
    at 
org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:115)
 ~[flink-dist_2.12-1.14.2.jar:?]
    at 
org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:142)
 ~[flink-dist_2.12-1.14.2.jar:?]
    at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:157)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:134)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:148)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.table.runtime.operators.rank.FastTop1Function.open(FastTop1Function.java:114)
 ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:55)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
{code}
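
A note on the likely root cause: the vanilla org.rocksdb native library does not export the Flink-specific JNI symbols, so a rocksdbjni jar bundled in the user fat-jar (or a second flink-dist on the classpath) that wins the native-library load produces exactly this UnsatisfiedLinkError once state TTL makes Flink instantiate FlinkCompactionFilter. A minimal diagnostic sketch; the reflective call is an assumption based only on the method name and ()J descriptor shown in the trace above:
{code:java}
import java.lang.reflect.Method;

public class FrocksdbLinkCheck {
    public static void main(String[] args) throws Exception {
        // Load the native library exactly as the RocksDB state backend would.
        org.rocksdb.RocksDB.loadLibrary();

        // Print where the RocksDB classes come from: if this is not
        // flink-dist (frocksdb), a vanilla rocksdbjni jar is shadowing it.
        System.out.println(org.rocksdb.RocksDB.class
                .getProtectionDomain().getCodeSource().getLocation());

        // Poke the native method named in the stack trace (descriptor ()J:
        // no arguments, returns long). This reproduces the
        // UnsatisfiedLinkError if the loaded .so lacks the Flink symbols.
        Class<?> filter = Class.forName("org.rocksdb.FlinkCompactionFilter");
        Method m = filter.getDeclaredMethod("createNewFlinkCompactionFilterConfigHolder");
        m.setAccessible(true);
        System.out.println("config holder handle: " + m.invoke(null));
    }
}
{code}
If the printed location is a user jar rather than flink-dist, excluding org.rocksdb:rocksdbjni from the job jar should resolve the link error.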

[jira] [Created] (FLINK-30498) frocksdb throw an UnsatisfiedLinkError

2022-12-23 Thread tanjialiang (Jira)
tanjialiang created FLINK-30498:
---

 Summary: frocksdb throw an UnsatisfiedLinkError
 Key: FLINK-30498
 URL: https://issues.apache.org/jira/browse/FLINK-30498
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.14.6
Reporter: tanjialiang


Using a simple TopN SQL job with the table.exec.state.ttl option and the RocksDB state backend:
{code:java}
SET table.exec.state.ttl = 1min;

CREATE TABLE `student` (
  id int,
  name string,
  update_time bigint
) WITH (
  'connector' = 'datagen',
  'fields.id.kind' = 'random',
  'fields.id.min' = '0',
  'fields.id.max' = '100'
);

CREATE TABLE `blackhole` (
  id int,
  name string,
  update_time bigint
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole
SELECT 
    id, 
    name,
    update_time
FROM (
  SELECT
      id, 
      name,
      update_time,
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS row_num 
  FROM student
) AS t
WHERE t.row_num=1; {code}
It throws an UnsatisfiedLinkError from frocksdb:
{code:java}
java.lang.UnsatisfiedLinkError: 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
    at 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native
 Method) ~[flink-dist_2.12-1.14.2.jar:?]
    at 
org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:14) 
~[flink-dist_2.12-1.14.2.jar:?]
    at 
org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:115)
 ~[flink-dist_2.12-1.14.2.jar:?]
    at 
org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:142)
 ~[flink-dist_2.12-1.14.2.jar:?]
    at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:157)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:134)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:148)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.table.runtime.operators.rank.FastTop1Function.open(FastTop1Function.java:114)
 ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:55)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
{code}

[jira] [Updated] (FLINK-30460) Support writing HBase's cell TTL metadata

2022-12-20 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-30460:

Description: 
When I use Flink SQL to sink to HBase, I found I can't set the cell TTL. Can we 
support writing the cell TTL metadata like this?
{code:java}
CREATE TABLE hTable (
 rowkey INT,
 family1 ROW<q1 INT>,
 ttl BIGINT METADATA FROM 'ttl',
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-2.2',
 'table-name' = 'mytable',
 'zookeeper.quorum' = 'localhost:2181'
);{code}
 

 

  was:
When i using flink sql to sink hbase, i found i can't set the ceil ttl.  Can we 
support writing the ceil ttl metadata like this?

 
{code:java}
CREATE TABLE hTable (
 rowkey INT,
 family1 ROW<q1 INT>,
 ttl BIGINT METADATA FROM 'ttl',
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-2.2',
 'table-name' = 'mytable',
 'zookeeper.quorum' = 'localhost:2181'
);{code}
 

 


> Support writing HBase's cell TTL metadata
> -
>
> Key: FLINK-30460
> URL: https://issues.apache.org/jira/browse/FLINK-30460
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / HBase
>Reporter: tanjialiang
>Priority: Major
>
> When I use Flink SQL to sink to HBase, I found I can't set the cell TTL. Can 
> we support writing the cell TTL metadata like this?
> {code:java}
> CREATE TABLE hTable (
>  rowkey INT,
>  family1 ROW<q1 INT>,
>  ttl BIGINT METADATA FROM 'ttl',
>  PRIMARY KEY (rowkey) NOT ENFORCED
> ) WITH (
>  'connector' = 'hbase-2.2',
>  'table-name' = 'mytable',
>  'zookeeper.quorum' = 'localhost:2181'
> );{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30460) Support writing HBase's cell TTL metadata

2022-12-20 Thread tanjialiang (Jira)
tanjialiang created FLINK-30460:
---

 Summary: Support writing HBase's cell TTL metadata
 Key: FLINK-30460
 URL: https://issues.apache.org/jira/browse/FLINK-30460
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / HBase
Reporter: tanjialiang


When I use Flink SQL to sink to HBase, I found I can't set the cell TTL. Can we 
support writing the cell TTL metadata like this?

 
{code:java}
CREATE TABLE hTable (
 rowkey INT,
 family1 ROW<q1 INT>,
 ttl BIGINT METADATA FROM 'ttl',
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-2.2',
 'table-name' = 'mytable',
 'zookeeper.quorum' = 'localhost:2181'
);{code}
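
For context, the plain HBase client already supports a per-mutation cell TTL via Mutation#setTTL, which is what the proposed 'ttl' metadata column would presumably be forwarded to in the sink. A hedged sketch against the HBase 2.x client API; the connection settings and values here are illustrative:
{code:java}
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CellTtlPut {
    public static void main(String[] args) throws Exception {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost:2181"); // illustrative

        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("mytable"))) {
            Put put = new Put(Bytes.toBytes(1));
            // Cell TTL in milliseconds for every cell in this mutation;
            // this is the value the proposed 'ttl' metadata column would carry.
            put.setTTL(3_600_000L);
            put.addColumn(Bytes.toBytes("family1"), Bytes.toBytes("q1"), Bytes.toBytes(42));
            table.put(put);
        }
    }
}
{code}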
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30411) Flink deployment stuck in UPGRADING state when deploy flinkdeployment without resource

2022-12-14 Thread tanjialiang (Jira)
tanjialiang created FLINK-30411:
---

 Summary: Flink deployment stuck in UPGRADING state when deploy 
flinkdeployment without resource
 Key: FLINK-30411
 URL: https://issues.apache.org/jira/browse/FLINK-30411
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: 1.16.0
Reporter: tanjialiang
 Attachments: image-2022-12-14-17-22-12-656.png

In Flink Kubernetes Operator 1.2.0, when I deploy a FlinkDeployment without 
resource specifications, the deployment gets stuck in the UPGRADING state.
{code:yaml}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: socket-window-word-count
spec:
  image: flink:1.16.0-scala_2.12-java8
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    parallelism: 2
    upgradeMode: stateless{code}
 

When I run kubectl describe flinkdeployments, I found this error message:

!image-2022-12-14-17-22-12-656.png!

 

Maybe we can validate this when the FlinkDeployment is applied? When it is 
invalid, throw an error rather than letting the apply succeed.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30222) Suspended a job in last-state mode bug

2022-11-27 Thread tanjialiang (Jira)
tanjialiang created FLINK-30222:
---

 Summary: Suspended a job in last-state mode bug
 Key: FLINK-30222
 URL: https://issues.apache.org/jira/browse/FLINK-30222
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.2.0, 1.16.0
Reporter: tanjialiang
 Attachments: image-2022-11-27-16-48-08-445.png

In Flink 1.16.0, Kubernetes HA can be enabled with the option value 
'kubernetes', i.e. 'high-availability: kubernetes'. But in Kubernetes Operator 
1.2.0, when I try to suspend a job in last-state mode, validation fails with 
'Job could not be upgraded with last-state while Kubernetes HA disabled'.

 

I tried to use kubectl patch to suspend the job with last-state:
{code:sh}
kubectl -nbigdata-flink patch \
  flinkdeployments.flink.apache.org/streaming-638223bf650ac869689faa62-flink \
  --type=merge -p '{"spec": {"job": {"state": "suspended", "upgradeMode": "last-state"}}}'{code}
It failed with an error claiming my Kubernetes HA is disabled:
{code:java}
Error from server: admission webhook "flinkoperator.flink.apache.org" denied 
the request: Job could not be upgraded with last-state while Kubernetes HA 
disabled {code}
But I had enabled Kubernetes HA with the following options:
{code:yaml}
kubernetes.cluster-id: <cluster-id>
high-availability: kubernetes
high-availability.storageDir: hdfs:///flink/recovery {code}
And I found that Flink Kubernetes Operator 1.2.0 only recognizes Kubernetes HA 
in the old option form:
{code:yaml}
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory {code}
The check seems to be made in 
org.apache.flink.kubernetes.operator.utils.FlinkUtils#isKubernetesHAActivated; 
a sketch of a more permissive check follows the screenshot below.

!image-2022-11-27-16-48-08-445.png!
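
A hedged sketch of what a more permissive check could look like, accepting both the 'kubernetes' shorthand added in Flink 1.16 and the legacy factory class name. This is illustrative code, not the operator's actual implementation:
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;

public final class HaModeCheck {

    private static final String LEGACY_FACTORY =
            "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory";

    /** True for both spellings of Kubernetes HA. */
    public static boolean isKubernetesHaActivated(Configuration conf) {
        String haMode = conf.get(HighAvailabilityOptions.HA_MODE);
        return "kubernetes".equalsIgnoreCase(haMode) || LEGACY_FACTORY.equals(haMode);
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(HighAvailabilityOptions.HA_MODE, "kubernetes");
        System.out.println(isKubernetesHaActivated(conf)); // prints true
    }
}
{code}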



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28728) Support to set the end offset for streaming connector

2022-08-02 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17574091#comment-17574091
 ] 

tanjialiang commented on FLINK-28728:
-

[~martijnvisser] Take Kafka as an example: when a topic's data arrives 
infrequently, I would like to consume it on a schedule rather than with a 
long-running job, because a long-running job wastes cluster resources. I think 
running this kind of job in batch mode is fine, but I don't know whether a 
batch job can take a savepoint when it finishes, so that I can restore the job 
from that savepoint in the next scheduled run.
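
For what it's worth, the FLIP-27 KafkaSource already supports an end offset via setBounded, which matches this periodic-consumption pattern. A hedged sketch; the broker address, topic and group id are illustrative:
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class PeriodicKafkaBatch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // illustrative
                .setTopics("events")                     // illustrative
                .setGroupId("periodic-batch")            // illustrative
                // Resume from the group's committed offsets (earliest on the
                // very first run), so each scheduled run picks up where the
                // previous one stopped.
                .setStartingOffsets(
                        OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                // Stop at the offsets observed at job start, then finish.
                .setBounded(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka").print();
        env.execute("periodic kafka batch");
    }
}
{code}
With committed group offsets as the resume point, the savepoint round-trip may not even be necessary for the Kafka case.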

> Support to set the end offset for streaming connector
> -
>
> Key: FLINK-28728
> URL: https://issues.apache.org/jira/browse/FLINK-28728
> Project: Flink
>  Issue Type: Improvement
>Reporter: tanjialiang
>Priority: Major
>
> Connectors like MQ or CDC support setting the startup mode for reading, but 
> not an end offset/position. When the MQ's or CDC's data arrives infrequently, 
> consuming it periodically can save a lot of Kubernetes/YARN resources: start 
> up from the earliest offset, end at the current offset, save the current 
> offset in a savepoint, and use that savepoint to start the next periodic run, 
> in a loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28728) Support to set the end offset for streaming connector

2022-07-28 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-28728:

Description: Connectors like MQ or CDC support setting the startup mode for 
reading, but not an end offset/position. When the MQ's or CDC's data arrives 
infrequently, consuming it periodically can save a lot of Kubernetes/YARN 
resources: start up from the earliest offset, end at the current offset, save 
the current offset in a savepoint, and use that savepoint to start the next 
periodic run, in a loop.  (was: Like MQ or CDC connector, it support 
set the startup mode for reading, but not support set the end offset/position. 
When the MQ's or CDC's data is not frequently,  consume their data in period 
can cut lots of kubernetes/YARN resource, such as startup from earliest, and 
end to  the current offset, save current offset in savepoint, and using the 
savepoint to startup in the next period time, in a loop.)

> Support to set the end offset for streaming connector
> -
>
> Key: FLINK-28728
> URL: https://issues.apache.org/jira/browse/FLINK-28728
> Project: Flink
>  Issue Type: Improvement
>Reporter: tanjialiang
>Priority: Major
>
> Connectors like MQ or CDC support setting the startup mode for reading, but 
> not an end offset/position. When the MQ's or CDC's data arrives infrequently, 
> consuming it periodically can save a lot of Kubernetes/YARN resources: start 
> up from the earliest offset, end at the current offset, save the current 
> offset in a savepoint, and use that savepoint to start the next periodic run, 
> in a loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28728) Support to set the end offset for streaming connector

2022-07-28 Thread tanjialiang (Jira)
tanjialiang created FLINK-28728:
---

 Summary: Support to set the end offset for streaming connector
 Key: FLINK-28728
 URL: https://issues.apache.org/jira/browse/FLINK-28728
 Project: Flink
  Issue Type: Improvement
Reporter: tanjialiang


Connectors like MQ or CDC support setting the startup mode for reading, but not 
an end offset/position. When the MQ's or CDC's data arrives infrequently, 
consuming it periodically can save a lot of Kubernetes/YARN resources: start up 
from the earliest offset, end at the current offset, save the current offset in 
a savepoint, and use that savepoint to start the next periodic run, in a loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2022-06-30 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17560965#comment-17560965
 ] 

tanjialiang commented on FLINK-28303:
-

I found this bug in flink-1.14.2 too. It appears when some partition is empty.

[~tzulitai] And I'm sorry to tell you that "auto.offset.reset" is invalid in 
1.14 because of this bug: https://issues.apache.org/jira/browse/FLINK-24697 

> Kafka SQL Connector loses data when restoring from a savepoint with a topic 
> with empty partitions
> -
>
> Key: FLINK-28303
> URL: https://issues.apache.org/jira/browse/FLINK-28303
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
>Reporter: Robert Metzger
>Priority: Major
>
> Steps to reproduce:
> - Set up a Kafka topic with 10 partitions
> - produce records 0-9 into the topic
> - take a savepoint and stop the job
> - produce records 10-19 into the topic
> - restore the job from the savepoint.
> The job will be missing usually 2-4 records from 10-19.
> My assumption is that if a partition never had data (which is likely with 10 
> partitions and 10 records), the savepoint will only contain offsets for 
> partitions with data. 
> While the job was offline (and we've written record 10-19 into the topic), 
> all partitions got filled. Now, when Kafka comes online again, it will use 
> the "latest" offset for those partitions, skipping some data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-27574) [QUESTION] In Flink k8s Application mode with HA can not using History Server for history backend

2022-05-11 Thread tanjialiang (Jira)
tanjialiang created FLINK-27574:
---

 Summary: [QUESTION] In Flink k8s Application mode with HA can not 
using History Server for history backend
 Key: FLINK-27574
 URL: https://issues.apache.org/jira/browse/FLINK-27574
 Project: Flink
  Issue Type: Technical Debt
Reporter: tanjialiang


In Flink k8s application mode with high-availability, the job id is always 
00000000000000000000000000000000, but the History Server uses the job id as 
its key. Can I modify the job id in HA?
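
One possible workaround, with the caveat that $internal.pipeline.job-id (PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) is an internal, unsupported option: pinning a distinct job id per deployment so History Server entries stop colliding on the all-zero default. A hedged sketch; the id value is illustrative:
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FixedJobId {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Internal, unsupported option; the value must be a 32-character hex
        // JobID. This particular id is illustrative.
        conf.setString("$internal.pipeline.job-id", "b7d3cbfb0bd8fc2ea0a311b88a2b8c1d");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements(1, 2, 3).print();
        env.execute("fixed-job-id example");
    }
}
{code}
In application mode the option would more likely be passed as a -D dynamic property of the deployment rather than set in code.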



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27110) using TIMESTAMP_LTZ cause Flink web ui watermark display problem

2022-04-07 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-27110:

Attachment: image-2022-04-07-17-03-38-007.png

> using TIMESTAMP_LTZ cause Flink web ui watermark display problem
> 
>
> Key: FLINK-27110
> URL: https://issues.apache.org/jira/browse/FLINK-27110
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.13.1, 1.14.2
>Reporter: tanjialiang
>Priority: Not a Priority
> Attachments: image-2022-04-07-17-00-52-388.png, 
> image-2022-04-07-17-03-38-007.png
>
>
> When a Flink SQL job uses TIMESTAMP_LTZ or TIMESTAMP, the timezone is 
> converted during computation. But in the Flink web UI, the watermark is 
> displayed without that conversion.
>  
> !image-2022-04-07-17-00-52-388.png!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-27110) using TIMESTAMP_LTZ cause Flink web ui watermark display problem

2022-04-07 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-27110:

Description: 
When a Flink SQL job uses TIMESTAMP_LTZ or TIMESTAMP, the timezone is converted 
during computation. But in the Flink web UI, the watermark is displayed without 
that conversion.

 

!image-2022-04-07-17-03-38-007.png!

  was:
when a Flink SQL using TIMESTAMP_LTZ or TIMESTAMP, it will convert their 
timezone when compute. But in flink web ui, the watermark display without 
convert. 

 

!image-2022-04-07-17-00-52-388.png!


> using TIMESTAMP_LTZ cause Flink web ui watermark display problem
> 
>
> Key: FLINK-27110
> URL: https://issues.apache.org/jira/browse/FLINK-27110
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.13.1, 1.14.2
>Reporter: tanjialiang
>Priority: Not a Priority
> Attachments: image-2022-04-07-17-00-52-388.png, 
> image-2022-04-07-17-03-38-007.png
>
>
> When a Flink SQL job uses TIMESTAMP_LTZ or TIMESTAMP, the timezone is 
> converted during computation. But in the Flink web UI, the watermark is 
> displayed without that conversion.
>  
> !image-2022-04-07-17-03-38-007.png!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-27110) using TIMESTAMP_LTZ cause Flink web ui watermark display problem

2022-04-07 Thread tanjialiang (Jira)
tanjialiang created FLINK-27110:
---

 Summary: using TIMESTAMP_LTZ cause Flink web ui watermark display 
problem
 Key: FLINK-27110
 URL: https://issues.apache.org/jira/browse/FLINK-27110
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.14.2, 1.13.1
Reporter: tanjialiang
 Attachments: image-2022-04-07-17-00-52-388.png

When a Flink SQL job uses TIMESTAMP_LTZ or TIMESTAMP, the timezone is converted 
during computation. But in the Flink web UI, the watermark is displayed without 
that conversion.

 

!image-2022-04-07-17-00-52-388.png!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25618) Data quality by apache flink

2022-01-12 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17474436#comment-17474436
 ] 

tanjialiang commented on FLINK-25618:
-

[~MartijnVisser] Thanks for your reply. I have started a thread at 
https://lists.apache.org/thread/lytryplwttln192brn91q32f2trw8k5v

 

> Data quality by apache flink
> 
>
> Key: FLINK-25618
> URL: https://issues.apache.org/jira/browse/FLINK-25618
> Project: Flink
>  Issue Type: New Feature
>Reporter: tanjialiang
>Priority: Not a Priority
>
> This is a discussion about how to support data quality through Apache Flink.
> For example, I have a SQL job in which a table has a column named phone, and 
> the data in the phone column must match a telephone pattern. If a value does 
> not match, I can choose to drop or ignore it, and we can mark it in the 
> metrics so that users can monitor the data quality in the source and sink.
> After this, users can know about the data quality from the source and sink, 
> which is useful for the downstream.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25618) Data quality by apache flink

2022-01-12 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17474436#comment-17474436
 ] 

tanjialiang edited comment on FLINK-25618 at 1/12/22, 10:50 AM:


[~MartijnVisser] Thanks for your reply. I have started a thread at 
https://lists.apache.org/thread/lytryplwttln192brn91q32f2trw8k5v


was (Author: JIRAUSER279823):
[~MartijnVisser] Thanks for your reply. I had started a thread in 
[https://lists.apache.org/thread/lytryplwttln192brn91q32f2trw8k5v|https://lists.apache.org/thread/lytryplwttln192brn91q32f2trw8k5v,]

 

> Data quality by apache flink
> 
>
> Key: FLINK-25618
> URL: https://issues.apache.org/jira/browse/FLINK-25618
> Project: Flink
>  Issue Type: New Feature
>Reporter: tanjialiang
>Priority: Not a Priority
>
> This is a discussion about how to support data quality through Apache Flink.
> For example, I have a SQL job in which a table has a column named phone, and 
> the data in the phone column must match a telephone pattern. If a value does 
> not match, I can choose to drop or ignore it, and we can mark it in the 
> metrics so that users can monitor the data quality in the source and sink.
> After this, users can know about the data quality from the source and sink, 
> which is useful for the downstream.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25618) Data quality by apache flink

2022-01-11 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-25618:

Description: 
This is a discussion about how to support data quality through Apache Flink.

For example, I have a SQL job in which a table has a column named phone, and 
the data in the phone column must match a telephone pattern. If a value does 
not match, I can choose to drop or ignore it, and we can mark it in the metrics 
so that users can monitor the data quality in the source and sink.

After this, users can know about the data quality from the source and sink, 
which is useful for the downstream.

  was:
This is discussing about how to support data quality through apache flink.

For example, I has a sql job, a table in this job has a column named phone, and 
the data of the column phone must match the pattern of telephone, if not match, 
i can choose drop it or ignored, and we can mark it in the metrics, so that 
user can monitor the data of quality in source and sink.

After this, user can kown about the data quality of the source and sink, it is 
useful for the downstream.


> Data quality by apache flink
> 
>
> Key: FLINK-25618
> URL: https://issues.apache.org/jira/browse/FLINK-25618
> Project: Flink
>  Issue Type: New Feature
>Reporter: tanjialiang
>Priority: Not a Priority
>
> This is a discussion about how to support data quality through Apache Flink.
> For example, I have a SQL job in which a table has a column named phone, and 
> the data in the phone column must match a telephone pattern. If a value does 
> not match, I can choose to drop or ignore it, and we can mark it in the 
> metrics so that users can monitor the data quality in the source and sink.
> After this, users can know about the data quality from the source and sink, 
> which is useful for the downstream.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25618) Data quality by apache flink

2022-01-11 Thread tanjialiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tanjialiang updated FLINK-25618:

Description: 
This is a discussion about how to support data quality through Apache Flink.

For example, I have a SQL job in which a table has a column named phone, and 
the data in the phone column must match a telephone pattern. If a value does 
not match, I can choose to drop or ignore it, and we can mark it in the metrics 
so that users can monitor the data quality in the source and sink.

After this, users can know about the data quality of the source and sink, 
which is useful for the downstream.

  was:
This is discussing about how to support data quality through apache flink.

For example, I has a sql job, a table in this job has a column name phone, and 
the data of the column phone must match the pattern of telephone, if not match, 
i can choose drop it or ignored, and we can mark it in the metrics, so that 
user can monitor the data of quality in source and sink.


> Data quality by apache flink
> 
>
> Key: FLINK-25618
> URL: https://issues.apache.org/jira/browse/FLINK-25618
> Project: Flink
>  Issue Type: New Feature
>Reporter: tanjialiang
>Priority: Not a Priority
>
> This is a discussion about how to support data quality through Apache Flink.
> For example, I have a SQL job in which a table has a column named phone, and 
> the data in the phone column must match a telephone pattern. If a value does 
> not match, I can choose to drop or ignore it, and we can mark it in the 
> metrics so that users can monitor the data quality in the source and sink.
> After this, users can know about the data quality of the source and sink, 
> which is useful for the downstream.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25618) Data quality by apache flink

2022-01-11 Thread tanjialiang (Jira)
tanjialiang created FLINK-25618:
---

 Summary: Data quality by apache flink
 Key: FLINK-25618
 URL: https://issues.apache.org/jira/browse/FLINK-25618
 Project: Flink
  Issue Type: New Feature
Reporter: tanjialiang


This is a discussion about how to support data quality through Apache Flink.

For example, I have a SQL job in which a table has a column named phone, and 
the data in the phone column must match a telephone pattern. If a value does 
not match, I can choose to drop or ignore it, and we can mark it in the metrics 
so that users can monitor the data quality in the source and sink.
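
To make the idea concrete, a hedged sketch of the kind of operator this could translate to: a ProcessFunction that drops rows whose phone column fails a pattern and counts them in a user metric. The class name, pattern, field position and metric name are all illustrative:
{code:java}
import java.util.regex.Pattern;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class PhoneQualityFilter extends ProcessFunction<Row, Row> {

    private static final Pattern PHONE = Pattern.compile("\\d{11}"); // illustrative rule
    private transient Counter dirtyPhoneRecords;

    @Override
    public void open(Configuration parameters) {
        // Exposed through the normal metric reporters, so data quality can
        // be monitored per source/sink.
        dirtyPhoneRecords =
                getRuntimeContext().getMetricGroup().counter("dirtyPhoneRecords");
    }

    @Override
    public void processElement(Row row, Context ctx, Collector<Row> out) {
        Object phone = row.getField(0); // assumes phone is the first field
        if (phone != null && PHONE.matcher(phone.toString()).matches()) {
            out.collect(row);
        } else {
            dirtyPhoneRecords.inc(); // dropped (or could be forwarded unchanged)
        }
    }
}
{code}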



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

