[jira] [Created] (FLINK-35233) HBase lookup result is wrong when lookup cache is enabled
tanjialiang created FLINK-35233: --- Summary: HBase lookup result is wrong when lookup cache is enabled Key: FLINK-35233 URL: https://issues.apache.org/jira/browse/FLINK-35233 Project: Flink Issue Type: Bug Components: Connectors / HBase Affects Versions: hbase-3.0.0 Reporter: tanjialiang HBase table ||rowkey||name||age|| |1|ben|18| |2|ken|19| |3|mark|20| FlinkSQL lookup join with lookup cache {code:java} CREATE TABLE dim_user ( rowkey STRING, info ROW, PRIMARY KEY (rowkey) NOT ENFORCED ) WITH ( 'connector' = 'hbase-2.2', 'zookeeper.quorum' = 'localhost:2181', 'zookeeper.znode.parent' = '/hbase', 'table-name' = 'default:test', 'lookup.cache' = 'PARTIAL', 'lookup.partial-cache.max-rows' = '1000', 'lookup.partial-cache.expire-after-write' = '1h' ); CREATE VIEW user_click AS SELECT user_id, proctime() AS proc_time FROM ( VALUES('1'), ('2'), ('3'), ('1'), ('2') ) AS t (user_id); SELECT user_id, info.name, info.age FROM user_click INNER JOIN dim_user FOR SYSTEM_TIME AS OF user_click.proc_time ON dim_user.rowkey = user_click.user_id;{code} Expected Result ||rowkey||name||age|| |1|ben|18| |2|ken|19| |3|mark|20| |1|ben|18| |2|ken|19| Actual Result ||rowkey||name||age|| |1|ben|18| |2|ken|19| |3|mark|20| |1|mark|20| |2|mark|20| The result is wrong when we look up user_id 1 and 2 the second time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
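The symptom above (every repeated key returning the last fetched row, "mark") is consistent with a cache that stores a reference to a reused mutable row object rather than a copy. The following is an illustrative sketch of that failure mode, not Flink's actual lookup or cache classes; all names here are hypothetical:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Model of the suspected bug: a lookup function reuses one mutable row
// buffer, and the partial cache stores that buffer by reference, so every
// cached key ends up aliasing the most recently fetched row. Cloning on
// insert restores correct results.
public class LookupCacheDemo {
    static final Map<String, String[]> TABLE = Map.of(
            "1", new String[]{"ben", "18"},
            "2", new String[]{"ken", "19"},
            "3", new String[]{"mark", "20"});

    static final String[] reused = new String[2];      // shared row buffer
    static final Map<String, String[]> cache = new HashMap<>();

    static String[] lookup(String key, boolean copyOnCache) {
        String[] hit = cache.get(key);
        if (hit != null) {
            return hit;                                // cache hit
        }
        String[] row = TABLE.get(key);
        reused[0] = row[0];                            // object reuse on fetch
        reused[1] = row[1];
        cache.put(key, copyOnCache ? reused.clone() : reused);
        return cache.get(key);
    }

    public static void main(String[] args) {
        cache.clear();
        for (String k : new String[]{"1", "2", "3"}) lookup(k, false);
        // Buggy path: the second lookup of "1" returns the last fetched row.
        if (!Arrays.equals(lookup("1", false), new String[]{"mark", "20"})) {
            throw new AssertionError("expected the aliasing bug");
        }
        cache.clear();
        for (String k : new String[]{"1", "2", "3"}) lookup(k, true);
        // Copy-on-cache path: the cached row is stable.
        if (!Arrays.equals(lookup("1", true), new String[]{"ben", "18"})) {
            throw new AssertionError("copy-on-cache should fix the result");
        }
        System.out.println("ok");
    }
}
```

If this is indeed the cause, a defensive copy of the row before it is handed to the cache would be the natural fix.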
[jira] [Commented] (FLINK-29370) Protobuf in flink-sql-protobuf is not shaded
[ https://issues.apache.org/jira/browse/FLINK-29370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822169#comment-17822169 ] tanjialiang commented on FLINK-29370: - [~jark] [~libenchao] [~maosuhan] I found that flink-sql-orc has an unshaded protobuf dependency, and it conflicts with flink-sql-protobuf; my Flink version is 1.16.1. For now, my temporary workaround is to shade the protobuf dependency in both flink-sql-protobuf and my user proto classes myself. > Protobuf in flink-sql-protobuf is not shaded > > > Key: FLINK-29370 > URL: https://issues.apache.org/jira/browse/FLINK-29370 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.16.0 >Reporter: Jark Wu >Priority: Major > > The protobuf classes in flink-sql-protobuf is not shaded which may lead to > class conflicts. Usually, sql jars should shade common used dependencies, > e.g. flink-sql-avro: > https://github.com/apache/flink/blob/master/flink-formats/flink-sql-avro/pom.xml#L88 > > {code} > ➜ Downloads jar tvf flink-sql-protobuf-1.16.0.jar | grep com.google > 0 Tue Sep 13 20:23:44 CST 2022 com/google/ > 0 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/ >568 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/ProtobufInternalUtils.class > 19218 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessage$Builder.class >259 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessage$BuilderParent.class > 10167 Tue Sep 13 20:23:44 CST 2022 com/google/protobuf/AbstractMessage.class > 1486 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessageLite$Builder$LimitedInputStream.class > 12399 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessageLite$Builder.class >279 Tue Sep 13 20:23:44 CST 2022 > com/google/protobuf/AbstractMessageLite$InternalOneOfEnu > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34089) Missing check of subscribed and assigned topics at job startup
tanjialiang created FLINK-34089: --- Summary: Missing check of subscribed and assigned topics at job startup Key: FLINK-34089 URL: https://issues.apache.org/jira/browse/FLINK-34089 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: kafka-3.0.2 Reporter: tanjialiang When we unsubscribe a topic and still restore from the old state, the job will still read data from the unsubscribed topic. I think we should check that the subscribed topics match the partitions assigned from state, and throw an error so the user can handle it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
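The proposed startup check could look roughly like the following sketch. The class and method names are hypothetical, not the Kafka connector's real API; it only shows the idea of failing fast when restored splits reference a topic that is no longer subscribed:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of the proposed check: restored splits whose topic is
// no longer subscribed should fail fast instead of being silently read.
public class SubscriptionCheck {
    static void validate(Set<String> subscribedTopics, List<String> restoredSplitTopics) {
        List<String> stale = restoredSplitTopics.stream()
                .filter(t -> !subscribedTopics.contains(t))
                .distinct()
                .collect(Collectors.toList());
        if (!stale.isEmpty()) {
            throw new IllegalStateException(
                    "Restored state contains partitions of unsubscribed topics " + stale
                            + "; remove them from state or re-subscribe.");
        }
    }

    public static void main(String[] args) {
        validate(Set.of("orders"), List.of("orders"));            // passes
        try {
            validate(Set.of("orders"), List.of("orders", "clicks"));
            throw new AssertionError("expected a failure for topic 'clicks'");
        } catch (IllegalStateException expected) {
            System.out.println("ok: " + expected.getMessage());
        }
    }
}
```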
[jira] [Commented] (FLINK-24533) Flink SQL Upsert To Hbase Appear data loss
[ https://issues.apache.org/jira/browse/FLINK-24533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794492#comment-17794492 ] tanjialiang commented on FLINK-24533: - Hi [~licp], sorry for noticing this ticket late; it has been fixed in FLINK-33304. > Flink SQL Upsert To Hbase Appear data loss > - > > Key: FLINK-24533 > URL: https://issues.apache.org/jira/browse/FLINK-24533 > Project: Flink > Issue Type: Improvement > Components: Connectors / HBase >Affects Versions: 1.12.0 > Environment: Flink 1.12.0 on Yarn > HBase1.4 >Reporter: licp >Priority: Major > Attachments: Check_Result.png, Specify_some_key.png > > > Data flow direction is described below: > Source:Mysql > Sink:HBase > Parallelism:1 > ---Code Example One Using Left Join --- > -- {color:#FF}Mysql Source ,Total Records:4829{color} > create table user( > user_id string, > user_name string, > primary key(user_id) not enforced > )with( > 'connector' = 'mysql-cdc', > 'hostname' = 'localhost', > 'port' = '3308', > 'username' = 'user_name', > 'password' = '**', > 'database-name' = 'database_name', > 'table-name' = 'table_name', > 'debezium.event.processing.failure.handling.mode' = 'warn', > 'debezium.snapshot.locking.mode' = 'none' > ); > create table user_profile( > user_id string, > age int, > primary key(user_id) not enforced > )with( > 'connector' = 'mysql-cdc', > 'hostname' = 'localhost', > 'port' = '3308', > 'username' = 'user_name', > 'password' = '**', > 'database-name' = 'database_name', > 'table-name' = 'table_name', > 'debezium.event.processing.failure.handling.mode' = 'warn', > 'debezium.snapshot.locking.mode' = 'none' > ); > {color:#FF}-- HBase sink ;Total Record:4826{color} > create table real_dwd_user_info_to_hbase( > rowkey string, > f ROW(user_name string,age int) > )with( > 'connector' = 'hbase-1.4', > 'table-name' = 'table_name', > 'zookeeper.quorum' = 'zk', > 'zookeeper.znode.parent' = '/hbase' > ); > insert into real_dwd_user_info_to_hbase > select > u.user_id, >
row(u.user_name,up.age) as > from user u > left join user_profile up > on u.user_id = up.user_id > where u.user_id<100 > ; > --Code Example Two Using Left Join And Specify some key- > insert into real_dwd_user_info_to_hbase > select > u.user_id, > row(u.user_name,up.age) as > from user u > left join user_profile up > on u.user_id = up.user_id > where u.user_id=0 > --- > I printed the same code logic results, I can find that the specified key has > three results of " + i -d + i", but in HBase is still not find the key -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29770) hbase connector supports out-of-order data
[ https://issues.apache.org/jira/browse/FLINK-29770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794489#comment-17794489 ] tanjialiang commented on FLINK-29770: - [~Bo Cui] Thanks for reporting this issue, but I think it can be closed because it has been supported in [FLINK-33208|https://issues.apache.org/jira/browse/FLINK-33208]. > hbase connector supports out-of-order data > -- > > Key: FLINK-29770 > URL: https://issues.apache.org/jira/browse/FLINK-29770 > Project: Flink > Issue Type: Improvement > Components: Connectors / HBase >Reporter: Bo Cui >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > > The data may be out of order and has no timestamp. As a result, the data > written to the HBase is incorrect -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-19648) Support the nested projection push down for the hbase connector
[ https://issues.apache.org/jira/browse/FLINK-19648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781290#comment-17781290 ] tanjialiang commented on FLINK-19648: - I tried, but I found that lookup doesn't support nested projection yet. Maybe we should support nested projection for lookup first, and then move on to this issue. > Support the nested projection push down for the hbase connector > --- > > Key: FLINK-19648 > URL: https://issues.apache.org/jira/browse/FLINK-19648 > Project: Flink > Issue Type: Sub-task > Components: Connectors / HBase, Table SQL / Ecosystem >Reporter: Shengkai Fang >Priority: Major > > Support nested projection push down for hbase. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-19648) Support the nested projection push down for the hbase connector
[ https://issues.apache.org/jira/browse/FLINK-19648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780934#comment-17780934 ] tanjialiang commented on FLINK-19648: - Hi [~jark] [~fsk119], I want to take this ticket; can you please assign it to me? > Support the nested projection push down for the hbase connector > --- > > Key: FLINK-19648 > URL: https://issues.apache.org/jira/browse/FLINK-19648 > Project: Flink > Issue Type: Sub-task > Components: Connectors / HBase, Table SQL / Ecosystem >Reporter: Shengkai Fang >Priority: Major > > Support nested projection push down for hbase. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33304) Atomicity of RowMutations would broken when Delete and Put on same columnFamily/column/row
[ https://issues.apache.org/jira/browse/FLINK-33304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779005#comment-17779005 ] tanjialiang commented on FLINK-33304: - Hi [~MartijnVisser], can you help me request a reviewer? > Atomicity of RowMutations would broken when Delete and Put on same > columnFamily/column/row > -- > > Key: FLINK-33304 > URL: https://issues.apache.org/jira/browse/FLINK-33304 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: hbase-3.0.1 >Reporter: tanjialiang >Priority: Blocker > Labels: pull-request-available > > Current we put the {{Mutation}} into {{BufferedMutator}} directly, when > {{Mutation}} have a {{Delete}} followed by {{Put}} to same column family or > columns or rows, only the {{Delete}} is happening while the {{Put}} is > ignored so atomicity of {{Mutation}} is broken for such cases. > See https://issues.apache.org/jira/browse/HBASE-8626. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-33173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1535#comment-1535 ] tanjialiang commented on FLINK-33173: - [~martijnvisser] It's my pleasure; I will try it. > Support currentFetchEventTimeLag metrics for KafkaSource > > > Key: FLINK-33173 > URL: https://issues.apache.org/jira/browse/FLINK-33173 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.0.0 >Reporter: tanjialiang >Priority: Major > Labels: pull-request-available > > In the connector specifications of > [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33], > {{currentFetchEventTimeLag}} metrics is the time in milliseconds from the > record event timestamp to the timestamp Flink fetched the record. It can > reflect the consumption latency before deserialization. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33304) Atomicity of RowMutations would broken when Delete and Put on same columnFamily/column/row
[ https://issues.apache.org/jira/browse/FLINK-33304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-33304: Description: Current we put the {{Mutation}} into {{BufferedMutator}} directly, when {{Mutation}} have a {{Delete}} followed by {{Put}} to same column family or columns or rows, only the {{Delete}} is happening while the {{Put}} is ignored so atomicity of {{Mutation}} is broken for such cases. See https://issues.apache.org/jira/browse/HBASE-8626. was: Current we put the mutations into BufferedMutator directly, when RowMutations have a Delete followed by Put to same column family or columns or rows, only the Delete is happening while the Put is ignored so atomicity of RowMutations is broken for such cases. See https://issues.apache.org/jira/browse/HBASE-8626. > Atomicity of RowMutations would broken when Delete and Put on same > columnFamily/column/row > -- > > Key: FLINK-33304 > URL: https://issues.apache.org/jira/browse/FLINK-33304 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: hbase-3.0.1 >Reporter: tanjialiang >Priority: Blocker > Labels: pull-request-available > > Current we put the {{Mutation}} into {{BufferedMutator}} directly, when > {{Mutation}} have a {{Delete}} followed by {{Put}} to same column family or > columns or rows, only the {{Delete}} is happening while the {{Put}} is > ignored so atomicity of {{Mutation}} is broken for such cases. > See https://issues.apache.org/jira/browse/HBASE-8626. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33304) Atomicity of RowMutations would broken when Delete and Put on same columnFamily/column/row
tanjialiang created FLINK-33304: --- Summary: Atomicity of RowMutations would broken when Delete and Put on same columnFamily/column/row Key: FLINK-33304 URL: https://issues.apache.org/jira/browse/FLINK-33304 Project: Flink Issue Type: Bug Components: Connectors / HBase Affects Versions: hbase-3.0.1 Reporter: tanjialiang Currently we put the mutations into BufferedMutator directly. When a RowMutations has a Delete followed by a Put on the same column family, column, or row, only the Delete happens while the Put is ignored, so the atomicity of RowMutations is broken in such cases. See https://issues.apache.org/jira/browse/HBASE-8626. -- This message was sent by Atlassian Jira (v8.20.10#820010)
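The HBASE-8626 behavior referenced here can be modeled compactly: a delete marker at timestamp T masks every put with a timestamp less than or equal to T, so a Delete and a Put landing in the same millisecond lose the Put. This is a toy model of the masking rule, not HBase code:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of HBASE-8626 delete-marker masking: a delete at timestamp T
// hides all puts with ts <= T. When a RowMutations' Delete and Put get the
// same server timestamp, the Put's value becomes invisible, breaking the
// intended atomic delete-then-put semantics.
public class DeleteMaskDemo {
    record Op(boolean delete, long ts, String value) {}

    static String read(List<Op> cell) {
        long deleteWatermark = Long.MIN_VALUE;
        for (Op op : cell) {
            if (op.delete()) deleteWatermark = Math.max(deleteWatermark, op.ts());
        }
        String result = null;
        long bestTs = Long.MIN_VALUE;
        for (Op op : cell) {
            if (!op.delete() && op.ts() > deleteWatermark && op.ts() >= bestTs) {
                result = op.value();
                bestTs = op.ts();
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Op> cell = new ArrayList<>();
        cell.add(new Op(true, 100L, null));    // Delete at ts=100
        cell.add(new Op(false, 100L, "new"));  // Put at the same ts
        if (read(cell) != null) throw new AssertionError("the put should be masked");
        // A strictly newer timestamp makes the Put visible again.
        cell.add(new Op(false, 101L, "new"));
        if (!"new".equals(read(cell))) throw new AssertionError("newer put should win");
        System.out.println("ok");
    }
}
```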
[jira] [Commented] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-33173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776501#comment-17776501 ] tanjialiang commented on FLINK-33173: - We need to make SourceReaderBase provide an interface for implementations to deliver the FetchTime all the way to the place where the EventTime is extracted in SourceOutput. Then we can continue working on this. > Support currentFetchEventTimeLag metrics for KafkaSource > > > Key: FLINK-33173 > URL: https://issues.apache.org/jira/browse/FLINK-33173 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.0.0 >Reporter: tanjialiang >Priority: Major > Labels: pull-request-available > > In the connector specifications of > [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33], > {{currentFetchEventTimeLag}} metrics is the time in milliseconds from the > record event timestamp to the timestamp Flink fetched the record. It can > reflect the consumption latency before deserialization. -- This message was sent by Atlassian Jira (v8.20.10#820010)
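The metric's definition from FLIP-33 is simple arithmetic, which is why the fetch time must travel with each record until the event time is extracted. A minimal sketch with hypothetical names (not the connector's real classes):

```java
// Sketch of currentFetchEventTimeLag per FLIP-33: the fetch wall-clock time
// minus the record's event timestamp. The record carries its fetch time so
// the lag can be computed at the point where event time is extracted.
public class FetchLagDemo {
    record FetchedRecord(long eventTimeMs, long fetchTimeMs) {}

    static long currentFetchEventTimeLag(FetchedRecord r) {
        return r.fetchTimeMs() - r.eventTimeMs();
    }

    public static void main(String[] args) {
        FetchedRecord r = new FetchedRecord(1_000L, 1_250L);
        if (currentFetchEventTimeLag(r) != 250L) throw new AssertionError();
        System.out.println("lag=" + currentFetchEventTimeLag(r) + "ms");
    }
}
```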
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776082#comment-17776082 ] tanjialiang commented on FLINK-29692: - Hi everyone, I notice that there is no solution yet. I want to share my thoughts about this feature; maybe they can help. I think supporting early-fire may not be the best solution for the current window functions, because every window trigger is expensive, and early-fire is still not a real-time trigger. For example {code:sql} SET table.exec.emit.early-fire.enabled = true; SET table.exec.emit.early-fire.delay = 1min; SELECT user_id, COUNT(*) AS total, HOP_START(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR) AS window_start, HOP_END(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR) AS window_end FROM user_click GROUP BY user_id, HOP(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR); {code} 1. whether it is a HOP/TUMBLE/CUMULATE window or early-fire is enabled, there is a time delay, which is not real-time enough. 2. when the cardinality of user_id is large, every window trigger is very expensive, which can make the job unstable and easily cause checkpoint timeouts. 3. each early-fire triggers all user_id windows, but maybe only a small part of the data actually changed during the early-fire interval, which may put write pressure on the sink. In my company, I've added a window TVF function for this case, named HOPv2/TUMBLEv2 (maybe the name is not fit for the community). {code:sql} select user_id, COUNT(*) AS total, window_start, window_time, -- the record rowtime window_end FROM TABLE( HOPV2( DATA => TABLE user_click, TIMECOL => DESCRIPTOR(rowtime), SLIDE => INTERVAL '24' HOUR, SIZE => INTERVAL '48' HOUR, ALLOWED_LATENESS => true)) GROUP BY user_id, window_start, window_time, window_end; {code} 1. similar to the OVER window, we accumulate and output the result when a record comes (actually on timer trigger), which triggers in real time and does not put a lot of write pressure on the sink. 2. the window_time is the record rowtime, which represents the current progress. 3. similar to the HOP window, we fire and purge when window_end comes. 4. support an allowedLateness option: when processing a late event, if its window has not been purged, allow accumulating without emitting. I would like to contribute it, but I may need more discussion and help because I am still a novice at Flink contribution. > Support early/late fires for Windowing TVFs > --- > > Key: FLINK-29692 > URL: https://issues.apache.org/jira/browse/FLINK-29692 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Affects Versions: 1.15.3 >Reporter: Canope Nerda >Priority: Major > > I have cases where I need to 1) output data as soon as possible and 2) handle > late arriving data to achieve eventual correctness. In the logic, I need to > do window deduplication which is based on Windowing TVFs and according to > source code, early/late fires are not supported yet in Windowing TVFs. > Actually 1) contradicts with 2). Without early/late fires, we had to > compromise, either live with fresh incorrect data or tolerate excess latency > for correctness. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33208) Support the writable metadata timestamp for hbase connector
[ https://issues.apache.org/jira/browse/FLINK-33208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-33208: Summary: Support the writable metadata timestamp for hbase connector (was: Support the timestamp writable metadata for hbase connector) > Support the writable metadata timestamp for hbase connector > --- > > Key: FLINK-33208 > URL: https://issues.apache.org/jira/browse/FLINK-33208 > Project: Flink > Issue Type: New Feature > Components: Connectors / HBase >Affects Versions: hbase-3.0.1 >Reporter: tanjialiang >Priority: Major > > Currently, the hbase sink does not support write data with `timestamp`, which > may cause the data to be written out of order. I suggest to support the > timestamp writable metadata for hbase connector so that we can set the > `timestamp` when we writing. > {code:java} > CREATE TABLE hTable ( > rowkey INT, > family1 ROW, > version TIMESTAMP_LTZ(3) METADATA FROM 'timestamp', > PRIMARY KEY (rowkey) NOT ENFORCED > ) WITH ( > 'connector' = 'hbase-2.2', > 'table-name' = 'mytable', > 'zookeeper.quorum' = 'localhost:2181' > );{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33208) Support the timestamp writable metadata for hbase connector
tanjialiang created FLINK-33208: --- Summary: Support the timestamp writable metadata for hbase connector Key: FLINK-33208 URL: https://issues.apache.org/jira/browse/FLINK-33208 Project: Flink Issue Type: New Feature Components: Connectors / HBase Affects Versions: hbase-3.0.1 Reporter: tanjialiang Currently, the hbase sink does not support writing data with a `timestamp`, which may cause the data to be written out of order. I suggest supporting the timestamp writable metadata for the hbase connector so that we can set the `timestamp` when writing. {code:java} CREATE TABLE hTable ( rowkey INT, family1 ROW, version TIMESTAMP_LTZ(3) METADATA FROM 'timestamp', PRIMARY KEY (rowkey) NOT ENFORCED ) WITH ( 'connector' = 'hbase-2.2', 'table-name' = 'mytable', 'zookeeper.quorum' = 'localhost:2181' );{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
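The reason a writable timestamp helps with out-of-order data can be shown with a toy model of HBase's versioning (this is a model, not the HBase client API): HBase serves the value with the highest cell timestamp, so writing with the record's own event timestamp keeps the newest event visible even when records arrive out of order.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of HBase cell versioning: reads return the value with the
// highest timestamp. If the sink writes with arrival time, a late old record
// overwrites a newer one; writing with the record's event timestamp (the
// proposed writable metadata) keeps the newest event visible.
public class VersionedCellDemo {
    static final Map<Long, String> versions = new HashMap<>(); // ts -> value

    static void put(long ts, String value) {
        versions.put(ts, value);
    }

    static String readLatest() {
        return versions.entrySet().stream()
                .max(Map.Entry.comparingByKey())
                .map(Map.Entry::getValue)
                .orElse(null);
    }

    public static void main(String[] args) {
        // Out-of-order arrival: the event at ts=200 arrives before ts=100.
        put(200L, "v2");
        put(100L, "v1"); // late, older event
        if (!"v2".equals(readLatest())) throw new AssertionError("newest event should win");
        System.out.println("ok");
    }
}
```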
[jira] [Updated] (FLINK-33207) Scan hbase table will throw error when table is empty
[ https://issues.apache.org/jira/browse/FLINK-33207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-33207: Priority: Blocker (was: Critical) > Scan hbase table will throw error when table is empty > - > > Key: FLINK-33207 > URL: https://issues.apache.org/jira/browse/FLINK-33207 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: hbase-3.0.1 >Reporter: tanjialiang >Priority: Blocker > Labels: pull-request-available > > When i scan the empty hbase table, it will throw an error when > createInputSplits, we should return empty split instead. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33207) Scan hbase table will throw error when table is empty
[ https://issues.apache.org/jira/browse/FLINK-33207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-33207: Description: When I scan an empty hbase table, it throws an error in createInputSplits; we should return an empty split instead. (was: When i scan the empty hbase table, it will throw an error when createInputSplits, we shuold return empty split instead.) > Scan hbase table will throw error when table is empty > - > > Key: FLINK-33207 > URL: https://issues.apache.org/jira/browse/FLINK-33207 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: hbase-3.0.1 >Reporter: tanjialiang >Priority: Critical > Labels: pull-request-available > > When I scan an empty hbase table, it throws an error in > createInputSplits; we should return an empty split instead. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33207) Scan hbase table will throw error when table is empty
[ https://issues.apache.org/jira/browse/FLINK-33207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-33207: Description: When i scan the empty hbase table, it will throw an error when createInputSplits, we shuold return empty split to instead. (was: When i scan the hbase table when it is empty, it will throw an error when createInputSplits, we shuold return empty split to instead.) > Scan hbase table will throw error when table is empty > - > > Key: FLINK-33207 > URL: https://issues.apache.org/jira/browse/FLINK-33207 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: hbase-3.0.1 >Reporter: tanjialiang >Priority: Critical > Labels: pull-request-available > > When i scan the empty hbase table, it will throw an error when > createInputSplits, we shuold return empty split to instead. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33207) Scan hbase table will throw error when table is empty
[ https://issues.apache.org/jira/browse/FLINK-33207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-33207: Description: When i scan the empty hbase table, it will throw an error when createInputSplits, we shuold return empty split instead. (was: When i scan the empty hbase table, it will throw an error when createInputSplits, we shuold return empty split to instead.) > Scan hbase table will throw error when table is empty > - > > Key: FLINK-33207 > URL: https://issues.apache.org/jira/browse/FLINK-33207 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: hbase-3.0.1 >Reporter: tanjialiang >Priority: Critical > Labels: pull-request-available > > When i scan the empty hbase table, it will throw an error when > createInputSplits, we shuold return empty split instead. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33207) Scan hbase table will throw error when table is empty
tanjialiang created FLINK-33207: --- Summary: Scan hbase table will throw error when table is empty Key: FLINK-33207 URL: https://issues.apache.org/jira/browse/FLINK-33207 Project: Flink Issue Type: Bug Components: Connectors / HBase Affects Versions: hbase-3.0.1 Reporter: tanjialiang When I scan an hbase table that is empty, it throws an error in createInputSplits; we should return an empty split instead. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33206) Verify the existence of hbase table before read/write
tanjialiang created FLINK-33206: --- Summary: Verify the existence of hbase table before read/write Key: FLINK-33206 URL: https://issues.apache.org/jira/browse/FLINK-33206 Project: Flink Issue Type: Improvement Components: Connectors / HBase Affects Versions: hbase-3.0.1 Reporter: tanjialiang Attachments: image-2023-10-08-16-54-05-917.png Currently, we do not verify the existence of the hbase table before read/write, and the resulting error can confuse the user. The `HBaseSinkFunction` throws `TableNotFoundException` when doing a flush. The error thrown by the `inputFormat` is not obvious enough. !image-2023-10-08-16-54-05-917.png! So I think we should verify the existence of the hbase table when the `open` function is called. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-33173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-33173: Description: In the connector specifications of [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33], {{currentFetchEventTimeLag}} metrics is the time in milliseconds from the record event timestamp to the timestamp Flink fetched the record. It can reflect the consumption latency before deserialization. (was: In the connector specifications of [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33[],], currentFetchEventTimeLag metrics is the time in milliseconds from the record event timestamp to the timestamp Flink fetched the record.) > Support currentFetchEventTimeLag metrics for KafkaSource > > > Key: FLINK-33173 > URL: https://issues.apache.org/jira/browse/FLINK-33173 > Project: Flink > Issue Type: Improvement >Affects Versions: kafka-3.0.0 >Reporter: tanjialiang >Priority: Major > Labels: pull-request-available > > In the connector specifications of > [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33], > {{currentFetchEventTimeLag}} metrics is the time in milliseconds from the > record event timestamp to the timestamp Flink fetched the record. It can > reflect the consumption latency before deserialization. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-33173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-33173: Component/s: Connectors / Kafka > Support currentFetchEventTimeLag metrics for KafkaSource > > > Key: FLINK-33173 > URL: https://issues.apache.org/jira/browse/FLINK-33173 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.0.0 >Reporter: tanjialiang >Priority: Major > Labels: pull-request-available > > In the connector specifications of > [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33], > {{currentFetchEventTimeLag}} metrics is the time in milliseconds from the > record event timestamp to the timestamp Flink fetched the record. It can > reflect the consupition latency before deserialization. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-33173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-33173: Description: In the connector specifications of [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33[],|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33] currentFetchEventTimeLag metrics is the time in milliseconds from the record event timestamp to the timestamp Flink fetched the record. (was: In the connector specifications of FLIP-33, currentFetchEventTimeLag metrics is the time in milliseconds from the record event timestamp to the timestamp Flink fetched the record.) > Support currentFetchEventTimeLag metrics for KafkaSource > > > Key: FLINK-33173 > URL: https://issues.apache.org/jira/browse/FLINK-33173 > Project: Flink > Issue Type: Improvement >Affects Versions: kafka-3.0.0 >Reporter: tanjialiang >Priority: Major > > In the connector specifications of > [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33[],|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33] > currentFetchEventTimeLag metrics is the time in milliseconds from the record > event timestamp to the timestamp Flink fetched the record. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-33173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-33173: Description: In the connector specifications of [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33[],], currentFetchEventTimeLag metrics is the time in milliseconds from the record event timestamp to the timestamp Flink fetched the record. (was: In the connector specifications of [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33[],|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33] currentFetchEventTimeLag metrics is the time in milliseconds from the record event timestamp to the timestamp Flink fetched the record.) > Support currentFetchEventTimeLag metrics for KafkaSource > > > Key: FLINK-33173 > URL: https://issues.apache.org/jira/browse/FLINK-33173 > Project: Flink > Issue Type: Improvement >Affects Versions: kafka-3.0.0 >Reporter: tanjialiang >Priority: Major > > In the connector specifications of > [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33[],], > currentFetchEventTimeLag metrics is the time in milliseconds from the record > event timestamp to the timestamp Flink fetched the record. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33173) Support currentFetchEventTimeLag metrics for KafkaSource
tanjialiang created FLINK-33173: --- Summary: Support currentFetchEventTimeLag metrics for KafkaSource Key: FLINK-33173 URL: https://issues.apache.org/jira/browse/FLINK-33173 Project: Flink Issue Type: Improvement Affects Versions: kafka-3.0.0 Reporter: tanjialiang In the connector specifications of FLIP-33, currentFetchEventTimeLag metrics is the time in milliseconds from the record event timestamp to the timestamp Flink fetched the record. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33164) HBase connector support ignore null value for partial update
tanjialiang created FLINK-33164: --- Summary: HBase connector support ignore null value for partial update Key: FLINK-33164 URL: https://issues.apache.org/jira/browse/FLINK-33164 Project: Flink Issue Type: Improvement Components: Connectors / HBase Affects Versions: hbase-3.0.0 Reporter: tanjialiang Sometimes, users want to write data and ignore null values to achieve a partial update. So I suggest adding an option: sink.ignore-null-value. {code:java} CREATE TABLE hTable ( rowkey STRING, cf1 ROW, PRIMARY KEY (rowkey) NOT ENFORCED ) WITH ( 'connector' = 'hbase-2.2', 'table-name' = 'default:test', 'zookeeper.quorum' = 'localhost:2181', 'sink.ignore-null-value' = 'true' -- default is false, true is enabled ); INSERT INTO hTable VALUES('1', ROW('10', 'hello, world')); INSERT INTO hTable VALUES('1', ROW('30', CAST(NULL AS STRING))); -- null value to cf1.q2 -- when sink.ignore-null-value is false // after first insert {rowkey: "1", "cf1": {q1: "10", q2: "hello, world"}} // after second insert, cf1.q2 is updated to null {rowkey: "1", "cf1": {q1: "30", q2: "null"}} -- when sink.ignore-null-value is true // after first insert {rowkey: "1", "cf1": {q1: "10", q2: "hello, world"}} // after second insert, cf1.q2 is still the old value {rowkey: "1", "cf1": {q1: "30", q2: "hello, world"}} {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
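The proposed option's semantics from the example above can be sketched as a merge rule (a model, not the connector's code): with the option on, a null column keeps the previously written value instead of overwriting it.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Model of the proposed 'sink.ignore-null-value' semantics: with ignoreNull
// on, a null column is skipped (partial update); with it off, null
// overwrites the stored value, matching the example in the issue.
public class IgnoreNullValueDemo {
    static void upsert(Map<String, String> row, Map<String, String> update, boolean ignoreNull) {
        for (Map.Entry<String, String> e : update.entrySet()) {
            if (e.getValue() == null && ignoreNull) continue; // keep old value
            row.put(e.getKey(), e.getValue());
        }
    }

    // Map.of rejects null values, so build the update map by hand.
    static Map<String, String> mapOf(String k1, String v1, String k2, String v2) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put(k1, v1);
        m.put(k2, v2);
        return m;
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<>();
        upsert(row, mapOf("q1", "10", "q2", "hello, world"), true);
        upsert(row, mapOf("q1", "30", "q2", null), true);
        if (!"hello, world".equals(row.get("q2"))) throw new AssertionError("q2 should be kept");

        row.clear();
        upsert(row, mapOf("q1", "10", "q2", "hello, world"), false);
        upsert(row, mapOf("q1", "30", "q2", null), false);
        if (row.get("q2") != null) throw new AssertionError("q2 should be overwritten with null");
        System.out.println("ok");
    }
}
```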
[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions
[ https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769056#comment-17769056 ] tanjialiang commented on FLINK-28303: - Maybe we should implement a LatestOffsetsInitializer that looks up the end offset and passes it to the reader, instead of passing KafkaPartitionSplit#LATEST_OFFSET(-1). > Kafka SQL Connector loses data when restoring from a savepoint with a topic > with empty partitions > - > > Key: FLINK-28303 > URL: https://issues.apache.org/jira/browse/FLINK-28303 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.4 >Reporter: Robert Metzger >Priority: Major > > Steps to reproduce: > - Set up a Kafka topic with 10 partitions > - produce records 0-9 into the topic > - take a savepoint and stop the job > - produce records 10-19 into the topic > - restore the job from the savepoint. > The job will be missing usually 2-4 records from 10-19. > My assumption is that if a partition never had data (which is likely with 10 > partitions and 10 records), the savepoint will only contain offsets for > partitions with data. > While the job was offline (and we've written record 10-19 into the topic), > all partitions got filled. Now, when Kafka comes online again, it will use > the "latest" offset for those partitions, skipping some data. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions
[ https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769014#comment-17769014 ] tanjialiang commented on FLINK-28303: - [~martijnvisser] I have already checked the latest Kafka connector code; this problem still exists. > Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions
[ https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768637#comment-17768637 ] tanjialiang commented on FLINK-28303: - Maybe this is the reason? [FLINK-33153|https://issues.apache.org/jira/browse/FLINK-33153] > Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33153) Kafka using latest-offset maybe missing data
[ https://issues.apache.org/jira/browse/FLINK-33153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768631#comment-17768631 ] tanjialiang commented on FLINK-33153: - It relates to https://issues.apache.org/jira/browse/FLINK-28303, so I am closing this issue. > Kafka using latest-offset maybe missing data -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33153) Kafka using latest-offset maybe missing data
[ https://issues.apache.org/jira/browse/FLINK-33153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-33153: External issue URL: (was: https://issues.apache.org/jira/browse/FLINK-28303) > Kafka using latest-offset maybe missing data -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33153) Kafka using latest-offset maybe missing data
[ https://issues.apache.org/jira/browse/FLINK-33153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-33153: External issue URL: https://issues.apache.org/jira/browse/FLINK-28303 Release Note: (was: realte to https://issues.apache.org/jira/browse/FLINK-28303) > Kafka using latest-offset maybe missing data -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-33153) Kafka using latest-offset maybe missing data
[ https://issues.apache.org/jira/browse/FLINK-33153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang closed FLINK-33153. --- Release Note: relate to https://issues.apache.org/jira/browse/FLINK-28303 Resolution: Duplicate > Kafka using latest-offset maybe missing data -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33153) Kafka using latest-offset maybe missing data
tanjialiang created FLINK-33153: --- Summary: Kafka using latest-offset maybe missing data Key: FLINK-33153 URL: https://issues.apache.org/jira/browse/FLINK-33153 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: kafka-4.1.0 Reporter: tanjialiang When Kafka starts with the latest-offset strategy, it does not fetch the latest snapshot offset and specify it for consumption. Instead, it sets the startingOffset to -1 (KafkaPartitionSplit.LATEST_OFFSET, which makes currentOffset = -1 and calls the KafkaConsumer's seekToEnd API). The currentOffset is only set to the consumed offset + 1 when the task consumes data, and this currentOffset is stored in the state during checkpointing. If there are very few messages in Kafka and a partition has not consumed any data, and I stop the task with a savepoint, then write data to that partition, and start the task from the savepoint, the task will resume from the saved state. Because the startingOffset in the state is -1, the task will miss the data that was written before the recovery point. -- This message was sent by Atlassian Jira (v8.20.10#820010)
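The failure mode described above can be sketched abstractly: a checkpointed sentinel of -1 means "seek to end" on restore, which skips anything written while the job was down, whereas a concrete offset resolved at startup resumes exactly where the job left off. The names below are hypothetical, not the Kafka connector API:

```java
// Illustrative sketch of the FLINK-33153 problem: restoring a split whose
// checkpointed starting offset is the sentinel -1 (seek-to-end) skips
// records written while the job was stopped; a concrete resolved offset
// does not. Names are hypothetical, not connector code.
public class LatestOffsetSketch {

    static final long LATEST_OFFSET_SENTINEL = -1L;

    /** Offset a restored reader starts from, given the checkpointed offset. */
    public static long restoredStartOffset(long checkpointedOffset, long logEndOffsetAtRestore) {
        // Sentinel means "seek to end": everything before the current end
        // offset is skipped. A concrete offset resumes exactly there.
        return checkpointedOffset == LATEST_OFFSET_SENTINEL
                ? logEndOffsetAtRestore
                : checkpointedOffset;
    }

    public static void main(String[] args) {
        // A partition had end offset 10 at startup, consumed nothing before
        // the savepoint, then 5 records (offsets 10..14) arrived while stopped.
        long endAtRestore = 15L;
        System.out.println("sentinel checkpointed -> start at "
                + restoredStartOffset(LATEST_OFFSET_SENTINEL, endAtRestore)); // skips 10..14
        System.out.println("resolved checkpointed -> start at "
                + restoredStartOffset(10L, endAtRestore)); // reads 10..14
    }
}
```

The fix direction suggested in the FLINK-28303 comment, resolving the actual end offset eagerly and checkpointing that value instead of the sentinel, corresponds to the second case.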
[jira] [Closed] (FLINK-32252) SELECT COUNT(*) will return nothing when source no data return
[ https://issues.apache.org/jira/browse/FLINK-32252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang closed FLINK-32252. --- Resolution: Duplicate > SELECT COUNT(*) will return nothing when source no data return -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32252) SELECT COUNT(*) will return nothing when source no data return
[ https://issues.apache.org/jira/browse/FLINK-32252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17729567#comment-17729567 ] tanjialiang commented on FLINK-32252: - [~jark] Got it, thanks. > SELECT COUNT(*) will return nothing when source no data return -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32252) SELECT COUNT(*) will return nothing when source no data return
[ https://issues.apache.org/jira/browse/FLINK-32252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17729556#comment-17729556 ] tanjialiang edited comment on FLINK-32252 at 6/6/23 2:39 AM: - [~jark] Whether in BATCH mode or STREAMING mode, shouldn't the final results be consistent? So I think maybe something is wrong in STREAMING mode. was (Author: JIRAUSER279823): [~jark] Whether in BATCH mode or STREAMING mode the reuslts should be consitent? So i think maybe something wrong in STREAMING mode. > SELECT COUNT(*) will return nothing when source no data return -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32252) SELECT COUNT(*) will return nothing when source no data return
[ https://issues.apache.org/jira/browse/FLINK-32252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17729556#comment-17729556 ] tanjialiang commented on FLINK-32252: - [~jark] Whether in BATCH mode or STREAMING mode, shouldn't the results be consistent? So I think maybe something is wrong in STREAMING mode. > SELECT COUNT(*) will return nothing when source no data return -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32252) SELECT COUNT(*) will return nothing when source no data return
[ https://issues.apache.org/jira/browse/FLINK-32252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17729552#comment-17729552 ] tanjialiang commented on FLINK-32252: - [~jark] I tried: STREAMING mode returns nothing and BATCH mode returns 0. > SELECT COUNT(*) will return nothing when source no data return -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32252) SELECT COUNT(*) will return nothing when source no data return
tanjialiang created FLINK-32252: --- Summary: SELECT COUNT(*) will return nothing when source no data return Key: FLINK-32252 URL: https://issues.apache.org/jira/browse/FLINK-32252 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.16.1 Reporter: tanjialiang mysql source {code:java} CREATE TABLE student( id int primary key auto_increment, name varchar(32), age int ); INSERT INTO student(name, age) VALUES ('tanjl',18),('jark',20),('mike',16),('rose',21);{code} Flink SQL {code:java} CREATE TABLE student ( `id` INT PRIMARY KEY, `name` STRING, `age` INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost/test?serverTimezone=UTC', 'username' = 'root', 'password' = 'root', 'table-name' = 'student' ); SELECT count(*) FROM student WHERE age < 15;{code} Flink returns nothing because the JDBC connector pushes the filter down (since flink-connector-jdbc-3.1.0), which makes the source return no data. -- This message was sent by Atlassian Jira (v8.20.10#820010)
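The batch-versus-streaming difference reported in the comments (BATCH returns 0, STREAMING returns nothing) can be modelled abstractly: a batch aggregation always emits a final result, even over empty input, while a record-driven streaming aggregation only fires when a record arrives. This is an illustrative model with hypothetical names, not the planner's implementation:

```java
import java.util.List;
import java.util.Optional;

// Illustrative model of the observed behaviour: batch COUNT(*) over an
// empty input still emits 0, while a record-driven streaming aggregation
// never fires for empty input and therefore emits nothing.
public class CountSemanticsSketch {

    /** Batch: always produces a final count, even for empty input. */
    public static long batchCount(List<String> rows) {
        return rows.size();
    }

    /** Streaming (simplified): emits a result only if at least one record arrived. */
    public static Optional<Long> streamingCount(List<String> rows) {
        return rows.isEmpty() ? Optional.empty() : Optional.of((long) rows.size());
    }

    public static void main(String[] args) {
        List<String> empty = List.of();
        System.out.println("batch: " + batchCount(empty));          // 0
        System.out.println("streaming: " + streamingCount(empty));  // Optional.empty
    }
}
```

With the filter pushed down, the source delivers zero rows, so the streaming aggregation has nothing to trigger on, matching the "returns nothing" observation.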
[jira] [Updated] (FLINK-31308) JobManager's metaspace out-of-memory when submit a flinksessionjobs
[ https://issues.apache.org/jira/browse/FLINK-31308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-31308: Component/s: Table SQL / Planner Table SQL / Runtime (was: Kubernetes Operator) (was: Table SQL / API) Affects Version/s: (was: kubernetes-operator-1.4.0) > JobManager's metaspace out-of-memory when submit a flinksessionjobs > --- > > Key: FLINK-31308 > URL: https://issues.apache.org/jira/browse/FLINK-31308 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Table SQL / Runtime >Affects Versions: 1.16.1 >Reporter: tanjialiang >Priority: Major > Attachments: image-2023-03-03-10-34-46-681.png > > > Hello team, when I repeatedly submit a FlinkSessionJob via the Flink operator, the JobManager's metaspace eventually runs out of memory. My job contains some Flink SQL logic; could it be that the user classloader is not closed? Or maybe it is because of Flink SQL's code generation? By the way, this does not happen when I submit via flink-sql-gateway. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31686) Filesystem connector should replace the shallow copy with deep copy
[ https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707522#comment-17707522 ] tanjialiang commented on FLINK-31686: - Maybe I can take this ticket; I would like to try it. > Filesystem connector should replace the shallow copy with deep copy -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31686) Filesystem connector should replace the shallow copy with deep copy
[ https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-31686: Summary: Filesystem connector should replace the shallow copy with deep copy (was: Filesystem Connector should replace the shallow copy with deep copy) > Filesystem connector should replace the shallow copy with deep copy -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31686) Filesystem Connector should replace the shallow copy with deep copy
[ https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-31686: Summary: Filesystem Connector should replace the shallow copy with deep copy (was: Connector should replace the shallow copy with deep copy) > Filesystem Connector should replace the shallow copy with deep copy -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31686) Connector should replace the shallow copy with deep copy
[ https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-31686: Summary: Connector should replace the shallow copy with deep copy (was: All of connector should replace the shallow copy with deep copy) > Connector should replace the shallow copy with deep copy -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31686) All of connector should replace the shallow copy with deep copy
[ https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-31686: Summary: All of connector should replace the shallow copy with deep copy (was: All of connector should replace the shallow copy with a deep copy) > All of connector should replace the shallow copy with deep copy -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31686) All of connector should replace the shallow copy with a deep copy
tanjialiang created FLINK-31686: --- Summary: All of connector should replace the shallow copy with a deep copy Key: FLINK-31686 URL: https://issues.apache.org/jira/browse/FLINK-31686 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Affects Versions: 1.16.1 Reporter: tanjialiang Attachments: image-2023-04-01-16-18-48-762.png, image-2023-04-01-16-18-56-075.png Hi team, when I use the following SQL {code:java} CREATE TABLE student ( `id` STRING, `name` STRING, `age` INT ) WITH ( 'connector' = 'filesystem', 'path' = '...', 'format' = 'orc' ); select t1.total, t2.total from ( select count(*) as total, 1 as join_key from student where name = 'tanjialiang' ) t1 LEFT JOIN ( select count(*) as total, 1 as join_key from student ) t2 ON t1.join_key = t2.join_key; {code} it throws an error !image-2023-04-01-15-53-40-060.png! I tried to solve it, and I found that the filesystem connector's copy function performs a shallow copy instead of a deep copy. This leads to all queries over the same table source reusing the same bulkWriterFormat; my query has a filter condition that is pushed down into the bulkWriterFormat, so the filter condition may be reused across queries. The comments on the DynamicTableSource and DynamicTableSink copy functions ask us to implement them as deep copies, but I found that every connector implements them with a shallow copy, so I think this problem is not limited to the filesystem connector. !image-2023-04-01-16-12-29-927.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
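The shallow-copy hazard described above can be reproduced with a minimal mock (hypothetical classes, not the actual connector code): if copy() shares the mutable pushed-down-filter state, a predicate pushed into one copy leaks into every other copy of the same source.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal illustration of the shallow-copy bug: two "copies" of a table
// source share one mutable filter list, so a filter pushed into one copy
// becomes visible in the other. Classes here are hypothetical mocks.
public class CopySketch {

    static class MockTableSource {
        final List<String> pushedFilters;

        MockTableSource(List<String> pushedFilters) {
            this.pushedFilters = pushedFilters;
        }

        MockTableSource shallowCopy() {
            return new MockTableSource(this.pushedFilters); // shares the list
        }

        MockTableSource deepCopy() {
            return new MockTableSource(new ArrayList<>(this.pushedFilters)); // independent list
        }

        void pushFilter(String predicate) {
            pushedFilters.add(predicate);
        }
    }

    /** Pushing a filter into a shallow copy mutates the original too. */
    public static boolean shallowCopyLeaksFilter() {
        MockTableSource original = new MockTableSource(new ArrayList<>());
        original.shallowCopy().pushFilter("name = 'tanjialiang'");
        return !original.pushedFilters.isEmpty();
    }

    /** Pushing a filter into a deep copy leaves the original untouched. */
    public static boolean deepCopyIsolatesFilter() {
        MockTableSource original = new MockTableSource(new ArrayList<>());
        original.deepCopy().pushFilter("name = 'tanjialiang'");
        return original.pushedFilters.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println("shallow copy leaks pushed filter: " + shallowCopyLeaksFilter());
        System.out.println("deep copy isolates pushed filter: " + deepCopyIsolatesFilter());
    }
}
```

This matches the report: both subqueries scan the same table, the planner copies the source once per scan, and with a shallow copy the `name = 'tanjialiang'` predicate from t1 contaminates t2's scan.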
[jira] [Updated] (FLINK-31686) All of connector should replace the shallow copy with a deep copy
[ https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-31686: Attachment: image-2023-04-01-16-18-48-762.png

> All of connector should replace the shallow copy with a deep copy
> -
>
> Key: FLINK-31686
> URL: https://issues.apache.org/jira/browse/FLINK-31686
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.16.1
> Reporter: tanjialiang
> Priority: Major
> Attachments: image-2023-04-01-16-18-48-762.png, image-2023-04-01-16-18-56-075.png
>
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31686) All of connector should replace the shallow copy with a deep copy
[ https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-31686: Description:

Hi team, when I run the following SQL

{code:java}
CREATE TABLE student (
  `id` STRING,
  `name` STRING,
  `age` INT
) WITH (
  'connector' = 'filesystem',
  'path' = '...',
  'format' = 'orc'
);

select
  t1.total,
  t2.total
from
(
  select
    count(*) as total,
    1 as join_key
  from student
  where name = 'tanjialiang'
) t1
LEFT JOIN (
  select
    count(*) as total,
    1 as join_key
  from student
) t2
ON t1.join_key = t2.join_key;
{code}

it throws an error:
!image-2023-04-01-16-18-48-762.png!

I tried to track this down and found that the filesystem connector's copy function performs a shallow copy instead of a deep copy. As a result, every query over the same table source reuses the same bulkWriterFormat; my query has a filter condition that is pushed down into the bulkWriterFormat, so the pushed-down filter may be reused by other queries. The comments on the DynamicTableSource and DynamicTableSink copy functions ask implementations to perform a deep copy, but every connector implements it as a shallow copy. So I think this problem is not limited to the filesystem connector.
!image-2023-04-01-16-18-56-075.png!

was:

Hi team, when I run the following SQL

{code:java}
CREATE TABLE student (
  `id` STRING,
  `name` STRING,
  `age` INT
) WITH (
  'connector' = 'filesystem',
  'path' = '...',
  'format' = 'orc'
);

select
  t1.total,
  t2.total
from
(
  select
    count(*) as total,
    1 as join_key
  from student
  where name = 'tanjialiang'
) t1
LEFT JOIN (
  select
    count(*) as total,
    1 as join_key
  from student
) t2
ON t1.join_key = t2.join_key;
{code}

it throws an error:
!image-2023-04-01-15-53-40-060.png!

I tried to track this down and found that the filesystem connector's copy function performs a shallow copy instead of a deep copy. As a result, every query over the same table source reuses the same bulkWriterFormat; my query has a filter condition that is pushed down into the bulkWriterFormat, so the pushed-down filter may be reused by other queries. The comments on the DynamicTableSource and DynamicTableSink copy functions ask implementations to perform a deep copy, but every connector implements it as a shallow copy. So I think this problem is not limited to the filesystem connector.
!image-2023-04-01-16-12-29-927.png!

> All of connector should replace the shallow copy with a deep copy
> -
>
> Key: FLINK-31686
> URL: https://issues.apache.org/jira/browse/FLINK-31686
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.16.1
> Reporter: tanjialiang
> Priority: Major
> Attachments: image-2023-04-01-16-18-48-762.png, image-2023-04-01-16-18-56-075.png
>
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31686) All of connector should replace the shallow copy with a deep copy
[ https://issues.apache.org/jira/browse/FLINK-31686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-31686: Attachment: image-2023-04-01-16-18-56-075.png

> All of connector should replace the shallow copy with a deep copy
> -
>
> Key: FLINK-31686
> URL: https://issues.apache.org/jira/browse/FLINK-31686
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.16.1
> Reporter: tanjialiang
> Priority: Major
> Attachments: image-2023-04-01-16-18-48-762.png, image-2023-04-01-16-18-56-075.png
>
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31541) Get metrics in Flink WEB UI error
tanjialiang created FLINK-31541: --- Summary: Get metrics in Flink WEB UI error Key: FLINK-31541 URL: https://issues.apache.org/jira/browse/FLINK-31541 Project: Flink Issue Type: Bug Components: Runtime / Metrics, Runtime / Web Frontend Affects Versions: 1.16.1 Reporter: tanjialiang Attachments: image-2023-03-21-20-28-56-348.png

When I fetch a metric from an operator whose name contains '[' or ']', the REST API responds with 400. The reason is that a GET request cannot carry a raw '[' or ']' in its query parameters; those characters are invalid in a URL unless they are percent-encoded.

!image-2023-03-21-20-28-56-348.png!
-- This message was sent by Atlassian Jira (v8.20.10#820010)
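A minimal sketch of the client-side fix implied above: percent-encode metric names before putting them into the GET query string, so '[' and ']' never appear raw. The metric name used here is made up for illustration; this is plain JDK URLEncoder, not Flink's web frontend code.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class MetricNameEncoder {
    // Percent-encodes a metric name for use as a URL query parameter value.
    public static String encode(String metricName) {
        return URLEncoder.encode(metricName, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // '[' becomes %5B and ']' becomes %5D, which a REST endpoint accepts
        System.out.println(encode("Map[1].numRecordsIn"));
    }
}
```

With encoding applied, the request line contains `...?get=Map%5B1%5D.numRecordsIn` instead of the raw brackets that trigger the 400.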
[jira] [Created] (FLINK-31367) Support filter's function push down
tanjialiang created FLINK-31367: --- Summary: Support filter's function push down Key: FLINK-31367 URL: https://issues.apache.org/jira/browse/FLINK-31367 Project: Flink Issue Type: Improvement Components: Connectors / JDBC, Table SQL / API Affects Versions: 1.16.1 Reporter: tanjialiang

Hi team, as far as I know, the source abilities only support simple filter push down, i.e. filters that compare against constant values, like this:

{code:java}
CREATE TABLE student (
  id int,
  birthday string
) WITH (
  ...
);

-- the filter can be pushed down if the connector implements SupportsFilterPushDown;
-- both id and birthday will be pushed down
SELECT * FROM student WHERE id = 1 AND birthday = '1997-04-13';
{code}

But a filter like this will not be pushed down:

{code:java}
CREATE TABLE student (
  id int,
  birthday string
) WITH (
  ...
);

-- the filter is not pushed down even though the connector implements SupportsFilterPushDown;
-- id and birthday are not pushed down, so the filter is evaluated in the Flink task
SELECT * FROM student WHERE id = 1 AND birthday = TO_DATE('1997-04-13 00:00:00');
{code}

Can we expose the Flink function in SupportsFilterPushDown, so that each connector can translate it? For example, I could map the Flink function TO_DATE to MySQL's function STR_TO_DATE.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
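The translation idea in the last paragraph can be sketched as a per-dialect function mapping. The class and method names below are made up, not real Flink API: the point is only that a connector could render a pushed-down Flink function call in its own dialect, and keep the filter in Flink when no mapping exists.

```java
import java.util.HashMap;
import java.util.Map;

public class MySqlFunctionDialect {
    // Hypothetical mapping from Flink built-in functions to MySQL functions.
    private static final Map<String, String> FLINK_TO_MYSQL = new HashMap<>();
    static {
        FLINK_TO_MYSQL.put("TO_DATE", "STR_TO_DATE");
    }

    /** Renders a pushed-down function call in MySQL syntax, or null if unsupported. */
    public static String render(String flinkFunction, String... args) {
        String mysqlName = FLINK_TO_MYSQL.get(flinkFunction);
        if (mysqlName == null) {
            return null; // no mapping: the connector keeps the filter in Flink
        }
        return mysqlName + "(" + String.join(", ", args) + ")";
    }

    public static void main(String[] args) {
        System.out.println(render("TO_DATE", "'1997-04-13 00:00:00'", "'%Y-%m-%d %H:%i:%s'"));
    }
}
```

A null return models today's fallback behavior: the unsupported predicate stays in the Flink task instead of being pushed into the JDBC query.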
[jira] [Created] (FLINK-31308) JobManager's metaspace out-of-memory when submit a flinksessionjobs
tanjialiang created FLINK-31308: --- Summary: JobManager's metaspace out-of-memory when submit a flinksessionjobs Key: FLINK-31308 URL: https://issues.apache.org/jira/browse/FLINK-31308 Project: Flink Issue Type: Bug Components: Kubernetes Operator, Table SQL / API Affects Versions: kubernetes-operator-1.4.0, 1.16.1 Reporter: tanjialiang Attachments: image-2023-03-03-10-34-46-681.png

Hello team, when I repeatedly submit a FlinkSessionJob via the Flink operator, the JobManager's metaspace runs out of memory. My job contains some Flink SQL logic. Could it be that the user classloader is not closed? Or is it because of Flink SQL's code generation? By the way, the problem does not appear when I submit the job via flink-sql-gateway.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
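The classloader suspicion above is worth illustrating: metaspace grows unbounded when per-job user classloaders are created but never closed, because the classes they loaded can then never be unloaded. This is a minimal JDK-level sketch of the pattern, not Flink's actual submission code: a URLClassLoader should be closed once the job's user code is done with it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;

public class UserClassLoaderLifecycle {
    // Creates a per-job classloader and guarantees it is closed afterwards.
    public static String loadAndClose(URL[] jars) {
        try (URLClassLoader userLoader = new URLClassLoader(
                jars, UserClassLoaderLifecycle.class.getClassLoader())) {
            // ... run the job's user code with userLoader here ...
            return userLoader.getClass().getSimpleName();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } // close() releases jar handles and lets GC unload the loaded classes
    }

    public static void main(String[] args) {
        System.out.println(loadAndClose(new URL[0]));
    }
}
```

If the leak is instead from SQL code generation, the generated classes are still anchored by their classloader, so a never-closed loader shows the same metaspace growth either way.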
[jira] [Commented] (FLINK-30551) Add open method to PartitionCommitPolicy
[ https://issues.apache.org/jira/browse/FLINK-30551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691406#comment-17691406 ] tanjialiang commented on FLINK-30551: - I think it is necessary for a custom partition commit policy to be able to read the Flink configuration from the open method. I could set custom options in the Flink conf and use them in my custom commit policy. WDYT? [~gaoyunhaii] [~lzljs3620320]

> Add open method to PartitionCommitPolicy
>
> Key: FLINK-30551
> URL: https://issues.apache.org/jira/browse/FLINK-30551
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Reporter: Aitozi
> Priority: Major
> Labels: pull-request-available
>
> Currently, the {{PartitionCommitPolicy}} does not have an open hook. A
> custom partition commit policy has no appropriate entry point for
> its init work.
> So I propose to add an {{open}} method to make this work.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
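What the proposed hook could look like, as a self-contained sketch: the interface name mirrors PartitionCommitPolicy but is hypothetical, and the configuration is modeled as a plain Map rather than Flink's Configuration class to keep the example standalone. A default no-op open() keeps existing policies source-compatible.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical version of the interface with the proposed open() hook.
interface PartitionCommitPolicySketch {
    default void open(Map<String, String> conf) {} // no-op default: old policies keep working
    String commit(String partition);
}

public class CustomPolicyDemo implements PartitionCommitPolicySketch {
    private String endpoint;

    @Override
    public void open(Map<String, String> conf) {
        // read a custom option that was set in the Flink conf
        endpoint = conf.getOrDefault("custom.notify.endpoint", "none");
    }

    @Override
    public String commit(String partition) {
        return "notify " + endpoint + " about " + partition;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("custom.notify.endpoint", "http://example.com/hook");
        CustomPolicyDemo policy = new CustomPolicyDemo();
        policy.open(conf); // init work happens here, before any commit
        System.out.println(policy.commit("dt=2023-01-01"));
    }
}
```

This is the use case from the comment above: a custom option placed in the Flink conf becomes visible to the policy through open(), with no other entry point needed.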
[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)
[ https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30687: Description:

When I run Flink SQL like the following, the 'FILTER (WHERE class_id = 1)' clause has no effect.

{code:java}
CREATE TABLE student
(
  id INT NOT NULL,
  name STRING,
  class_id INT NOT NULL
)
WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/test',
  'table-name' = 'student',
  'username' = 'root',
  'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
-- or
SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;
{code}

But when I write the query like this, it works.

{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;
-- or
SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;
{code}

By the way, the mysql connector has a bug that was fixed in https://issues.apache.org/jira/browse/FLINK-29558. If you try this demo, you may need to cherry-pick that PR first.

was:

When I run Flink SQL like the following, the 'FILTER (WHERE class_id = 1)' clause has no effect.

{code:java}
CREATE TABLE student
(
  id INT NOT NULL,
  name STRING,
  class_id INT NOT NULL
)
WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/test',
  'table-name' = 'student',
  'username' = 'root',
  'password' = '12345678'
);

SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student;
-- or
SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;
{code}

But when I write the query like this, it works.

{code:java}
SELECT COUNT(*) FROM student WHERE class_id = 1;
-- or
SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;
{code}

By the way, the mysql connector has a bug that was fixed in FLINK-27268. If you try this demo, you may need to cherry-pick that PR first.
> FILTER not effect in count(*)
> -
>
> Key: FLINK-30687
> URL: https://issues.apache.org/jira/browse/FLINK-30687
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.16.0
> Reporter: tanjialiang
> Priority: Major
> Attachments: image-2023-01-16-10-54-04-673.png
>
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-30922) SQL validate fail in parsing writable metadata
[ https://issues.apache.org/jira/browse/FLINK-30922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686968#comment-17686968 ] tanjialiang edited comment on FLINK-30922 at 2/10/23 9:04 AM: -- Great! Thanks for [~csq] 's contribute. Hope to merge it soon. was (Author: JIRAUSER279823): Great! Thanks for [~csq] 's contribute. Hope to merge it soon.[|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=godfreyhe] > SQL validate fail in parsing writable metadata > -- > > Key: FLINK-30922 > URL: https://issues.apache.org/jira/browse/FLINK-30922 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.1 >Reporter: tanjialiang >Priority: Major > Labels: pull-request-available > > When i tried an simple demo sql with writing metadata to the kafka in flink > sql client > {code:java} > CREATE TABLE KafkaTable ( > `user_id` BIGINT, > `item_id` BIGINT, > `behavior` STRING, > `ts` TIMESTAMP(3) METADATA FROM 'timestamp' > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'csv' > ) > INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code} > > it will be throw an error > {code:java} > org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse > statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; > at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) > 
~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown > Source) ~[?:?] > at > org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) > [flink-sql-client-1.16.1.jar:1.16.1] > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) > [flink-sql-client-1.16.1.jar:1.16.1] > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) > [flink-sql-client-1.16.1.jar:1.16.1] > Caused by: org.apache.flink.table.api.ValidationException: SQL validation > failed. From line 1, column 33 to line 1, column 34: Unknown target column > 'ts' > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186) > ~[?:?] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) > ~[?:?] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) > ~[?:?] > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > ~[?:?] 
> at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) > ~[flink-sql-client-1.16.1.jar:1.16.1] > ... 13 more > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, > column 33 to line 1, column 34: Unknown target column 'ts' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) ~[?:1.8.0_41] > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > ~[?:1.8.0_41] > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > ~[?:1.8.0_41] > at
[jira] [Commented] (FLINK-30922) SQL validate fail in parsing writable metadata
[ https://issues.apache.org/jira/browse/FLINK-30922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686968#comment-17686968 ] tanjialiang commented on FLINK-30922: - Great! Thanks for [~csq] 's contribute. Hope to merge it soon.[|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=godfreyhe] > SQL validate fail in parsing writable metadata > -- > > Key: FLINK-30922 > URL: https://issues.apache.org/jira/browse/FLINK-30922 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.1 >Reporter: tanjialiang >Priority: Major > Labels: pull-request-available > > When i tried an simple demo sql with writing metadata to the kafka in flink > sql client > {code:java} > CREATE TABLE KafkaTable ( > `user_id` BIGINT, > `item_id` BIGINT, > `behavior` STRING, > `ts` TIMESTAMP(3) METADATA FROM 'timestamp' > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'csv' > ) > INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code} > > it will be throw an error > {code:java} > org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse > statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; > at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown > 
Source) ~[?:?] > at > org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) > [flink-sql-client-1.16.1.jar:1.16.1] > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) > [flink-sql-client-1.16.1.jar:1.16.1] > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) > [flink-sql-client-1.16.1.jar:1.16.1] > Caused by: org.apache.flink.table.api.ValidationException: SQL validation > failed. From line 1, column 33 to line 1, column 34: Unknown target column > 'ts' > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186) > ~[?:?] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) > ~[?:?] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) > ~[?:?] > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > ~[?:?] > at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) > ~[flink-sql-client-1.16.1.jar:1.16.1] > ... 
13 more > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, > column 33 to line 1, column 34: Unknown target column 'ts' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) ~[?:1.8.0_41] > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > ~[?:1.8.0_41] > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > ~[?:1.8.0_41] > at java.lang.reflect.Constructor.newInstance(Constructor.java:422) > ~[?:1.8.0_41] > at >
[jira] [Updated] (FLINK-30922) SQL validate fail in parsing writable metadata
[ https://issues.apache.org/jira/browse/FLINK-30922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30922: Summary: SQL validate fail in parsing writable metadata (was: SQL validate fail to parse writing metadata) > SQL validate fail in parsing writable metadata > -- > > Key: FLINK-30922 > URL: https://issues.apache.org/jira/browse/FLINK-30922 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.1 >Reporter: tanjialiang >Priority: Major > > When i tried an simple demo sql with writing metadata to the kafka in flink > sql client > {code:java} > CREATE TABLE KafkaTable ( > `user_id` BIGINT, > `item_id` BIGINT, > `behavior` STRING, > `ts` TIMESTAMP(3) METADATA FROM 'timestamp' > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'csv' > ) > INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code} > > it will be throw an error > {code:java} > org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse > statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; > at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown > Source) ~[?:?] 
> at > org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) > ~[flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) > [flink-sql-client-1.16.1.jar:1.16.1] > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) > [flink-sql-client-1.16.1.jar:1.16.1] > at > org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) > [flink-sql-client-1.16.1.jar:1.16.1] > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) > [flink-sql-client-1.16.1.jar:1.16.1] > Caused by: org.apache.flink.table.api.ValidationException: SQL validation > failed. From line 1, column 33 to line 1, column 34: Unknown target column > 'ts' > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186) > ~[?:?] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) > ~[?:?] > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) > ~[?:?] > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > ~[?:?] > at > org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) > ~[flink-sql-client-1.16.1.jar:1.16.1] > ... 
13 more > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, > column 33 to line 1, column 34: Unknown target column 'ts' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) ~[?:1.8.0_41] > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > ~[?:1.8.0_41] > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > ~[?:1.8.0_41] > at java.lang.reflect.Constructor.newInstance(Constructor.java:422) > ~[?:1.8.0_41] > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) > ~[?:?] > at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
[jira] [Created] (FLINK-30922) SQL validate fail to parse writing metadata
tanjialiang created FLINK-30922: --- Summary: SQL validate fail to parse writing metadata Key: FLINK-30922 URL: https://issues.apache.org/jira/browse/FLINK-30922 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.16.1 Reporter: tanjialiang When i tried an simple demo sql with writing metadata to the kafka in flink sql client {code:java} CREATE TABLE KafkaTable ( `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING, `ts` TIMESTAMP(3) METADATA FROM 'timestamp' ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ) INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; {code} it will be throw an error {code:java} org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse statement: INSERT INTO KafkaTable(user_id, ts) SELECT '1', CURRENT_TIMESTAMP; at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174) ~[flink-sql-client-1.16.1.jar:1.16.1] at org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45) ~[flink-sql-client-1.16.1.jar:1.16.1] at org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71) ~[flink-sql-client-1.16.1.jar:1.16.1] at org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) ~[flink-sql-client-1.16.1.jar:1.16.1] at org.jline.reader.impl.LineReaderImpl$$Lambda$364/1900307803.apply(Unknown Source) ~[?:?] 
at org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) ~[flink-sql-client-1.16.1.jar:1.16.1] at org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) ~[flink-sql-client-1.16.1.jar:1.16.1] at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295) [flink-sql-client-1.16.1.jar:1.16.1] at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280) [flink-sql-client-1.16.1.jar:1.16.1] at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228) [flink-sql-client-1.16.1.jar:1.16.1] at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) [flink-sql-client-1.16.1.jar:1.16.1] at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) [flink-sql-client-1.16.1.jar:1.16.1] at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) [flink-sql-client-1.16.1.jar:1.16.1] at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) [flink-sql-client-1.16.1.jar:1.16.1] Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 33 to line 1, column 34: Unknown target column 'ts' at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186) ~[?:?] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) ~[?:?] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) ~[?:?] at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) ~[?:?] at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) ~[flink-sql-client-1.16.1.jar:1.16.1] ... 
13 more Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 33 to line 1, column 34: Unknown target column 'ts' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_41] at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_41] at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_41] at java.lang.reflect.Constructor.newInstance(Constructor.java:422) ~[?:1.8.0_41] at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) ~[?:?] at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883) ~[?:?] at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868) ~[?:?] at org.apache.flink.table.planner.calcite.PreValidateReWriter$.newValidationError(PreValidateReWriter.scala:401) ~[?:?] at org.apache.flink.table.planner.calcite.PreValidateReWriter$.validateField(PreValidateReWriter.scala:389) ~[?:?] at org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$appendPartitionAndNullsProjects$3(PreValidateReWriter.scala:172) ~[?:?]
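The validation failure above can be sketched in miniature. The following is a hypothetical Python sketch, not Flink's actual PreValidateReWriter code: it assumes the root cause is that the target-column list of an INSERT is resolved against physical columns only, so a writable metadata column such as 'ts' comes back as an unknown target. All names here are illustrative.

```python
# Toy model of INSERT target-column validation. The real logic lives in
# org.apache.flink.table.planner.calcite.PreValidateReWriter (see stack trace);
# this sketch only illustrates the suspected behavior, it is not that code.

PHYSICAL_COLUMNS = ["user_id", "item_id", "behavior"]  # regular table columns
METADATA_COLUMNS = ["ts"]  # `ts` TIMESTAMP(3) METADATA FROM 'timestamp'

def first_unknown_target(targets, include_metadata):
    """Return the first target column that does not resolve, or None."""
    known = set(PHYSICAL_COLUMNS)
    if include_metadata:
        known |= set(METADATA_COLUMNS)  # also accept writable metadata columns
    for col in targets:
        if col not in known:
            return col
    return None

# Mirrors the reported error: 'ts' is "unknown" when only physical columns count.
assert first_unknown_target(["user_id", "ts"], include_metadata=False) == "ts"
# Resolving against physical + metadata columns would accept the statement.
assert first_unknown_target(["user_id", "ts"], include_metadata=True) is None
```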
[jira] [Comment Edited] (FLINK-30687) FILTER not effect in count(*)
[ https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677122#comment-17677122 ] tanjialiang edited comment on FLINK-30687 at 1/16/23 2:58 AM: -- While trying to find out how to fix this, I found that in org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator#createFilterExpression the filterArg must be > 0, but the filterArg of ‘SELECT COUNT( * ) FILTER (WHERE cluster_id = 1) FROM ${table}’ is 0. I changed the condition to 'filterArg >= 0', and it works. !image-2023-01-16-10-54-04-673.png! So I want to ask: why can filterArg not be 0? Are there scenarios where it cannot be used? After all, 'inputFieldTypes' is a 'scala.collection.Seq', whose index starts from 0. was (Author: JIRAUSER279823): I try to found out how to fix this, i found in org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator#createFilterExpression the filterArg must be >0, but ‘SELECT COUNT(*) FILTER (WHERE cluster_id = 1) FROM ${table}’ 's filterArg is 0. I fixed the condition in 'filterArg >= 0', and it work. !image-2023-01-16-10-54-04-673.png! So i want to ask why we can not filterArg=0? Did it some scenes we can not use? Because 'inputFieldTypes' is a 'scala.collection.Seq' which index is start from 0. > FILTER not effect in count(*) > - > > Key: FLINK-30687 > URL: https://issues.apache.org/jira/browse/FLINK-30687 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: tanjialiang >Priority: Major > Attachments: image-2023-01-16-10-54-04-673.png > > > When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' > is not effect. 
> {code:java} > CREATE TABLE student > ( > id INT NOT NULL, > name STRING, > class_id INT NOT NULL > ) > WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/test', > 'table-name' = 'student', > 'username' = 'root', > 'password' = '12345678' > ); > SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; > or > SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code} > > But when i tried Flink SQL like this, it worked. > {code:java} > SELECT COUNT(*) FROM student WHERE class_id = 1; > or > SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} > > By the way, mysql connector has a bug and fixed in FLINK-27268. Maybe you try > this demo should cherry-pick this PR first. -- This message was sent by Atlassian Jira (v8.20.10#820010)
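The off-by-one the comment describes can be sketched as follows. This is a hypothetical Python sketch, not Flink's code generator: it assumes the FILTER clause is encoded as a column index where -1 means "no filter", so testing `filterArg > 0` silently drops a filter whose predicate lands in column index 0.

```python
# Toy model of the filterArg condition discussed above (illustrative only).

def count_buggy(rows, filter_arg):
    """COUNT with FILTER, testing `filter_arg > 0` as in the reported bug."""
    if filter_arg > 0:  # BUG: index 0 is a valid column but is treated as "no filter"
        return sum(1 for r in rows if r[filter_arg])
    return len(rows)

def count_fixed(rows, filter_arg):
    """Same, with the proposed `filter_arg >= 0`; only -1 means "no filter"."""
    if filter_arg >= 0:
        return sum(1 for r in rows if r[filter_arg])
    return len(rows)

# Column 0 holds the boolean result of `class_id = 1` for each input row.
rows = [(True,), (False,), (True,)]
assert count_buggy(rows, 0) == 3  # filter silently ignored: plain COUNT(*)
assert count_fixed(rows, 0) == 2  # filter applied as intended
assert count_fixed(rows, -1) == 3  # -1 still means an unfiltered COUNT(*)
```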
[jira] [Commented] (FLINK-30687) FILTER not effect in count(*)
[ https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17677122#comment-17677122 ] tanjialiang commented on FLINK-30687: - While trying to find out how to fix this, I found that in org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator#createFilterExpression the filterArg must be > 0, but the filterArg of ‘SELECT COUNT(*) FILTER (WHERE cluster_id = 1) FROM ${table}’ is 0. I changed the condition to 'filterArg >= 0', and it works. !image-2023-01-16-10-54-04-673.png! So I want to ask: why can filterArg not be 0? Are there scenarios where it cannot be used? After all, 'inputFieldTypes' is a 'scala.collection.Seq', whose index starts from 0. > FILTER not effect in count(*) > - > > Key: FLINK-30687 > URL: https://issues.apache.org/jira/browse/FLINK-30687 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: tanjialiang >Priority: Major > Attachments: image-2023-01-16-10-54-04-673.png > > > When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' > is not effect. > {code:java} > CREATE TABLE student > ( > id INT NOT NULL, > name STRING, > class_id INT NOT NULL > ) > WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/test', > 'table-name' = 'student', > 'username' = 'root', > 'password' = '12345678' > ); > SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; > or > SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code} > > But when i tried Flink SQL like this, it worked. > {code:java} > SELECT COUNT(*) FROM student WHERE class_id = 1; > or > SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} > > By the way, mysql connector has a bug and fixed in FLINK-27268. Maybe you try > this demo should cherry-pick this PR first. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)
[ https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30687: Attachment: image-2023-01-16-10-54-04-673.png > FILTER not effect in count(*) > - > > Key: FLINK-30687 > URL: https://issues.apache.org/jira/browse/FLINK-30687 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: tanjialiang >Priority: Major > Attachments: image-2023-01-16-10-54-04-673.png > > > When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' > is not effect. > {code:java} > CREATE TABLE student > ( > id INT NOT NULL, > name STRING, > class_id INT NOT NULL > ) > WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/test', > 'table-name' = 'student', > 'username' = 'root', > 'password' = '12345678' > ); > SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; > or > SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code} > > But when i tried Flink SQL like this, it worked. > {code:java} > SELECT COUNT(*) FROM student WHERE class_id = 1; > or > SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} > > By the way, mysql connector has a bug and fixed in FLINK-27268. Maybe you try > this demo should cherry-pick this PR first. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)
[ https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30687: Description: When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' is not effect. {code:java} CREATE TABLE student ( id INT NOT NULL, name STRING, class_id INT NOT NULL ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'student', 'username' = 'root', 'password' = '12345678' ); SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; or SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code} But when i tried Flink SQL like this, it worked. {code:java} SELECT COUNT(*) FROM student WHERE class_id = 1; or SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} By the way, mysql connector has a bug and fixed in FLINK-27268. Maybe you try this demo should cherry-pick this PR first. was: When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' is not effect. {code:java} CREATE TABLE student ( id INT NOT NULL, name STRING, class_id INT NOT NULL ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'student', 'username' = 'root', 'password' = '12345678' ); SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; {code} But when i tried Flink SQL like this, it worked. {code:java} SELECT COUNT(*) FROM student WHERE class_id = 1; or SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} By the way, mysql connector has a bug and fixed in [FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. Maybe you try this demo should cherry-pick this PR first. 
> FILTER not effect in count(*) > - > > Key: FLINK-30687 > URL: https://issues.apache.org/jira/browse/FLINK-30687 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: tanjialiang >Priority: Major > > When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' > is not effect. > {code:java} > CREATE TABLE student > ( > id INT NOT NULL, > name STRING, > class_id INT NOT NULL > ) > WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/test', > 'table-name' = 'student', > 'username' = 'root', > 'password' = '12345678' > ); > SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; > or > SELECT COUNT(1) FILTER (WHERE class_id = 1) FROM student;{code} > > But when i tried Flink SQL like this, it worked. > {code:java} > SELECT COUNT(*) FROM student WHERE class_id = 1; > or > SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} > > By the way, mysql connector has a bug and fixed in FLINK-27268. Maybe you try > this demo should cherry-pick this PR first. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)
[ https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30687: Description: When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' is not effect. {code:java} CREATE TABLE student ( id INT NOT NULL, name STRING, class_id INT NOT NULL ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'student', 'username' = 'root', 'password' = '12345678' ); SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; {code} But when i tried Flink SQL like this, it worked. {code:java} SELECT COUNT(*) FROM student WHERE class_id = 1; or SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} By the way, mysql connector has a bug and fixed in [FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. Maybe you try this demo should cherry-pick this PR first. was: When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' is not effect. {code:java} CREATE TABLE student ( id INT NOT NULL, name STRING, class_id INT NOT NULL ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'student', 'username' = 'root', 'password' = '12345678' ); SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; {code} But when i tried Flink SQL like this, it worked. {code:java} SELECT COUNT(*) FROM student WHERE class_id = 1; or SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} By the way, mysql connector has a bug and fixed in [FLINK-27268|(https://issues.apache.org/jira/browse/FLINK-27268]. Maybe you try this demo should cherry-pick this PR first. 
> FILTER not effect in count(*) > - > > Key: FLINK-30687 > URL: https://issues.apache.org/jira/browse/FLINK-30687 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: tanjialiang >Priority: Major > > When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' > is not effect. > {code:java} > CREATE TABLE student > ( > id INT NOT NULL, > name STRING, > class_id INT NOT NULL > ) > WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/test', > 'table-name' = 'student', > 'username' = 'root', > 'password' = '12345678' > ); > SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; > {code} > > But when i tried Flink SQL like this, it worked. > {code:java} > SELECT COUNT(*) FROM student WHERE class_id = 1; > or > SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} > > By the way, mysql connector has a bug and fixed in > [FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. Maybe you > try this demo should cherry-pick this PR first. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)
[ https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30687: Description: When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' is not effect. {code:java} CREATE TABLE student ( id INT NOT NULL, name STRING, class_id INT NOT NULL ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'student', 'username' = 'root', 'password' = '12345678' ); SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; {code} But when i tried Flink SQL like this, it worked. {code:java} SELECT COUNT(*) FROM student WHERE class_id = 1; or SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} By the way, mysql connector has a bug and fixed in [FLINK-27268|(https://issues.apache.org/jira/browse/FLINK-27268]. Maybe you try this demo should cherry-pick this PR first. was: When i try to using Flink SQL like this {code:java} CREATE TABLE student ( id INT NOT NULL, name STRING, class_id INT NOT NULL ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'student', 'username' = 'root', 'password' = '12345678' ); SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; {code} I found 'FILTER(WHERE class_id = 1)' is not effect. But when i tried Flink SQL like this, it worked. {code:java} SELECT COUNT(*) FROM student WHERE class_id = 1; or SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} By the way, mysql connector has a bug and fixed in [FLINK-27268|(https://issues.apache.org/jira/browse/FLINK-27268]. Maybe you try this demo should cherry-pick this PR first. 
> FILTER not effect in count(*) > - > > Key: FLINK-30687 > URL: https://issues.apache.org/jira/browse/FLINK-30687 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: tanjialiang >Priority: Major > > When i try to using Flink SQL like this, i found 'FILTER(WHERE class_id = 1)' > is not effect. > {code:java} > CREATE TABLE student > ( > id INT NOT NULL, > name STRING, > class_id INT NOT NULL > ) > WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/test', > 'table-name' = 'student', > 'username' = 'root', > 'password' = '12345678' > ); > SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; > {code} > > But when i tried Flink SQL like this, it worked. > {code:java} > SELECT COUNT(*) FROM student WHERE class_id = 1; > or > SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} > > By the way, mysql connector has a bug and fixed in > [FLINK-27268|(https://issues.apache.org/jira/browse/FLINK-27268]. Maybe you > try this demo should cherry-pick this PR first. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30687) FILTER not effect in count(*)
[ https://issues.apache.org/jira/browse/FLINK-30687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30687: Description: When i try to using Flink SQL like this {code:java} CREATE TABLE student ( id INT NOT NULL, name STRING, class_id INT NOT NULL ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'student', 'username' = 'root', 'password' = '12345678' ); SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; {code} I found 'FILTER(WHERE class_id = 1)' is not effect. But when i tried Flink SQL like this, it worked. {code:java} SELECT COUNT(*) FROM student WHERE class_id = 1; or SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} By the way, mysql connector has a bug and fixed in [FLINK-27268|(https://issues.apache.org/jira/browse/FLINK-27268]. Maybe you try this demo should cherry-pick this PR first. was: When i try to using Flink SQL like this {code:java} CREATE TABLE student ( id INT NOT NULL, name STRING, class_id INT NOT NULL ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'student', 'username' = 'root', 'password' = '12345678' ); SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; {code} I found 'FILTER(WHERE class_id = 1)' is not effect. But when i tried Flink SQL like this, it worked. {code:java} SELECT COUNT(*) FROM student WHERE class_id = 1; or SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} By the way, mysql connector has a bug and fixed in [FLINK-27268|(https://issues.apache.org/jira/browse/FLINK-27268]. Maybe you try this demo should cherry-pick this PR first. 
> FILTER not effect in count(*) > - > > Key: FLINK-30687 > URL: https://issues.apache.org/jira/browse/FLINK-30687 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: tanjialiang >Priority: Major > > When i try to using Flink SQL like this > {code:java} > CREATE TABLE student > ( > id INT NOT NULL, > name STRING, > class_id INT NOT NULL > ) > WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/test', > 'table-name' = 'student', > 'username' = 'root', > 'password' = '12345678' > ); > SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; > {code} > > I found 'FILTER(WHERE class_id = 1)' is not effect. > But when i tried Flink SQL like this, it worked. > {code:java} > SELECT COUNT(*) FROM student WHERE class_id = 1; > or > SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} > > By the way, mysql connector has a bug and fixed in > [FLINK-27268|(https://issues.apache.org/jira/browse/FLINK-27268]. Maybe you > try this demo should cherry-pick this PR first. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30687) FILTER not effect in count(*)
tanjialiang created FLINK-30687: --- Summary: FILTER not effect in count(*) Key: FLINK-30687 URL: https://issues.apache.org/jira/browse/FLINK-30687 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.16.0 Reporter: tanjialiang When I try Flink SQL like this {code:java} CREATE TABLE student ( id INT NOT NULL, name STRING, class_id INT NOT NULL ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'student', 'username' = 'root', 'password' = '12345678' ); SELECT COUNT(*) FILTER (WHERE class_id = 1) FROM student; {code} I found that 'FILTER(WHERE class_id = 1)' has no effect. But when I tried Flink SQL like this, it worked. {code:java} SELECT COUNT(*) FROM student WHERE class_id = 1; or SELECT COUNT(class_id) FILTER (WHERE class_id = 1) FROM student;{code} By the way, the MySQL connector has a bug that was fixed in [FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268]. If you try this demo, you should cherry-pick that PR first. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29558) Use select count(*) from xxx; and get SQL syntax
[ https://issues.apache.org/jira/browse/FLINK-29558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17676845#comment-17676845 ] tanjialiang commented on FLINK-29558: - [~StarBoy1005] It has been fixed in [FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268], but I found a problem: when using Flink SQL like "SELECT COUNT(*) FROM table" or "SELECT COUNT(1) FROM table", the projection always returns 'ROW<> NOT NULL', which causes the MySQL connector to generate "SELECT FROM table". After [FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268], it generates "SELECT '' FROM table". But is this a bug of the projection? Whenever the projection cannot match anything, it returns 'ROW<> NOT NULL'. I think the MySQL connector may not be the only connector with this bug. cc [~martijnvisser] [~jark] > Use select count(*) from xxx; and get SQL syntax > > > Key: FLINK-29558 > URL: https://issues.apache.org/jira/browse/FLINK-29558 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.15.3 > Environment: flink 1.15.2 > CentOS Linux release 7.9.2009 (Core) > 5.7.32-log MySQL Community Server (GPL) >Reporter: StarBoy1005 >Priority: Major > Attachments: image-2022-10-10-15-31-34-341.png, > image-2022-10-19-17-55-14-700.png, image-2022-11-03-18-16-12-127.png, > screenshot-1.png, screenshot-2.png > > > Hi, I use flink sql to make kafka records to mysql. > so I create these 2 tables in flink sql,here is the mysql ,and I created the > table in mysql before I did the insert action in flink sql. 
> CREATE TABLE mysql_MyUserTable ( > id STRING, > name STRING, > age STRING, > status STRING, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( >'connector' = 'jdbc', >'url' = 'jdbc:mysql://10.19.29.170:3306/fromflink152', >'table-name' = 'users', >'username' = 'root', >'password' = '**' > ); > In mysql, I created database "fromflink152" then created the table like this > way > CREATE TABLE `users` ( > `id` varchar(64) NOT NULL DEFAULT '', > `name` varchar(255) DEFAULT NULL, > `age` varchar(255) DEFAULT NULL, > `status` varchar(255) DEFAULT NULL, > PRIMARY KEY (`id`) > ) > After executed insert sql,I found 'select * from mysql_MyUserTable' can get > correct result,but ’select count(\*) from mysql_MyUserTable‘ or ’select > count(id) from mysql_MyUserTable‘ ,the collect job in flink app keep > restarting again and again.The exception is: > !image-2022-10-10-15-31-34-341.png! > So I wonder which config that I missed about the table in flink or mysql side > :( -- This message was sent by Atlassian Jira (v8.20.10#820010)
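The comment above describes how an empty pushed-down projection ('ROW<>' for COUNT(*) or COUNT(1)) leads the JDBC connector to emit invalid SQL. A hypothetical Python sketch of that query-generation step (not the actual connector code; function names are illustrative):

```python
# Toy model of JDBC query generation from a pushed-down projection.

def build_select(fields, table):
    """Naive version: with no projected fields this renders 'SELECT  FROM t',
    which is the SQL syntax error reported in this issue."""
    return "SELECT " + ", ".join(fields) + " FROM " + table

def build_select_guarded(fields, table):
    """Guarded version: fall back to a constant expression when the projection
    is empty, matching the "SELECT '' FROM table" behavior after FLINK-27268."""
    projection = ", ".join(fields) if fields else "''"
    return "SELECT " + projection + " FROM " + table

assert build_select([], "users") == "SELECT  FROM users"  # invalid MySQL
assert build_select_guarded([], "users") == "SELECT '' FROM users"  # valid
assert build_select_guarded(["id", "name"], "users") == "SELECT id, name FROM users"
```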
[jira] [Comment Edited] (FLINK-29558) Use select count(*) from xxx; and get SQL syntax
[ https://issues.apache.org/jira/browse/FLINK-29558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17676845#comment-17676845 ] tanjialiang edited comment on FLINK-29558 at 1/14/23 5:48 AM: -- [~StarBoy1005] It had been fixed in FLINK-27268, but i found a problem, when using flink sql like this "SELECT COUNT(*) FROM table" or "SELECT COUNT(1) FROM table", projection always return 'ROW<> NOT NULL', so it cause mysql connector transform to "SELECT FROM table". After FLINK-27268, it transform to "SELECT '' FROM table". But is it projection's bug? When projection can not match something, it will be return 'ROW<> NOT NULL'. I think maybe not only mysql connector has this bug. cc [~martijnvisser] [~jark] was (Author: JIRAUSER279823): [~StarBoy1005] It had been fixed in [FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268], but i found a problem, when using flink sql like this "SELECT COUNT(*) FROM table" or "SELECT COUNT(1) FROM table", projection always return 'ROW<> NOT NULL', so it cause mysql connector transform to "SELECT FROM table". After [FLINK-27268|https://issues.apache.org/jira/browse/FLINK-27268], it transform to "SELECT '' FROM table". But is it projection's bug? When projection can not match something, it will be return 'ROW<> NOT NULL'. I think maybe not only mysql connector has this bug. cc [~martijnvisser] [~jark] > Use select count(*) from xxx; and get SQL syntax > > > Key: FLINK-29558 > URL: https://issues.apache.org/jira/browse/FLINK-29558 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.15.3 > Environment: flink 1.15.2 > CentOS Linux release 7.9.2009 (Core) > 5.7.32-log MySQL Community Server (GPL) >Reporter: StarBoy1005 >Priority: Major > Attachments: image-2022-10-10-15-31-34-341.png, > image-2022-10-19-17-55-14-700.png, image-2022-11-03-18-16-12-127.png, > screenshot-1.png, screenshot-2.png > > > Hi, I use flink sql to make kafka records to mysql. 
> so I create these 2 tables in flink sql,here is the mysql ,and I created the > table in mysql before I did the insert action in flink sql. > CREATE TABLE mysql_MyUserTable ( > id STRING, > name STRING, > age STRING, > status STRING, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( >'connector' = 'jdbc', >'url' = 'jdbc:mysql://10.19.29.170:3306/fromflink152', >'table-name' = 'users', >'username' = 'root', >'password' = '**' > ); > In mysql, I created database "fromflink152" then created the table like this > way > CREATE TABLE `users` ( > `id` varchar(64) NOT NULL DEFAULT '', > `name` varchar(255) DEFAULT NULL, > `age` varchar(255) DEFAULT NULL, > `status` varchar(255) DEFAULT NULL, > PRIMARY KEY (`id`) > ) > After executed insert sql,I found 'select * from mysql_MyUserTable' can get > correct result,but ’select count(\*) from mysql_MyUserTable‘ or ’select > count(id) from mysql_MyUserTable‘ ,the collect job in flink app keep > restarting again and again.The exception is: > !image-2022-10-10-15-31-34-341.png! > So I wonder which config that I missed about the table in flink or mysql side > :( -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-29558) Use select count(*) from xxx; and get SQL syntax
[ https://issues.apache.org/jira/browse/FLINK-29558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17676845#comment-17676845 ] tanjialiang edited comment on FLINK-29558 at 1/14/23 5:48 AM: -- [~StarBoy1005] It had been fixed in FLINK-27268, but i found a problem, when using flink sql like this "SELECT COUNT( * ) FROM table" or "SELECT COUNT(1) FROM table", projection always return 'ROW<> NOT NULL', so it cause mysql connector transform to "SELECT FROM table". After FLINK-27268, it transform to "SELECT '' FROM table". But is it projection's bug? When projection can not match something, it will be return 'ROW<> NOT NULL'. I think maybe not only mysql connector has this bug. cc [~martijnvisser] [~jark] was (Author: JIRAUSER279823): [~StarBoy1005] It had been fixed in FLINK-27268, but i found a problem, when using flink sql like this "SELECT COUNT(*) FROM table" or "SELECT COUNT(1) FROM table", projection always return 'ROW<> NOT NULL', so it cause mysql connector transform to "SELECT FROM table". After FLINK-27268, it transform to "SELECT '' FROM table". But is it projection's bug? When projection can not match something, it will be return 'ROW<> NOT NULL'. I think maybe not only mysql connector has this bug. cc [~martijnvisser] [~jark] > Use select count(*) from xxx; and get SQL syntax > > > Key: FLINK-29558 > URL: https://issues.apache.org/jira/browse/FLINK-29558 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.15.3 > Environment: flink 1.15.2 > CentOS Linux release 7.9.2009 (Core) > 5.7.32-log MySQL Community Server (GPL) >Reporter: StarBoy1005 >Priority: Major > Attachments: image-2022-10-10-15-31-34-341.png, > image-2022-10-19-17-55-14-700.png, image-2022-11-03-18-16-12-127.png, > screenshot-1.png, screenshot-2.png > > > Hi, I use flink sql to make kafka records to mysql. 
> so I create these 2 tables in flink sql,here is the mysql ,and I created the > table in mysql before I did the insert action in flink sql. > CREATE TABLE mysql_MyUserTable ( > id STRING, > name STRING, > age STRING, > status STRING, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( >'connector' = 'jdbc', >'url' = 'jdbc:mysql://10.19.29.170:3306/fromflink152', >'table-name' = 'users', >'username' = 'root', >'password' = '**' > ); > In mysql, I created database "fromflink152" then created the table like this > way > CREATE TABLE `users` ( > `id` varchar(64) NOT NULL DEFAULT '', > `name` varchar(255) DEFAULT NULL, > `age` varchar(255) DEFAULT NULL, > `status` varchar(255) DEFAULT NULL, > PRIMARY KEY (`id`) > ) > After executed insert sql,I found 'select * from mysql_MyUserTable' can get > correct result,but ’select count(\*) from mysql_MyUserTable‘ or ’select > count(id) from mysql_MyUserTable‘ ,the collect job in flink app keep > restarting again and again.The exception is: > !image-2022-10-10-15-31-34-341.png! > So I wonder which config that I missed about the table in flink or mysql side > :( -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30520) Arguments contains with '#' will error split in loadYAMLResource
[ https://issues.apache.org/jira/browse/FLINK-30520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17652797#comment-17652797 ] tanjialiang commented on FLINK-30520: - [~xtsong], thanks for your reply; introducing proper standard YAML parsing is the best way to solve this. I hope it gets fixed soon. > Arguments contains with '#' will error split in loadYAMLResource > > > Key: FLINK-30520 > URL: https://issues.apache.org/jira/browse/FLINK-30520 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.16.0, 1.15.3 >Reporter: tanjialiang >Priority: Major > Labels: pull-request-available > Attachments: image-2022-12-28-17-13-25-351.png, > image-2022-12-28-17-13-38-337.png, image-2022-12-28-18-03-40-806.png > > > When i submit a flink jar job in Kubernetes Application mode which main args > contains '#', it will be error split by > org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource > > such as i using flink-kubernetes-operator to submit a job in kubernetes > application mode > > {code:java} > apiVersion: flink.apache.org/v1beta1 > kind: FlinkDeployment > metadata: > name: word-count > spec: > image: apache/flink:1.16.0-scala_2.12-java8 > flinkVersion: v1_16 > flinkConfiguration: > taskmanager.numberOfTaskSlots: "1" > jobManager: > resource: > memory: "2048m" > cpu: 1 > taskManager: > resource: > memory: "2048m" > cpu: 1 > serviceAccount: flink > job: > jarURI: local:///opt/flink/examples/streaming/WordCount.jar > args: > - --output > - /tmp/1#.txt > parallelism: 2 > upgradeMode: stateless > {code} > > > It will be error split when loading the flink-conf.yaml > !image-2022-12-28-17-13-25-351.png! > > And i enter to the jobmanager's pod to saw the flink-conf.yaml's program-args > is right > !image-2022-12-28-17-13-38-337.png! > > Maybe we should have a more strict validate for yaml comment? 
this is the > yaml editor by vim, we can see comment should be start with '#' in line or > comment start with ' #' > !image-2022-12-28-18-03-40-806.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
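The splitting behavior described in this issue can be sketched as follows. This is a hypothetical Python sketch of naive flink-conf.yaml comment stripping, not the actual loadYAMLResource code, and the config key shown is illustrative: cutting a value at the first '#' corrupts any argument containing that character, while the YAML rule the reporter points at (a comment starts only at line start or after whitespace) keeps the value intact.

```python
# Toy model of stripping comments from a flink-conf.yaml line.

def parse_line_naive(line):
    """Cut the value at the first '#', as the reported bug does."""
    key, _, value = line.partition(": ")
    value = value.split("#", 1)[0]  # BUG: '#' inside the value is dropped
    return key.strip(), value.strip()

def parse_line_stricter(line):
    """Only treat '#' as a comment at value start or after a space (YAML rule)."""
    key, _, value = line.partition(": ")
    for i, ch in enumerate(value):
        if ch == "#" and (i == 0 or value[i - 1] == " "):
            value = value[:i]
            break
    return key.strip(), value.strip()

line = "program-args: --output;/tmp/1#.txt"  # hypothetical serialized args
assert parse_line_naive(line)[1] == "--output;/tmp/1"  # argument truncated
assert parse_line_stricter(line)[1] == "--output;/tmp/1#.txt"  # preserved
assert parse_line_stricter("key: value # trailing note")[1] == "value"
```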
[jira] [Updated] (FLINK-30520) Arguments contains with '#' will error split in loadYAMLResource
[ https://issues.apache.org/jira/browse/FLINK-30520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30520: Description: When I submit a Flink jar job in Kubernetes application mode whose main args contain '#', the arguments are wrongly split by org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource. For example, I use flink-kubernetes-operator to submit a job in Kubernetes application mode {code:java} apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: word-count spec: image: apache/flink:1.16.0-scala_2.12-java8 flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" jobManager: resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 serviceAccount: flink job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: - --output - /tmp/1#.txt parallelism: 2 upgradeMode: stateless {code} The arguments are wrongly split when loading flink-conf.yaml !image-2022-12-28-17-13-25-351.png! And I entered the jobmanager's pod and saw that the program-args in flink-conf.yaml are right !image-2022-12-28-17-13-38-337.png! Maybe we should have stricter validation for YAML comments? This is the YAML file opened in vim; we can see that a comment should start with '#' at the beginning of a line, or with ' #' inside a line !image-2022-12-28-18-03-40-806.png! 
was: When i submit a flink jar job in Kubernetes Application mode which main args contains '#', it will be error split by org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource such as i using flink-kubernetes-operator to submit a job in kubernetes application mode {code:java} apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: word-count spec: image: apache/flink:1.16.0-scala_2.12-java8 flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" jobManager: resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 serviceAccount: flink job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: - --output - /tmp/1#.txt parallelism: 2 upgradeMode: stateless {code} It will be error split when loading the flink-conf.yaml !image-2022-12-28-17-13-25-351.png! And i enter to the jobmanager's pod to saw the flink-conf.yaml's program-args is right !image-2022-12-28-17-13-38-337.png! Maybe we should have a more strict validate for yaml comment? 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30520) Arguments containing '#' are split incorrectly in loadYAMLResource
[ https://issues.apache.org/jira/browse/FLINK-30520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30520: Attachment: image-2022-12-28-18-03-40-806.png -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30520) Arguments containing '#' are split incorrectly in loadYAMLResource
[ https://issues.apache.org/jira/browse/FLINK-30520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30520: Attachment: image-2022-12-28-17-13-38-337.png -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30520) Arguments containing '#' are split incorrectly in loadYAMLResource
[ https://issues.apache.org/jira/browse/FLINK-30520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30520: Attachment: image-2022-12-28-17-13-25-351.png -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30520) Arguments containing '#' are split incorrectly in loadYAMLResource
[ https://issues.apache.org/jira/browse/FLINK-30520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30520: Description: When I submit a Flink jar job in Kubernetes application mode whose main args contain '#', the arguments are split incorrectly by org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource For example, submitting a job in Kubernetes application mode with flink-kubernetes-operator: {code:java} apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: word-count spec: image: apache/flink:1.16.0-scala_2.12-java8 flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" jobManager: resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 serviceAccount: flink job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: - --output - /tmp/1#.txt parallelism: 2 upgradeMode: stateless {code} The value is split incorrectly when flink-conf.yaml is loaded !image-2022-12-28-17-13-25-351.png! Entering the jobmanager's pod shows that the program-args in flink-conf.yaml are correct !image-2022-12-28-17-13-38-337.png! Maybe we should validate YAML comments more strictly? 
was: When i submit a flink jar job in Kubernetes Application mode which main args contains '#', it will be error split by org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource such as i using flink-kubernetes-operator to submit a job in kubernetes application mode {code:java} apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: word-count spec: image: apache/flink:1.16.0-scala_2.12-java8 flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" jobManager: resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 serviceAccount: flink job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: - --output - /tmp/1#.txt parallelism: 2 upgradeMode: stateless {code} It will be error split when loading the flink-conf.yaml !image-2022-12-28-16-30-30-645.png! And i enter to the jobmanager's pod to saw the flink-conf.yaml's program-args is right !image-2022-12-28-16-34-25-819.png! Maybe we should have a more strict validate for yaml comment? 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30520) Arguments containing '#' are split incorrectly in loadYAMLResource
tanjialiang created FLINK-30520: --- Summary: Arguments containing '#' are split incorrectly in loadYAMLResource Key: FLINK-30520 URL: https://issues.apache.org/jira/browse/FLINK-30520 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.15.3, 1.14.6, 1.16.0 Reporter: tanjialiang When I submit a Flink jar job in Kubernetes application mode whose main args contain '#', the arguments are split incorrectly by org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource For example, submitting a job in Kubernetes application mode with flink-kubernetes-operator: {code:java} apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: word-count spec: image: apache/flink:1.16.0-scala_2.12-java8 flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" jobManager: resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 serviceAccount: flink job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: - --output - /tmp/1#.txt parallelism: 2 upgradeMode: stateless {code} The value is split incorrectly when flink-conf.yaml is loaded !image-2022-12-28-16-30-30-645.png! Entering the jobmanager's pod shows that the program-args in flink-conf.yaml are correct !image-2022-12-28-16-34-25-819.png! Maybe we should validate YAML comments more strictly? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30498) frocksdb throws an UnsatisfiedLinkError
[ https://issues.apache.org/jira/browse/FLINK-30498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651955#comment-17651955 ] tanjialiang commented on FLINK-30498: - Hi [~Yanfei Lei], I made a mistake; this was my team's problem, so I am closing the issue. Yes, it was caused by a conflict with the frocksdbjni jar, but not by an official Flink jar. At first I thought it was a bug in the Flink Docker image, because I saw the error on 1.14 but not on 1.15 or 1.16. However, when I started a 1.14 cluster in Docker without any extra jars, it ran successfully, so I concluded the cause was an extra jar in our 1.14 cluster. In the end, I found it was a jar my team had added. > frocksdb throws an UnsatisfiedLinkError > -- > > Key: FLINK-30498 > URL: https://issues.apache.org/jira/browse/FLINK-30498 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends > Affects Versions: 1.14.6 > Reporter: tanjialiang > Priority: Major > > Using a simple TopN SQL job with the table.exec.state.ttl option and the RocksDB state backend: > {code:java} > SET table.exec.state.ttl = 1min; > CREATE TABLE `student` ( > id int, > name string, > update_time bigint > ) WITH ( > 'connector' = 'datagen', > 'fields.id.kind' = 'random', > 'fields.id.min' = '0', > 'fields.id.max' = '100' > ); > CREATE TABLE `blackhole` ( > id int, > name string, > update_time bigint > ) WITH ( > 'connector' = 'blackhole' > ); > INSERT INTO blackhole > SELECT > id, > name, > update_time > FROM ( > SELECT > id, > name, > update_time, > ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS > row_num > FROM student > ) AS t > WHERE t.row_num=1; {code} > frocksdb throws the following error: > {code:java} > java.lang.UnsatisfiedLinkError: > org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J > at > org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native > Method) ~[flink-dist_2.12-1.14.2.jar:?] 
> at > org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:14) > ~[flink-dist_2.12-1.14.2.jar:?] > at > org.rocksdb.FlinkCompactionFilter$ConfigHolder.(FlinkCompactionFilter.java:115) > ~[flink-dist_2.12-1.14.2.jar:?] > at > org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.(FlinkCompactionFilter.java:142) > ~[flink-dist_2.12-1.14.2.jar:?] > at > org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84) > ~[flink-dist_2.12-1.14.2.jar:1.14.2] > at > org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74) > ~[flink-dist_2.12-1.14.2.jar:1.14.2] > at > org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:157) > ~[flink-dist_2.12-1.14.2.jar:1.14.2] > at > org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:134) > ~[flink-dist_2.12-1.14.2.jar:1.14.2] > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643) > ~[flink-dist_2.12-1.14.2.jar:1.14.2] > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) > ~[flink-dist_2.12-1.14.2.jar:1.14.2] > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225) > ~[flink-dist_2.12-1.14.2.jar:1.14.2] > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:148) > ~[flink-dist_2.12-1.14.2.jar:1.14.2] > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132) > ~[flink-dist_2.12-1.14.2.jar:1.14.2] > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) > 
~[flink-dist_2.12-1.14.2.jar:1.14.2] > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302) > ~[flink-dist_2.12-1.14.2.jar:1.14.2] > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353) > ~[flink-dist_2.12-1.14.2.jar:1.14.2] > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) > ~[flink-dist_2.12-1.14.2.jar:1.14.2] > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) > ~[flink-dist_2.12-1.14.2.jar:1.14.2] > at >
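Since the root cause turned out to be a conflicting frocksdbjni copy on the classpath, a quick way to confirm such a conflict is to ask the JVM where it actually loaded a RocksDB class from. The following is a generic diagnostic sketch (the class name WhichJar and the fallback strings are made up for illustration); on a Flink cluster one would look up org.rocksdb.FlinkCompactionFilter and check whether the reported jar is flink-dist or some user-added jar:

```java
import java.security.CodeSource;

public class WhichJar {

    // Returns the location the named class was loaded from, a marker for
    // bootstrap/platform classes (which have no CodeSource), or a marker
    // when the class is not on the classpath at all.
    public static String locate(String className) {
        try {
            Class<?> c = Class.forName(className);
            CodeSource src = c.getProtectionDomain().getCodeSource();
            return src == null ? "bootstrap/platform loader" : src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        // On a Flink TaskManager this would print the jar that actually
        // provides the RocksDB compaction filter; here it is simply absent.
        System.out.println(locate("org.rocksdb.FlinkCompactionFilter"));
    }
}
```

If two jars both provide the class, whichever the classloader resolves first wins, and its native library registration may not match the other copy's JNI signatures, producing exactly an UnsatisfiedLinkError like the one above.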
[jira] [Closed] (FLINK-30498) frocksdb throws an UnsatisfiedLinkError
[ https://issues.apache.org/jira/browse/FLINK-30498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang closed FLINK-30498. --- Resolution: Not A Problem
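Such conflicts can also be caught before the job runs by scanning the cluster's jar directory for duplicate copies of the class. The sketch below uses hypothetical names and a relative "lib" path; pointing it at $FLINK_HOME/lib together with any user-added jars and seeing more than one hit indicates a conflict like the one closed here:

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipFile;

public class DuplicateClassScan {

    // Returns the names of all jars in `dir` that contain `classEntry`
    // (a path like "org/rocksdb/FlinkCompactionFilter.class").
    public static List<String> jarsContaining(File dir, String classEntry) throws IOException {
        List<String> hits = new ArrayList<>();
        File[] jars = dir.listFiles((d, name) -> name.endsWith(".jar"));
        if (jars == null) {
            return hits;
        }
        for (File jar : jars) {
            try (ZipFile zf = new ZipFile(jar)) {
                if (zf.getEntry(classEntry) != null) {
                    hits.add(jar.getName());
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // On a real cluster: point this at $FLINK_HOME/lib and user jar dirs.
        List<String> hits = jarsContaining(new File("lib"),
                "org/rocksdb/FlinkCompactionFilter.class");
        System.out.println(hits.size() > 1 ? "conflict: " + hits : "ok: " + hits);
    }
}
```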
[jira] [Updated] (FLINK-30498) frocksdb throws an UnsatisfiedLinkError
[ https://issues.apache.org/jira/browse/FLINK-30498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30498: Description: Using a simple TopN SQL job with the table.exec.state.ttl option and the RocksDB state backend: {code:java} SET table.exec.state.ttl = 1min; CREATE TABLE `student` ( id int, name string, update_time bigint ) WITH ( 'connector' = 'datagen', 'fields.id.kind' = 'random', 'fields.id.min' = '0', 'fields.id.max' = '100' ); CREATE TABLE `blackhole` ( id int, name string, update_time bigint ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole SELECT id, name, update_time FROM ( SELECT id, name, update_time, ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS row_num FROM student ) AS t WHERE t.row_num=1; {code} frocksdb throws the following error: {code:java} java.lang.UnsatisfiedLinkError: org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J at org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native Method) ~[flink-dist_2.12-1.14.2.jar:?] at org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:14) ~[flink-dist_2.12-1.14.2.jar:?] at org.rocksdb.FlinkCompactionFilter$ConfigHolder.(FlinkCompactionFilter.java:115) ~[flink-dist_2.12-1.14.2.jar:?] at org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.(FlinkCompactionFilter.java:142) ~[flink-dist_2.12-1.14.2.jar:?] 
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:157) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:134) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:148) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.table.runtime.operators.rank.FastTop1Function.open(FastTop1Function.java:114) ~[flink-table_2.12-1.14.2.jar:1.14.2] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:55) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
[jira] [Created] (FLINK-30498) frocksdb throws an UnsatisfiedLinkError
tanjialiang created FLINK-30498: --- Summary: frocksdb throws an UnsatisfiedLinkError Key: FLINK-30498 URL: https://issues.apache.org/jira/browse/FLINK-30498 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.14.6 Reporter: tanjialiang Using a simple TopN SQL job with the table.exec.state.ttl option and the RocksDB state backend:
{code:java}
SET table.exec.state.ttl = 1min;

CREATE TABLE `student` (
    id int,
    name string,
    update_time bigint
) WITH (
    'connector' = 'datagen',
    'fields.id.kind' = 'random',
    'fields.id.min' = '0',
    'fields.id.max' = '100'
);

CREATE TABLE `blackhole` (
    id int,
    name string,
    update_time bigint
) WITH (
    'connector' = 'blackhole'
);

INSERT INTO blackhole
SELECT id, name, update_time
FROM (
    SELECT id, name, update_time,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS row_num
    FROM student
) AS t
WHERE t.row_num = 1;
{code}
frocksdb throws the following error: {code:java} java.lang.UnsatisfiedLinkError: org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J at org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native Method) ~[flink-dist_2.12-1.14.2.jar:?] at org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:14) ~[flink-dist_2.12-1.14.2.jar:?] at org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:115) ~[flink-dist_2.12-1.14.2.jar:?] at org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:142) ~[flink-dist_2.12-1.14.2.jar:?] 
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:157) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:134) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:148) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.table.runtime.operators.rank.FastTop1Function.open(FastTop1Function.java:114) ~[flink-table_2.12-1.14.2.jar:1.14.2] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:55) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) ~[flink-dist_2.12-1.14.2.jar:1.14.2] at
[jira] [Updated] (FLINK-30460) Support writing HBase's cell TTL metadata
[ https://issues.apache.org/jira/browse/FLINK-30460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-30460: Description: When I use Flink SQL to sink to HBase, I found that I can't set the cell TTL. Can we support writing the cell TTL metadata like this?
{code:java}
CREATE TABLE hTable (
    rowkey INT,
    family1 ROW,
    ttl BIGINT METADATA FROM 'ttl',
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'mytable',
    'zookeeper.quorum' = 'localhost:2181'
);{code}
was: When i using flink sql to sink hbase, i found i can't set the ceil ttl. Can we support writing the ceil ttl metadata like this? {code:java} CREATE TABLE hTable ( rowkey INT, family1 ROW, ttl BIGINT METADATA FROM 'ttl', PRIMARY KEY (rowkey) NOT ENFORCED ) WITH ( 'connector' = 'hbase-2.2', 'table-name' = 'mytable', 'zookeeper.quorum' = 'localhost:2181' );{code} > Support writing HBase's cell TTL metadata > - > > Key: FLINK-30460 > URL: https://issues.apache.org/jira/browse/FLINK-30460 > Project: Flink > Issue Type: Improvement > Components: Connectors / HBase >Reporter: tanjialiang >Priority: Major > > When I use Flink SQL to sink to HBase, I found that I can't set the cell TTL. Can > we support writing the cell TTL metadata like this? > {code:java} > CREATE TABLE hTable ( > rowkey INT, > family1 ROW, > ttl BIGINT METADATA FROM 'ttl', > PRIMARY KEY (rowkey) NOT ENFORCED > ) WITH ( > 'connector' = 'hbase-2.2', > 'table-name' = 'mytable', > 'zookeeper.quorum' = 'localhost:2181' > );{code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30460) Support writing HBase's cell TTL metadata
tanjialiang created FLINK-30460: --- Summary: Support writing HBase's cell TTL metadata Key: FLINK-30460 URL: https://issues.apache.org/jira/browse/FLINK-30460 Project: Flink Issue Type: Improvement Components: Connectors / HBase Reporter: tanjialiang When I use Flink SQL to sink to HBase, I found that I can't set the cell TTL. Can we support writing the cell TTL metadata like this?
{code:java}
CREATE TABLE hTable (
    rowkey INT,
    family1 ROW,
    ttl BIGINT METADATA FROM 'ttl',
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'mytable',
    'zookeeper.quorum' = 'localhost:2181'
);{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
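If the proposed metadata column were supported, writing a per-cell TTL could look roughly like this (a hypothetical sketch based on the DDL above; the 'orders' source table is an assumption, and the millisecond unit is assumed because HBase cell TTLs are expressed in milliseconds through the client mutation API):
{code:sql}
-- Hypothetical usage: 'orders' is an assumed source table; the 'ttl'
-- metadata column is the one proposed in this issue.
INSERT INTO hTable
SELECT order_id, ROW(order_name), 3600000 AS ttl  -- expire written cells after 1 hour
FROM orders;
{code}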
[jira] [Created] (FLINK-30411) Flink deployment stuck in UPGRADING state when deploying a FlinkDeployment without resources
tanjialiang created FLINK-30411: --- Summary: Flink deployment stuck in UPGRADING state when deploying a FlinkDeployment without resources Key: FLINK-30411 URL: https://issues.apache.org/jira/browse/FLINK-30411 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: 1.16.0 Reporter: tanjialiang Attachments: image-2022-12-14-17-22-12-656.png In Flink Kubernetes Operator 1.2.0, when I deploy a FlinkDeployment without the resource sections, the deployment gets stuck in the UPGRADING state.
{code:yaml}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: socket-window-word-count
spec:
  image: flink:1.16.0-scala_2.12-java8
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    parallelism: 2
    upgradeMode: stateless{code}
When I run kubectl describe on the flinkdeployment, I find this error message !image-2022-12-14-17-22-12-656.png! Maybe we can validate this when the FlinkDeployment is applied? When it is invalid, throw an error rather than letting the apply succeed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
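For comparison, a deployment that does not hit this issue declares the resource sections explicitly. A hedged sketch (field names follow the operator's FlinkDeployment CRD; the memory/cpu values are illustrative):
{code:yaml}
spec:
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
{code}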
[jira] [Created] (FLINK-30222) Bug when suspending a job in last-state mode
tanjialiang created FLINK-30222: --- Summary: Bug when suspending a job in last-state mode Key: FLINK-30222 URL: https://issues.apache.org/jira/browse/FLINK-30222 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.2.0, 1.16.0 Reporter: tanjialiang Attachments: image-2022-11-27-16-48-08-445.png Flink 1.16.0 supports enabling Kubernetes HA with the shorthand value 'kubernetes', i.e. 'high-availability: kubernetes'. But in Kubernetes Operator 1.2.0, when I try to suspend a job in last-state mode, validation fails with 'Job could not be upgraded with last-state while Kubernetes HA disabled'. I tried to use kubectl patch to suspend the job with last-state:
{code:sh}
kubectl -nbigdata-flink patch flinkdeployments.flink.apache.org/streaming-638223bf650ac869689faa62-flink \
  --type=merge -p '{"spec": {"job": {"state": "suspended", "upgradeMode": "last-state"}}}'{code}
It returns an error claiming my Kubernetes HA is disabled:
{code:java}
Error from server: admission webhook "flinkoperator.flink.apache.org" denied the request: Job could not be upgraded with last-state while Kubernetes HA disabled {code}
But I enabled Kubernetes HA with the following options:
{code:yaml}
kubernetes.cluster-id:
high-availability: kubernetes
high-availability.storageDir: hdfs:///flink/recovery
{code}
And I found that Flink Kubernetes Operator 1.2.0 only validates Kubernetes HA against the old option value:
{code:yaml}
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
{code}
The check appears to be in org.apache.flink.kubernetes.operator.utils.FlinkUtils#isKubernetesHAActivated. !image-2022-11-27-16-48-08-445.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
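Putting the two option styles side by side: since the operator 1.2.0 validator only recognizes the legacy factory-class value, a possible workaround (a sketch assembled from the options quoted above; the cluster-id value is a placeholder) is to declare HA the old way:
{code:yaml}
# Shorthand introduced in Flink 1.16 that the operator 1.2.0 validator does not recognize:
# high-availability: kubernetes

# Legacy form that FlinkUtils#isKubernetesHAActivated checks for:
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: hdfs:///flink/recovery
kubernetes.cluster-id: my-cluster  # placeholder value
{code}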
[jira] [Commented] (FLINK-28728) Support setting an end offset for streaming connectors
[ https://issues.apache.org/jira/browse/FLINK-28728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17574091#comment-17574091 ] tanjialiang commented on FLINK-28728: - [~martijnvisser] Take Kafka, for example: when a topic's data arrives infrequently, I would like to consume it on a schedule rather than with a long-running job, because a long-running job wastes cluster resources. Running this kind of job in batch mode is fine, but I don't know whether a batch job can take a savepoint when it finishes, so that I can restore from that savepoint on the next scheduled run. > Support setting an end offset for streaming connectors > - > > Key: FLINK-28728 > URL: https://issues.apache.org/jira/browse/FLINK-28728 > Project: Flink > Issue Type: Improvement >Reporter: tanjialiang >Priority: Major > > Connectors like MQ or CDC connectors support setting the startup mode for reading, but > not an end offset/position. When the MQ or CDC data is infrequent, consuming > it periodically can save a lot of Kubernetes/YARN resources: start from earliest, > end at the current offset, save the current offset in a savepoint, and use that > savepoint to start the next period, in a loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28728) Support setting an end offset for streaming connectors
[ https://issues.apache.org/jira/browse/FLINK-28728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-28728: Description: Connectors like MQ or CDC connectors support setting the startup mode for reading, but not an end offset/position. When the MQ or CDC data is infrequent, consuming it periodically can save a lot of Kubernetes/YARN resources: start from earliest, end at the current offset, save the current offset in a savepoint, and use that savepoint to start the next period, in a loop. (was: Like MQ or CDC connector, it support set the startup mode for reading, but not support set the end offset/position. When the MQ's or CDC's data is not frequently, consume their data in period can cut lots of kubernetes/YARN resource, such as startup from earliest, and end to the current offset, save current offset in savepoint, and using the savepoint to startup in the next period time, in a loop.) > Support setting an end offset for streaming connectors > - > > Key: FLINK-28728 > URL: https://issues.apache.org/jira/browse/FLINK-28728 > Project: Flink > Issue Type: Improvement >Reporter: tanjialiang >Priority: Major > > Connectors like MQ or CDC connectors support setting the startup mode for reading, but > not an end offset/position. When the MQ or CDC data is infrequent, consuming > it periodically can save a lot of Kubernetes/YARN resources: start from earliest, > end at the current offset, save the current offset in a savepoint, and use that > savepoint to start the next period, in a loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28728) Support setting an end offset for streaming connectors
tanjialiang created FLINK-28728: --- Summary: Support setting an end offset for streaming connectors Key: FLINK-28728 URL: https://issues.apache.org/jira/browse/FLINK-28728 Project: Flink Issue Type: Improvement Reporter: tanjialiang Connectors like MQ or CDC connectors support setting the startup mode for reading, but not an end offset/position. When the MQ or CDC data is infrequent, consuming it periodically can save a lot of Kubernetes/YARN resources: start from earliest, end at the current offset, save the current offset in a savepoint, and use that savepoint to start the next period, in a loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
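For context, later Kafka SQL connector releases added an option along these lines; a hedged sketch (option names per newer Flink Kafka connector documentation, availability depends on the Flink version, and the table schema is an assumed example):
{code:sql}
CREATE TABLE user_events (
    user_id STRING,
    event_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset',
    -- stop reading at the offsets that are current when the job starts,
    -- turning the streaming read into a bounded one:
    'scan.bounded.mode' = 'latest-offset'
);
{code}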
[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions
[ https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17560965#comment-17560965 ] tanjialiang commented on FLINK-28303: - I hit this bug in Flink 1.14.2 too; it appears whenever some partitions are empty. [~tzulitai] And I'm sorry to say that "auto.offset.reset" does not take effect in 1.14, because of this bug: https://issues.apache.org/jira/browse/FLINK-24697 > Kafka SQL Connector loses data when restoring from a savepoint with a topic > with empty partitions > - > > Key: FLINK-28303 > URL: https://issues.apache.org/jira/browse/FLINK-28303 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.4 >Reporter: Robert Metzger >Priority: Major > > Steps to reproduce: > - Set up a Kafka topic with 10 partitions > - produce records 0-9 into the topic > - take a savepoint and stop the job > - produce records 10-19 into the topic > - restore the job from the savepoint. > The job will be missing usually 2-4 records from 10-19. > My assumption is that if a partition never had data (which is likely with 10 > partitions and 10 records), the savepoint will only contain offsets for > partitions with data. > While the job was offline (and we've written record 10-19 into the topic), > all partitions got filled. Now, when Kafka comes online again, it will use > the "latest" offset for those partitions, skipping some data. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-27574) [QUESTION] In Flink k8s Application mode with HA, the History Server cannot be used as the history backend
tanjialiang created FLINK-27574: --- Summary: [QUESTION] In Flink k8s Application mode with HA, the History Server cannot be used as the history backend Key: FLINK-27574 URL: https://issues.apache.org/jira/browse/FLINK-27574 Project: Flink Issue Type: Technical Debt Reporter: tanjialiang In Flink K8s application mode with high availability, the job id is always 00, but the History Server uses the job id as its key. Can I modify the job id under HA? -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-27110) Using TIMESTAMP_LTZ causes a Flink web UI watermark display problem
[ https://issues.apache.org/jira/browse/FLINK-27110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-27110: Attachment: image-2022-04-07-17-03-38-007.png > Using TIMESTAMP_LTZ causes a Flink web UI watermark display problem > > > Key: FLINK-27110 > URL: https://issues.apache.org/jira/browse/FLINK-27110 > Project: Flink > Issue Type: Bug > Components: Runtime / Web Frontend >Affects Versions: 1.13.1, 1.14.2 >Reporter: tanjialiang >Priority: Not a Priority > Attachments: image-2022-04-07-17-00-52-388.png, > image-2022-04-07-17-03-38-007.png > > > When a Flink SQL job uses TIMESTAMP_LTZ or TIMESTAMP, the timezone is > converted during computation, but in the Flink web UI the watermark is displayed > without the conversion. > > !image-2022-04-07-17-00-52-388.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-27110) Using TIMESTAMP_LTZ causes a Flink web UI watermark display problem
[ https://issues.apache.org/jira/browse/FLINK-27110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-27110: Description: When a Flink SQL job uses TIMESTAMP_LTZ or TIMESTAMP, the timezone is converted during computation, but in the Flink web UI the watermark is displayed without the conversion. !image-2022-04-07-17-03-38-007.png! was: when a Flink SQL using TIMESTAMP_LTZ or TIMESTAMP, it will convert their timezone when compute. But in flink web ui, the watermark display without convert. !image-2022-04-07-17-00-52-388.png! > Using TIMESTAMP_LTZ causes a Flink web UI watermark display problem > > > Key: FLINK-27110 > URL: https://issues.apache.org/jira/browse/FLINK-27110 > Project: Flink > Issue Type: Bug > Components: Runtime / Web Frontend >Affects Versions: 1.13.1, 1.14.2 >Reporter: tanjialiang >Priority: Not a Priority > Attachments: image-2022-04-07-17-00-52-388.png, > image-2022-04-07-17-03-38-007.png > > > When a Flink SQL job uses TIMESTAMP_LTZ or TIMESTAMP, the timezone is > converted during computation, but in the Flink web UI the watermark is displayed > without the conversion. > > !image-2022-04-07-17-03-38-007.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-27110) Using TIMESTAMP_LTZ causes a Flink web UI watermark display problem
tanjialiang created FLINK-27110: --- Summary: Using TIMESTAMP_LTZ causes a Flink web UI watermark display problem Key: FLINK-27110 URL: https://issues.apache.org/jira/browse/FLINK-27110 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Affects Versions: 1.14.2, 1.13.1 Reporter: tanjialiang Attachments: image-2022-04-07-17-00-52-388.png When a Flink SQL job uses TIMESTAMP_LTZ or TIMESTAMP, the timezone is converted during computation, but in the Flink web UI the watermark is displayed without the conversion. !image-2022-04-07-17-00-52-388.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25618) Data quality by Apache Flink
[ https://issues.apache.org/jira/browse/FLINK-25618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17474436#comment-17474436 ] tanjialiang commented on FLINK-25618: - [~MartijnVisser] Thanks for your reply. I have started a thread at https://lists.apache.org/thread/lytryplwttln192brn91q32f2trw8k5v > Data quality by Apache Flink > > > Key: FLINK-25618 > URL: https://issues.apache.org/jira/browse/FLINK-25618 > Project: Flink > Issue Type: New Feature >Reporter: tanjialiang >Priority: Not a Priority > > This is a discussion about how to support data quality in Apache Flink. > For example, I have a SQL job with a table that has a column named phone, > and the data in that column must match a telephone-number pattern; if a value > does not match, I can choose to drop or ignore it, and we can record it in the > metrics so that users can monitor data quality at the source and sink. > Users can then know the data quality from the source and sink, which > is useful downstream. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (FLINK-25618) Data quality by Apache Flink
[ https://issues.apache.org/jira/browse/FLINK-25618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17474436#comment-17474436 ] tanjialiang edited comment on FLINK-25618 at 1/12/22, 10:50 AM: [~MartijnVisser] Thanks for your reply. I have started a thread at https://lists.apache.org/thread/lytryplwttln192brn91q32f2trw8k5v was (Author: JIRAUSER279823): [~MartijnVisser] Thanks for your reply. I had started a thread in [https://lists.apache.org/thread/lytryplwttln192brn91q32f2trw8k5v|https://lists.apache.org/thread/lytryplwttln192brn91q32f2trw8k5v,] > Data quality by Apache Flink > > > Key: FLINK-25618 > URL: https://issues.apache.org/jira/browse/FLINK-25618 > Project: Flink > Issue Type: New Feature >Reporter: tanjialiang >Priority: Not a Priority > > This is a discussion about how to support data quality in Apache Flink. > For example, I have a SQL job with a table that has a column named phone, > and the data in that column must match a telephone-number pattern; if a value > does not match, I can choose to drop or ignore it, and we can record it in the > metrics so that users can monitor data quality at the source and sink. > Users can then know the data quality from the source and sink, which > is useful downstream. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25618) Data quality by Apache Flink
[ https://issues.apache.org/jira/browse/FLINK-25618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-25618: Description: This is a discussion about how to support data quality in Apache Flink. For example, I have a SQL job with a table that has a column named phone, and the data in that column must match a telephone-number pattern; if a value does not match, I can choose to drop or ignore it, and we can record it in the metrics so that users can monitor data quality at the source and sink. Users can then know the data quality from the source and sink, which is useful downstream. was: This is discussing about how to support data quality through apache flink. For example, I has a sql job, a table in this job has a column named phone, and the data of the column phone must match the pattern of telephone, if not match, i can choose drop it or ignored, and we can mark it in the metrics, so that user can monitor the data of quality in source and sink. After this, user can kown about the data quality of the source and sink, it is useful for the downstream. > Data quality by Apache Flink > > > Key: FLINK-25618 > URL: https://issues.apache.org/jira/browse/FLINK-25618 > Project: Flink > Issue Type: New Feature >Reporter: tanjialiang >Priority: Not a Priority > > This is a discussion about how to support data quality in Apache Flink. > For example, I have a SQL job with a table that has a column named phone, > and the data in that column must match a telephone-number pattern; if a value > does not match, I can choose to drop or ignore it, and we can record it in the > metrics so that users can monitor data quality at the source and sink. > Users can then know the data quality from the source and sink, which > is useful downstream. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-25618) Data quality by Apache Flink
[ https://issues.apache.org/jira/browse/FLINK-25618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanjialiang updated FLINK-25618: Description: This is a discussion about how to support data quality in Apache Flink. For example, I have a SQL job with a table that has a column named phone, and the data in that column must match a telephone-number pattern; if a value does not match, I can choose to drop or ignore it, and we can record it in the metrics so that users can monitor data quality at the source and sink. Users can then know the data quality of the source and sink, which is useful downstream. was: This is discussing about how to support data quality through apache flink. For example, I has a sql job, a table in this job has a column name phone, and the data of the column phone must match the pattern of telephone, if not match, i can choose drop it or ignored, and we can mark it in the metrics, so that user can monitor the data of quality in source and sink. > Data quality by Apache Flink > > > Key: FLINK-25618 > URL: https://issues.apache.org/jira/browse/FLINK-25618 > Project: Flink > Issue Type: New Feature >Reporter: tanjialiang >Priority: Not a Priority > > This is a discussion about how to support data quality in Apache Flink. > For example, I have a SQL job with a table that has a column named phone, > and the data in that column must match a telephone-number pattern; if a value > does not match, I can choose to drop or ignore it, and we can record it in the > metrics so that users can monitor data quality at the source and sink. > Users can then know the data quality of the source and sink, which > is useful downstream. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25618) Data quality by Apache Flink
tanjialiang created FLINK-25618: --- Summary: Data quality by Apache Flink Key: FLINK-25618 URL: https://issues.apache.org/jira/browse/FLINK-25618 Project: Flink Issue Type: New Feature Reporter: tanjialiang This is a discussion about how to support data quality in Apache Flink. For example, I have a SQL job with a table that has a column named phone, and the data in that column must match a telephone-number pattern; if a value does not match, I can choose to drop or ignore it, and we can record it in the metrics so that users can monitor data quality at the source and sink. -- This message was sent by Atlassian Jira (v8.20.1#820001)