[jira] [Created] (FLINK-32819) flink can not parse the param `#` correctly in k8s application mode
Jun Zhang created FLINK-32819: - Summary: flink can not parse the param `#` correctly in k8s application mode Key: FLINK-32819 URL: https://issues.apache.org/jira/browse/FLINK-32819 Project: Flink Issue Type: Bug Components: Runtime / Configuration Affects Versions: 1.17.1 Reporter: Jun Zhang Fix For: 1.18.0
When I submit a flink job in k8s application mode and a program argument contains `#`, for example a mysql password, flink cannot parse the argument correctly: the content after the `#` is lost.
{code:java}
/mnt/flink/flink-1.17.0/bin/flink run-application \
-Dexecution.target=kubernetes-application \
-Dkubernetes.container.image=x \
local:///opt/flink/usrlib/my.jar \
--mysql-conf hostname=localhost \
--mysql-conf username=root \
--mysql-conf password=%&^GGJI#$jh665$fi^% \
--mysql-conf port=3306
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
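A plausible explanation (an assumption, not stated in the ticket) is that in application mode the program arguments are flattened into a properties-style configuration file whose parser treats `#` as the start of a comment. A minimal sketch of that truncation behavior, with illustrative names only:
{code:java}
public class HashTruncationDemo {
    // Hypothetical parser mimicking a properties/flink-conf style loader that
    // treats '#' as a comment delimiter. Not Flink's actual implementation.
    static String parseValue(String line) {
        int hash = line.indexOf('#');
        String effective = hash >= 0 ? line.substring(0, hash) : line;
        return effective.substring(effective.indexOf('=') + 1).trim();
    }

    public static void main(String[] args) {
        // Prints "%&^GGJI": everything after '#' is dropped.
        System.out.println(parseValue("password=%&^GGJI#$jh665$fi^%"));
    }
}
{code}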
[jira] [Created] (FLINK-31459) add UPDATE COLUMN POSITION for flink table store
Jun Zhang created FLINK-31459: - Summary: add UPDATE COLUMN POSITION for flink table store Key: FLINK-31459 URL: https://issues.apache.org/jira/browse/FLINK-31459 Project: Flink Issue Type: Improvement Components: Table Store Affects Versions: table-store-0.3.1 Reporter: Jun Zhang Fix For: table-store-0.4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
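For concreteness, a sketch of what the intended syntax might look like, assuming Hive/MySQL-style FIRST/AFTER clauses (illustrative, not the final design):
{code:java}
-- hypothetical DDL for repositioning columns
ALTER TABLE my_table MODIFY col_a INT FIRST;
ALTER TABLE my_table MODIFY col_b STRING AFTER col_c;
{code}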
[jira] [Created] (FLINK-31338) support infer parallelism for flink table store
Jun Zhang created FLINK-31338: - Summary: support infer parallelism for flink table store Key: FLINK-31338 URL: https://issues.apache.org/jira/browse/FLINK-31338 Project: Flink Issue Type: Improvement Components: Table Store Affects Versions: table-store-0.3.0 Reporter: Jun Zhang Fix For: table-store-0.4.0
When using flink to query an fts table, we can configure the scan parallelism by setting scan.parallelism, but the user may not know how much parallelism should be used: setting the parallelism too large wastes resources, while setting it too small makes the query slow. So we can add parallelism inference. The function is enabled by default, and the inferred parallelism equals the number of read splits. Of course, the user can manually turn the inference off. To prevent too many data files from causing excessive parallelism, we also set a max inferred parallelism; when the inferred parallelism exceeds that setting, the max parallelism is used. In addition, we should also compare against the LIMIT in the select query to get a more appropriate parallelism when limit push-down applies. For example, for the sql `select * from table limit 1`, even if we infer a parallelism of 10, only one parallel task is needed, because we only need one row.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
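A minimal sketch of the inference rule described above (the method name and signature are mine, not the actual implementation):
{code:java}
public class ParallelismInference {
    // Infer scan parallelism from the split count, capped by a configured
    // maximum and by the query LIMIT when limit push-down applies
    // (limit <= 0 means no limit was pushed down).
    static int infer(int numSplits, int maxInferParallelism, long limit) {
        int parallelism = Math.min(numSplits, maxInferParallelism);
        if (limit > 0) {
            parallelism = (int) Math.min(parallelism, limit);
        }
        return Math.max(parallelism, 1);
    }

    public static void main(String[] args) {
        System.out.println(infer(10, 1024, 1)); // prints 1: LIMIT 1 needs one task
    }
}
{code}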
[jira] [Created] (FLINK-31258) can not get kerberos keytab in flink operator
Jun Zhang created FLINK-31258: - Summary: can not get kerberos keytab in flink operator Key: FLINK-31258 URL: https://issues.apache.org/jira/browse/FLINK-31258 Project: Flink Issue Type: Bug Components: Kubernetes Operator Reporter: Jun Zhang
env: flink k8s operator 1.4, flink 1.14.6. The configuration:
{code:java}
flinkConfiguration:
  security.kerberos.login.keytab=/path/your/user.keytab
  security.kerberos.login.principal=y...@hadoop.com
{code}
and I get an exception:
{code:java}
Status: Cluster Info: Error: {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster \"basic-example\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could not create Kubernetes cluster \"basic-example\"."},{"type":"org.apache.flink.configuration.IllegalConfigurationException","message":"Kerberos login configuration is invalid: keytab [/path/your/user.keytab] doesn't exist!"}]}
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31224) Add metrics for flink table store
Jun Zhang created FLINK-31224: - Summary: Add metrics for flink table store Key: FLINK-31224 URL: https://issues.apache.org/jira/browse/FLINK-31224 Project: Flink Issue Type: Improvement Components: Table Store Affects Versions: table-store-0.3.1 Reporter: Jun Zhang Fix For: table-store-0.4.0 Add metrics for flink table store -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31128) Add Create Table As for flink table store
Jun Zhang created FLINK-31128: - Summary: Add Create Table As for flink table store Key: FLINK-31128 URL: https://issues.apache.org/jira/browse/FLINK-31128 Project: Flink Issue Type: Improvement Components: Table Store Affects Versions: table-store-0.3.0 Reporter: Jun Zhang Fix For: table-store-0.4.0 Add Create Table As for flink table store -- This message was sent by Atlassian Jira (v8.20.10#820010)
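For illustration, the CREATE TABLE AS (CTAS) form this asks for, with the schema derived from the query (a sketch, not the final syntax):
{code:java}
CREATE TABLE target_table AS SELECT id, name FROM source_table;
{code}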
[jira] [Created] (FLINK-31028) Provide different compression methods per level
Jun Zhang created FLINK-31028: - Summary: Provide different compression methods per level Key: FLINK-31028 URL: https://issues.apache.org/jira/browse/FLINK-31028 Project: Flink Issue Type: New Feature Components: Table Store Affects Versions: table-store-0.3.0 Reporter: Jun Zhang Fix For: table-store-0.4.0
Different compression methods would be provided for different levels. For level 0, because the amount of data in this level is not large, we do not want to use compression, in exchange for better write performance. For normal levels, we use lz4. For the last level, access is generally rare and the data volume is large, so we hope to use gzip to reduce the space used.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
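A minimal sketch of the level-to-codec mapping described above (illustrative names, not the shipped configuration):
{code:java}
public class PerLevelCompression {
    // Level 0: no compression (favor write throughput).
    // Last level: gzip (rarely read, large data volume; favor space).
    // Everything in between: lz4 (balanced default).
    static String compressionForLevel(int level, int maxLevel) {
        if (level == 0) {
            return "none";
        }
        return level == maxLevel ? "gzip" : "lz4";
    }

    public static void main(String[] args) {
        System.out.println(compressionForLevel(0, 5)); // none
        System.out.println(compressionForLevel(3, 5)); // lz4
        System.out.println(compressionForLevel(5, 5)); // gzip
    }
}
{code}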
[jira] [Created] (FLINK-31011) upgrade hiverunner version
Jun Zhang created FLINK-31011: - Summary: upgrade hiverunner version Key: FLINK-31011 URL: https://issues.apache.org/jira/browse/FLINK-31011 Project: Flink Issue Type: Improvement Components: Table Store Affects Versions: table-store-0.3.1 Reporter: Jun Zhang Fix For: table-store-0.4.0
The current HiveRunner test framework (version 4) has some bugs; for example, when we rename a table it does not rename the table's location. I tested the latest version (6) and it works fine, so we should upgrade HiveRunner to the latest version.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30871) add bloom filter for orc
Jun Zhang created FLINK-30871: - Summary: add bloom filter for orc Key: FLINK-30871 URL: https://issues.apache.org/jira/browse/FLINK-30871 Project: Flink Issue Type: Improvement Components: Table Store Reporter: Jun Zhang Fix For: table-store-0.4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
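For context, Apache ORC already exposes writer-side bloom filter settings that such support could forward; a sketch of setting them (the table-store wiring itself is an assumption):
{code:java}
import org.apache.hadoop.conf.Configuration;

public class OrcBloomFilterConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Apache ORC writer keys: which columns get bloom filters,
        // and the target false-positive probability.
        conf.set("orc.bloom.filter.columns", "user_id,order_id");
        conf.set("orc.bloom.filter.fpp", "0.05");
    }
}
{code}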
[jira] [Created] (FLINK-30608) support rename table
Jun Zhang created FLINK-30608: - Summary: support rename table Key: FLINK-30608 URL: https://issues.apache.org/jira/browse/FLINK-30608 Project: Flink Issue Type: Improvement Reporter: Jun Zhang Fix For: table-store-0.4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
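Presumably this covers the standard rename DDL; an illustrative example:
{code:java}
ALTER TABLE my_table RENAME TO my_table_renamed;
{code}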
[jira] [Created] (FLINK-30595) support create table like
Jun Zhang created FLINK-30595: - Summary: support create table like Key: FLINK-30595 URL: https://issues.apache.org/jira/browse/FLINK-30595 Project: Flink Issue Type: Improvement Reporter: Jun Zhang Fix For: table-store-0.4.0 support CREATE TABLE LIKE -- This message was sent by Atlassian Jira (v8.20.10#820010)
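Illustrative, following the Hive-style form (the exact table-store syntax is not specified in the ticket):
{code:java}
CREATE TABLE new_table LIKE existing_table;
{code}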
[jira] [Created] (FLINK-23955) submit flink sql job error when flink HA on yarn is configured
Jun Zhang created FLINK-23955: - Summary: submit flink sql job error when flink HA on yarn is configured Key: FLINK-23955 URL: https://issues.apache.org/jira/browse/FLINK-23955 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.13.0 Reporter: Jun Zhang Fix For: 1.14.0
1. I configured flink HA like this:
{code:java}
high-availability: zookeeper
high-availability.storageDir: hdfs://xxx/flink/ha/
high-availability.zookeeper.quorum: x:2181
high-availability.zookeeper.path.root: /flink
{code}
2. I started a flink session cluster.
3. I submitted a flink sql job and set:
{code:java}
set execution.target = yarn-per-job;
{code}
I get the error:
{code:java}
2021-08-25 10:40:39,500 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface master3:38052 of application 'application_1629858010528_0002'.
2021-08-25 10:40:42,447 WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An exception occurred when fetching query results
java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., ]
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_291]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_291]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:128) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:355) ~[iceberg-flink-runtime-77ea34e.dirty.jar:?]
at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:143) ~[iceberg-flink-runtime-77ea34e.dirty.jar:?]
at org.apache.iceberg.flink.source.RowDataRewriter.rewriteDataForTasks(RowDataRewriter.java:91) ~[iceberg-flink-runtime-77ea34e.dirty.jar:?]
at org.apache.iceberg.flink.actions.RewriteDataFilesAction.rewriteDataForTasks(RewriteDataFilesAction.java:56) ~[iceberg-flink-runtime-77ea34e.dirty.jar:?]
at org.apache.iceberg.actions.BaseRewriteDataFilesAction.execute(BaseRewriteDataFilesAction.java:240) ~[iceberg-flink-runtime-77ea34e.dirty.jar:?]
at flink.test.RewriteTable.main(RewriteTable.java:52) ~[iceberg-projects-1.0-SNAPSHOT.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_291]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_291]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_291]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_291]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-clients_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-clients_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-clients_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) ~[flink-clients_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) ~[flink-clients_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) ~[flink-clients_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) ~[flink-clients_2.12-1.13.0.jar:1.13.0]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_291]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_291]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) [flink-shaded-hadoop2-uber-2.8.3-1.8.3.jar:2.8.3-1.8.3]
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) [flink-clients_2.12-1.13.0.jar:1.13.0]
Caused by:
{code}
[jira] [Created] (FLINK-22975) Specify port or range for k8s service
Jun Zhang created FLINK-22975: - Summary: Specify port or range for k8s service Key: FLINK-22975 URL: https://issues.apache.org/jira/browse/FLINK-22975 Project: Flink Issue Type: Improvement Components: Deployment / Kubernetes Affects Versions: 1.13.1 Reporter: Jun Zhang Fix For: 1.14.0
When we deploy a flink program on k8s, the service port is randomly generated. This random port may not be accessible due to the company's network policy, so I think we should let users specify the port or port range that is exposed to the outside, similar to the '--service-node-port-range' parameter of the Kubernetes API server.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22591) add sub interface MapColumnVector for ColumnVector
Jun Zhang created FLINK-22591: - Summary: add sub interface MapColumnVector for ColumnVector Key: FLINK-22591 URL: https://issues.apache.org/jira/browse/FLINK-22591 Project: Flink Issue Type: Improvement Components: Runtime / Task Affects Versions: 1.13.0 Reporter: Jun Zhang Fix For: 1.14.0
For complex types, the org.apache.flink.table.data.vector.ColumnVector interface has two sub-interfaces: RowColumnVector and ArrayColumnVector. I think we should add another sub-interface: MapColumnVector.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
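A sketch of what such a sub-interface could look like, by analogy with ArrayColumnVector (my own sketch; the interface that eventually ships may differ):
{code:java}
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.vector.ColumnVector;

// Illustrative proposal: expose the map value at each row position,
// mirroring ArrayColumnVector's per-row accessor.
public interface MapColumnVector extends ColumnVector {
    MapData getMap(int rowId);
}
{code}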
[jira] [Created] (FLINK-21286) Support BUCKET for flink sql CREATE TABLE
Jun Zhang created FLINK-21286: - Summary: Support BUCKET for flink sql CREATE TABLE Key: FLINK-21286 URL: https://issues.apache.org/jira/browse/FLINK-21286 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.12.1 Reporter: Jun Zhang Fix For: 1.13.0
Support BUCKET for flink CREATE TABLE; refer to the hive syntax:
{code:java}
[CLUSTERED BY (col_name, col_name, ...)
 [SORTED BY (col_name [ASC|DESC], ...)]
 INTO num_buckets BUCKETS]
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
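An illustrative DDL of what that could look like in Flink (hypothetical syntax, following the Hive clauses above):
{code:java}
CREATE TABLE user_events (
  user_id BIGINT,
  event_time TIMESTAMP(3)
) CLUSTERED BY (user_id) SORTED BY (event_time ASC) INTO 32 BUCKETS;
{code}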
[jira] [Created] (FLINK-21285) Support MERGE INTO for flink sql
Jun Zhang created FLINK-21285: - Summary: Support MERGE INTO for flink sql Key: FLINK-21285 URL: https://issues.apache.org/jira/browse/FLINK-21285 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.12.1 Reporter: Jun Zhang Fix For: 1.13.0
Support MERGE INTO for flink sql; refer to the hive syntax:
{code:java}
MERGE INTO <target table> AS T USING <source expression/table> AS S
ON <boolean expression1>
WHEN MATCHED [AND <boolean expression2>] THEN UPDATE SET <set clause list>
WHEN MATCHED [AND <boolean expression3>] THEN DELETE
WHEN NOT MATCHED [AND <boolean expression4>] THEN INSERT VALUES <value list>
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21283) Support sql extension for flink sql
Jun Zhang created FLINK-21283: - Summary: Support sql extension for flink sql Key: FLINK-21283 URL: https://issues.apache.org/jira/browse/FLINK-21283 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.12.1 Reporter: Jun Zhang Fix For: 1.13.0
I think we should add a sql extension mechanism for flink sql so that users can customize sql parsing, sql optimization, etc. We can refer to the [spark sql extension|https://issues.apache.org/jira/browse/SPARK-18127].
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21282) Support UPDATE for flink sql
Jun Zhang created FLINK-21282: - Summary: Support UPDATE for flink sql Key: FLINK-21282 URL: https://issues.apache.org/jira/browse/FLINK-21282 Project: Flink Issue Type: Improvement Components: Table SQL / Client Affects Versions: 1.12.1 Reporter: Jun Zhang Fix For: 1.13.0
Support UPDATE for flink sql; the syntax is like this:
{code:java}
UPDATE tablename SET column = value [, column = value ...] [WHERE expression]
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21281) Support DELETE FROM for flink sql
Jun Zhang created FLINK-21281: - Summary: Support DELETE FROM for flink sql Key: FLINK-21281 URL: https://issues.apache.org/jira/browse/FLINK-21281 Project: Flink Issue Type: Improvement Components: Table SQL / Client Affects Versions: 1.12.1 Reporter: Jun Zhang Fix For: 1.13.0
Support DELETE FROM for flink sql; the syntax is like this:
{code:java}
DELETE FROM tablename [WHERE expression]
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21198) Add show create table command support for flink sql client
Jun Zhang created FLINK-21198: - Summary: Add show create table command support for flink sql client Key: FLINK-21198 URL: https://issues.apache.org/jira/browse/FLINK-21198 Project: Flink Issue Type: Improvement Components: Table SQL / Client Affects Versions: 1.12.0 Reporter: Jun Zhang Fix For: 1.13.0
I think we should add a SHOW CREATE TABLE command for the flink sql client, so that we can view the table schema.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
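Illustrative usage in the sql client, mirroring the MySQL/Hive command of the same name:
{code:java}
SHOW CREATE TABLE my_table;
{code}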
[jira] [Created] (FLINK-21197) DESC command in sql-client should show the column comment
Jun Zhang created FLINK-21197: - Summary: DESC command in sql-client should show the column comment Key: FLINK-21197 URL: https://issues.apache.org/jira/browse/FLINK-21197 Project: Flink Issue Type: Improvement Components: Table SQL / Client Affects Versions: 1.12.0 Reporter: Jun Zhang Fix For: 1.13.0
Like hive, I think the DESC/DESCRIBE command in the sql client should show the column comment.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20977) can not use `use` command to switch database
Jun Zhang created FLINK-20977: - Summary: can not use `use` command to switch database Key: FLINK-20977 URL: https://issues.apache.org/jira/browse/FLINK-20977 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.12.0 Reporter: Jun Zhang Fix For: 1.13.0
I have a database named mod. When I run `use mod` to switch to the db, the system throws an exception. I surrounded the name with backticks, but it still does not work.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
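A minimal reproduction sketch; presumably the name collides with the MOD keyword in the parser (my assumption, not confirmed in the ticket):
{code:java}
-- the database "mod" already exists (e.g. created through a catalog);
-- switching to it fails even with backtick quoting:
USE `mod`;
{code}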
[jira] [Created] (FLINK-20809) the limit push down invalid when use filter
Jun Zhang created FLINK-20809: - Summary: the limit push down invalid when use filter Key: FLINK-20809 URL: https://issues.apache.org/jira/browse/FLINK-20809 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.12.0 Reporter: Jun Zhang Fix For: 1.13.0
When I use flink sql to query a hive table, like this:
{code:java}
select * from hive_table where id = 1 limit 1
{code}
when the sql contains query conditions in the where clause, I found that the limit push-down does not take effect. I looked up the comment in the source code, and I think it should be pushed down; is it a bug? [the comment|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.java#L64]
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20767) add nested field support for SupportsFilterPushDown
Jun Zhang created FLINK-20767: - Summary: add nested field support for SupportsFilterPushDown Key: FLINK-20767 URL: https://issues.apache.org/jira/browse/FLINK-20767 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.12.0 Reporter: Jun Zhang Fix For: 1.13.0
I think we should add nested field support for SupportsFilterPushDown.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19896) Support first-n-rows deduplication in the Deduplicate operator
Jun Zhang created FLINK-19896: - Summary: Support first-n-rows deduplication in the Deduplicate operator Key: FLINK-19896 URL: https://issues.apache.org/jira/browse/FLINK-19896 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Affects Versions: 1.12.0, 1.11.3 Reporter: Jun Zhang Fix For: 1.11.2 Currently Deduplicate operator only supports first-row deduplication (ordered by proc-time). In scenario of first-n-rows deduplication, the planner has to resort to Rank operator. However, Rank operator is less efficient than Deduplicate in terms of state consumption. This issue proposes to extend DeduplicateKeepFirstRowFunction to support first-n-rows deduplication. -- This message was sent by Atlassian Jira (v8.3.4#803005)
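For reference, the first-n-rows query shape in question, which the planner currently executes with the Rank operator (this is Flink's documented deduplication/Top-N syntax):
{code:java}
SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY k ORDER BY proctime ASC) AS rownum
  FROM t
) WHERE rownum <= 3;  -- first 3 rows per key, rather than only the first row
{code}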
[jira] [Created] (FLINK-19358) when submit job on application mode with HA,the jobid will be 0000000000
Jun Zhang created FLINK-19358: - Summary: when submit job on application mode with HA,the jobid will be 0000000000 Key: FLINK-19358 URL: https://issues.apache.org/jira/browse/FLINK-19358 Project: Flink Issue Type: Bug Components: Deployment / YARN Affects Versions: 1.11.0 Reporter: Jun Zhang Fix For: 1.12.0
When submitting a flink job in application mode with HA, the flink job id will be all zeros. When I have many jobs, they all have the same job id, which leads to checkpoint errors.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19254) Invalid UTF-8 start byte exception
Jun Zhang created FLINK-19254: - Summary: Invalid UTF-8 start byte exception Key: FLINK-19254 URL: https://issues.apache.org/jira/browse/FLINK-19254 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.11.0 Reporter: Jun Zhang Fix For: 1.12.0
When reading non-UTF-8 data, JsonRowDeserializationSchema throws an exception:
{code:java}
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Invalid UTF-8 start byte xxx
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18691) add HiveCatalog Construction method with HiveConf
Jun Zhang created FLINK-18691: - Summary: add HiveCatalog Construction method with HiveConf Key: FLINK-18691 URL: https://issues.apache.org/jira/browse/FLINK-18691 Project: Flink Issue Type: Improvement Components: Connectors / Hive Affects Versions: 1.11.1 Reporter: Jun Zhang Fix For: 1.12.0
Currently HiveCatalog has two public constructors. They both need a hiveConfDir variable, which is the path of the local hive configuration file. But when we use application mode to submit a job, the job is submitted on the master node of the cluster, where there may be no hive configuration, so we cannot get a local hive conf path. We should therefore add a public constructor that takes a HiveConf, which is convenient for users.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
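A sketch of how the proposed constructor might be used (the signature is hypothetical, proposed by this ticket; today's public constructors take a hiveConfDir path instead):
{code:java}
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.hadoop.hive.conf.HiveConf;

public class HiveCatalogExample {
    public static void main(String[] args) {
        HiveConf hiveConf = new HiveConf();
        hiveConf.set("hive.metastore.uris", "thrift://metastore-host:9083");
        // Hypothetical constructor proposed by this ticket:
        HiveCatalog catalog = new HiveCatalog("myhive", "default", hiveConf, "2.3.6");
    }
}
{code}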
[jira] [Created] (FLINK-18549) flink 1.11 can not commit partition automatically
Jun Zhang created FLINK-18549: - Summary: flink 1.11 can not commit partition automatically Key: FLINK-18549 URL: https://issues.apache.org/jira/browse/FLINK-18549 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.11.0 Reporter: Jun Zhang Fix For: 1.11.1
I use the sql of flink 1.11 to read from kafka and write to hdfs, and I found that the partition cannot be committed automatically. This is my complete code. My checkpoint interval is 10s, so I think it should be normal for a _SUCCESS file to appear under the hdfs partition every 10s, but in fact there is none.
{code:java}
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(10000);
bsEnv.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
String sqlSource = "CREATE TABLE source_kafka (\n" +
    "  appName STRING,\n" +
    "  appVersion STRING,\n" +
    "  uploadTime STRING\n" +
    ") WITH (\n" +
    "  'connector.type' = 'kafka',\n" +
    "  'connector.version' = '0.10',\n" +
    "  'connector.topic' = 'test_topic',\n" +
    "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
    "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
    "  'connector.properties.group.id' = 'testGroup',\n" +
    "  'format.type' = 'json',\n" +
    "  'update-mode' = 'append'\n" +
    ")";
tEnv.executeSql(sqlSource);

String sql = "CREATE TABLE fs_table (\n" +
    "  appName STRING,\n" +
    "  appVersion STRING,\n" +
    "  uploadTime STRING,\n" +
    "  dt STRING,\n" +
    "  h STRING\n" +
    ") PARTITIONED BY (dt, h) WITH (\n" +
    "  'connector' = 'filesystem',\n" +
    "  'path' = 'hdfs://localhost/tmp/',\n" +
    "  'sink.partition-commit.policy.kind' = 'success-file',\n" +
    "  'format' = 'orc'\n" +
    ")";
tEnv.executeSql(sql);

String insertSql = "insert into fs_table SELECT appName, appVersion, uploadTime, " +
    "DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
tEnv.executeSql(insertSql);
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18479) can not commit partition when set partition time
Jun Zhang created FLINK-18479: - Summary: can not commit partition when set partition time Key: FLINK-18479 URL: https://issues.apache.org/jira/browse/FLINK-18479 Project: Flink Issue Type: Bug Components: FileSystems Affects Versions: 1.11.0 Reporter: Jun Zhang Fix For: 1.11.1
When we write streaming data to the filesystem and select 'partition time', the partition is never committed when the write finishes.
{code:java}
LocalDateTime partTime = extractor.extract(
    partitionKeys, extractPartitionValues(new Path(partition)));
if (watermark > toMills(partTime) + commitDelay) {
    needCommit.add(partition);
    iter.remove();
}
{code}
When we are in a non-UTC zone, the method 'toMills' converts the partition time as if it were UTC. For example, in UTC/GMT+08:00 the watermark will always be less than toMills, so we can never commit the partition. If we used local time instead of UTC, it would work both in the UTC zone and in other zones.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
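A minimal sketch of the mismatch (my own illustration, not Flink code):
{code:java}
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class ToMillsDemo {
    public static void main(String[] args) {
        LocalDateTime partTime = LocalDateTime.of(2020, 7, 3, 10, 0);
        // Interpreting the partition time as UTC vs. as local wall-clock
        // time in GMT+8 yields epoch millis 8 hours apart, so a watermark
        // derived from local time may never exceed the UTC interpretation.
        long utcMillis = partTime.toInstant(ZoneOffset.UTC).toEpochMilli();
        long localMillis = partTime.atZone(ZoneId.of("GMT+8")).toInstant().toEpochMilli();
        System.out.println(utcMillis - localMillis); // 28800000 = 8 hours
    }
}
{code}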
[jira] [Created] (FLINK-16818) Optimize data skew when flink write data to hive dynamic partition table
Jun Zhang created FLINK-16818: - Summary: Optimize data skew when flink write data to hive dynamic partition table Key: FLINK-16818 URL: https://issues.apache.org/jira/browse/FLINK-16818 Project: Flink Issue Type: Improvement Components: Connectors / Hive Affects Versions: 1.10.0 Reporter: Jun Zhang Fix For: 1.11.0
I read the source table data of hive through flink sql, and then write to a hive target table. The target table is partitioned. When the data of one partition is particularly large, data skew occurs, resulting in a particularly long execution time. With the default configuration and the same sql, hive on spark takes five minutes, while flink takes about 40 minutes.
example:
{code:java}
// the schema of myparttable
name string,
age int,
PARTITIONED BY (type string, day string)

INSERT OVERWRITE myparttable
SELECT name, age, type, day FROM sourcetable;
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16799) add hive partition limit when read from hive
Jun Zhang created FLINK-16799: - Summary: add hive partition limit when read from hive Key: FLINK-16799 URL: https://issues.apache.org/jira/browse/FLINK-16799 Project: Flink Issue Type: Improvement Components: Connectors / Hive Affects Versions: 1.10.0 Reporter: Jun Zhang Fix For: 1.11.0
Add a partition limit when reading from hive: a query will not be executed if it attempts to fetch more partitions per table than the configured limit, in order to avoid full table scans.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16731) Support show partitions table command in sql client
Jun Zhang created FLINK-16731: - Summary: Support show partitions table command in sql client Key: FLINK-16731 URL: https://issues.apache.org/jira/browse/FLINK-16731 Project: Flink Issue Type: New Feature Components: Table SQL / Client Affects Versions: 1.10.0 Reporter: Jun Zhang Fix For: 1.11.0
Add a SHOW PARTITIONS table command to the sql client to show the partition information of a partitioned table.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
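Illustrative usage, mirroring Hive's command:
{code:java}
SHOW PARTITIONS my_partitioned_table;
{code}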
[jira] [Created] (FLINK-16709) add a set command to set job name when submit job on sql client
Jun Zhang created FLINK-16709: - Summary: add a set command to set job name when submit job on sql client Key: FLINK-16709 URL: https://issues.apache.org/jira/browse/FLINK-16709 Project: Flink Issue Type: Improvement Components: Table SQL / Client Affects Versions: 1.10.0 Reporter: Jun Zhang Fix For: 1.11.0
When we submit a sql job in the sql client, the default job name is sessionid + sql, and the job name cannot be specified. When the sql is very long (for example, selecting 100 columns), this is unfriendly to display on the web UI, and when there are many jobs it is not easy to find a particular one. So we should add a command 'set execution.job-name = jobname' to set the name of the submitted job.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16646) flink read orc file throw a NullPointerException
Jun Zhang created FLINK-16646: - Summary: flink read orc file throw a NullPointerException Key: FLINK-16646 URL: https://issues.apache.org/jira/browse/FLINK-16646 Project: Flink Issue Type: Bug Components: Connectors / Hive Affects Versions: 1.10.0 Reporter: Jun Zhang Fix For: 1.11.0
When I use OrcRowInputFormat to read multiple orc files, the system throws a NullPointerException. The code is like this:
{code:java}
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
String path = "file://tmp/dir";
String schema = ...;
OrcRowInputFormat orcRowInputFormat = new OrcRowInputFormat(
    path, schema, new org.apache.hadoop.conf.Configuration());
DataStream<Row> dataStream = environment.createInput(orcRowInputFormat);
dataStream.writeAsText("file:///tmp/aaa", FileSystem.WriteMode.OVERWRITE);
environment.execute();
{code}
the exception is:
{code:java}
Caused by: java.lang.NullPointerException
at org.apache.flink.orc.shim.OrcShimV200.computeProjectionMask(OrcShimV200.java:188)
at org.apache.flink.orc.shim.OrcShimV200.createRecordReader(OrcShimV200.java:120)
at org.apache.flink.orc.OrcSplitReader.<init>(OrcSplitReader.java:73)
at org.apache.flink.orc.OrcRowSplitReader.<init>(OrcRowSplitReader.java:50)
at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:102)
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:315)
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16623) add the shorthand 'desc' for describe on sql client
Jun Zhang created FLINK-16623: - Summary: add the shorthand 'desc' for describe on sql client Key: FLINK-16623 URL: https://issues.apache.org/jira/browse/FLINK-16623 Project: Flink Issue Type: Improvement Components: Table SQL / Client Affects Versions: 1.10.0 Reporter: Jun Zhang Fix For: 1.11.0
When getting a table schema in the sql client, we can only use the describe command, not the shorthand desc; but the desc command is supported in many sql clients, such as spark, hive, mysql, etc. We should add the desc command to the flink sql client.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16539) sql client set param error
Jun Zhang created FLINK-16539: - Summary: sql client set param error Key: FLINK-16539 URL: https://issues.apache.org/jira/browse/FLINK-16539 Project: Flink Issue Type: Bug Components: Command Line Client Affects Versions: 1.10.0 Reporter: Jun Zhang Fix For: 1.11.0
When setting int type parameters in the sql client, such as: set execution.parallelism = 10; the system throws an exception:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Property 'parallelism' must be a integer value but was: 10
at org.apache.flink.table.descriptors.DescriptorProperties.validateComparable(DescriptorProperties.java:1572)
at org.apache.flink.table.descriptors.DescriptorProperties.validateInt(DescriptorProperties.java:944)
at org.apache.flink.table.descriptors.DescriptorProperties.validateInt(DescriptorProperties.java:937)
at org.apache.flink.table.client.config.entries.ExecutionEntry.validate(ExecutionEntry.java:140)
at org.apache.flink.table.client.config.entries.ConfigEntry.<init>(ConfigEntry.java:39)
... 11 more
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14739) add failOnCastException Configuration to Json FormatDescriptor
Jun Zhang created FLINK-14739: - Summary: add failOnCastException Configuration to Json FormatDescriptor Key: FLINK-14739 URL: https://issues.apache.org/jira/browse/FLINK-14739 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.9.1 Reporter: Jun Zhang Fix For: 1.9.2
When flink reads data from kafka (format is json), the schema is defined, similar to the following DDL:
{code:java}
CREATE TABLE kafka_source (
  intotime VARCHAR,
  userinfo ROW<...>
) WITH (
  'connector.type' = 'kafka',
  'format.type' = 'json',
  ...
)
{code}
But when flink encounters bad data, such as a userinfo value that is actually a string, the program throws the following exception and then fails:
{code:java}
Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
{code}
I want to find the wrong data and don't want the program to fail. So I want to add a json configuration option, just like org.apache.flink.table.descriptors.Json#failOnMissingField, which the user can configure.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
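A sketch of the proposed descriptor method, by analogy with failOnMissingField (failOnCastException is this ticket's proposal, not an existing Flink API, so it is shown commented out):
{code:java}
import org.apache.flink.table.descriptors.Json;

public class JsonFormatExample {
    public static void main(String[] args) {
        Json json = new Json()
            .failOnMissingField(false);
        // Proposed (hypothetical) knob: tolerate cast errors instead of failing:
        // json.failOnCastException(false);
    }
}
{code}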
[jira] [Created] (FLINK-10170) Support map types in descriptor-based Table API
Jun Zhang created FLINK-10170: - Summary: Support map types in descriptor-based Table API Key: FLINK-10170 URL: https://issues.apache.org/jira/browse/FLINK-10170 Project: Flink Issue Type: Improvement Components: Table API SQL Affects Versions: 1.6.1, 1.7.0 Reporter: Jun Zhang Assignee: Jun Zhang Fix For: 1.6.0
Since 1.6 the recommended way of creating source/sink tables is using connector/format/schema descriptors. However, when declaring map types in the schema descriptor, the following exception is thrown:
{quote}org.apache.flink.table.api.TableException: A string representation for array types is not supported yet.{quote}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10079) Automatically resolve and register sink table name from external catalogs
Jun Zhang created FLINK-10079: - Summary: Automatically resolve and register sink table name from external catalogs Key: FLINK-10079 URL: https://issues.apache.org/jira/browse/FLINK-10079 Project: Flink Issue Type: Improvement Components: Table API SQL Affects Versions: 1.6.0 Reporter: Jun Zhang Assignee: Jun Zhang Fix For: 1.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10064) Fixed a typo in ExternalCatalogTable
Jun Zhang created FLINK-10064: - Summary: Fixed a typo in ExternalCatalogTable Key: FLINK-10064 URL: https://issues.apache.org/jira/browse/FLINK-10064 Project: Flink Issue Type: Bug Components: Table API SQL Affects Versions: 1.6.0 Reporter: Jun Zhang Assignee: Jun Zhang Fix For: 1.6.0
isTableSink should return isSink, not isSource; I suppose it was a small typo:
{code:java}
def isTableSink: Boolean = {
  isSource
}
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
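The corrected method, in the same Scala style as the snippet above, would read:
{code:java}
def isTableSink: Boolean = {
  isSink
}
{code}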
[jira] [Created] (FLINK-10061) Fix unsupported configuration in KafkaTableSink
Jun Zhang created FLINK-10061: - Summary: Fix unsupported configuration in KafkaTableSink Key: FLINK-10061 URL: https://issues.apache.org/jira/browse/FLINK-10061 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.6.0 Reporter: Jun Zhang Assignee: Jun Zhang Fix For: 1.6.0
When using KafkaTableSink via table.writeToSink(), the following exception is thrown:
{code:java}
java.lang.UnsupportedOperationException: Reconfiguration of this sink is not supported.
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9444) KafkaAvroTableSource failed to work for map fields
Jun Zhang created FLINK-9444: Summary: KafkaAvroTableSource failed to work for map fields Key: FLINK-9444 URL: https://issues.apache.org/jira/browse/FLINK-9444 Project: Flink Issue Type: Bug Components: Table API SQL Affects Versions: 1.6.0 Reporter: Jun Zhang Fix For: 1.6.0
Once an Avro schema has map fields, an exception will be thrown when registering the KafkaAvroTableSource, complaining like:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type Map<...> of table field 'event' does not match with type GenericType<...> of the field 'event' of the TableSource return type.
at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
at org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
at org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9384) KafkaAvroTableSource failed to work due to type mismatch
Jun Zhang created FLINK-9384: Summary: KafkaAvroTableSource failed to work due to type mismatch Key: FLINK-9384 URL: https://issues.apache.org/jira/browse/FLINK-9384 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.6.0 Reporter: Jun Zhang Fix For: 1.6.0
An exception was thrown when using KafkaAvroTableSource, as follows:
{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: TableSource of type org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSource returned a DataStream of type GenericType<...> that does not match with the type Row(id: Integer, name: String, age: Integer, event: GenericType<...>) declared by the TableSource.getReturnType() method. Please validate the implementation of the TableSource.
at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:100)
at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:279)
at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
at org.apache.flink.quickstart.StreamingJobAvro.main(StreamingJobAvro.java:85)
{code}
It is caused by a discrepancy between the type returned by the TableSource and the type returned by the DataStream. I've already fixed it; would someone please review the patch and see if it could be merged?
-- This message was sent by Atlassian JIRA (v7.6.3#76005)