[jira] [Created] (FLINK-32819) flink can not parse the param `#` correctly in k8s application mode

2023-08-09 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-32819:
-

 Summary: flink can not parse the param `#` correctly in k8s 
application mode
 Key: FLINK-32819
 URL: https://issues.apache.org/jira/browse/FLINK-32819
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Affects Versions: 1.17.1
Reporter: Jun Zhang
 Fix For: 1.18.0


When I submit a flink job in k8s application mode and one of its parameters 
contains `#` (for example a mysql password), flink can not parse the parameter 
correctly: the content after the `#` is lost.
{code:java}

/mnt/flink/flink-1.17.0/bin/flink run-application \
-Dexecution.target=kubernetes-application \
-Dkubernetes.container.image=x \
local:///opt/flink/usrlib/my.jar  \
--mysql-conf hostname=localhost \
--mysql-conf username=root \
--mysql-conf password=%&^GGJI#$jh665$fi^% \
--mysql-conf port=3306 

{code}
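A minimal sketch of the suspected behavior, assuming the value is truncated at
the first `#` the way a properties-style comment would be (the helper below is
hypothetical, not Flink code):

{code:java}
// Hypothetical illustration: if the arguments are round-tripped through a
// config format where '#' starts a comment, everything after it is dropped.
static String stripInlineComment(String line) {
    int hash = line.indexOf('#');
    return hash >= 0 ? line.substring(0, hash).trim() : line;
}

// stripInlineComment("password=%&^GGJI#$jh665$fi^%") returns "password=%&^GGJI"
{code}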



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31459) add UPDATE COLUMN POSITION for flink table store

2023-03-14 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-31459:
-

 Summary: add UPDATE COLUMN POSITION for flink table store
 Key: FLINK-31459
 URL: https://issues.apache.org/jira/browse/FLINK-31459
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: table-store-0.3.1
Reporter: Jun Zhang
 Fix For: table-store-0.4.0



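For illustration, a hedged sketch of the presumable target: Spark/Hive-style
column reordering in ALTER TABLE (the syntax is an assumption, assuming a
TableEnvironment `tEnv`):

{code:java}
// Hypothetical syntax sketch, modeled on the Spark/Hive
// ALTER COLUMN ... FIRST/AFTER form.
tEnv.executeSql("ALTER TABLE my_table MODIFY col_a INT FIRST");
tEnv.executeSql("ALTER TABLE my_table MODIFY col_b STRING AFTER col_a");
{code}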



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31338) support infer parallelism for flink table store

2023-03-06 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-31338:
-

 Summary: support  infer parallelism for flink table store
 Key: FLINK-31338
 URL: https://issues.apache.org/jira/browse/FLINK-31338
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: table-store-0.3.0
Reporter: Jun Zhang
 Fix For: table-store-0.4.0


When using flink to query the fts table, we can configure the scan parallelism 
by setting scan.parallelism, but the user may not know how much parallelism 
should be used: setting it too large wastes resources, while setting it too 
small makes the query slow. So we can add parallelism inference.

The function is enabled by default, and the inferred parallelism equals the 
number of read splits. Of course, the user can manually turn the inference off. 
To prevent too many data files from causing excessive parallelism, we also set 
a max infer parallelism; when the inferred parallelism exceeds this setting, 
the max is used.

In addition, we also need to compare with the limit in the select query 
statement to get a more appropriate parallelism in the case of limit pushdown. 
For example, if we have the sql `select * from table limit 1` and we infer a 
parallelism of 10, only one parallel task is needed, because we only need one 
row.
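A hedged sketch of the rule described above (method and parameter names are
hypothetical, not the eventual table store API):

{code:java}
// Hypothetical sketch: splits-based inference, capped by a configured
// maximum and by a pushed-down limit (0 means no limit was pushed down).
static int inferScanParallelism(
        int splitCount, int maxInferParallelism, long pushedDownLimit) {
    int parallelism = Math.min(splitCount, maxInferParallelism);
    if (pushedDownLimit > 0) {
        // 'select * from table limit 1' needs one task even with 10 splits.
        parallelism = (int) Math.min((long) parallelism, pushedDownLimit);
    }
    return Math.max(parallelism, 1);
}
{code}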



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31258) can not get kerberos keytab in flink operator

2023-02-28 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-31258:
-

 Summary: can not get kerberos keytab in flink operator
 Key: FLINK-31258
 URL: https://issues.apache.org/jira/browse/FLINK-31258
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Reporter: Jun Zhang


Environment:

flink k8s operator 1.4

flink 1.14.6

The conf:
{code:java}
  flinkConfiguration:
    security.kerberos.login.keytab: /path/your/user.keytab
    security.kerberos.login.principal: y...@hadoop.com
{code}
and I get an exception:

 
{code:java}
Status:
  Cluster Info:
  Error:                          
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
 Could not create Kubernetes cluster 
\"basic-example\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could
 not create Kubernetes cluster 
\"basic-example\"."},{"type":"org.apache.flink.configuration.IllegalConfigurationException","message":"Kerberos
 login configuration is invalid: keytab [/path/your/user.keytab] doesn't 
exist!"}]} {code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31224) Add metrics for flink table store

2023-02-25 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-31224:
-

 Summary: Add metrics for flink table store
 Key: FLINK-31224
 URL: https://issues.apache.org/jira/browse/FLINK-31224
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: table-store-0.3.1
Reporter: Jun Zhang
 Fix For: table-store-0.4.0


Add metrics for flink table store



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31128) Add Create Table As for flink table store

2023-02-19 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-31128:
-

 Summary: Add Create Table As for flink table store
 Key: FLINK-31128
 URL: https://issues.apache.org/jira/browse/FLINK-31128
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: table-store-0.3.0
Reporter: Jun Zhang
 Fix For: table-store-0.4.0


Add Create Table As for flink table store
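A hedged sketch of the CTAS shape (the exact flink/table store form is an
assumption, assuming a TableEnvironment `tEnv`):

{code:java}
// Hypothetical CREATE TABLE AS usage.
tEnv.executeSql(
    "CREATE TABLE target_table AS SELECT id, name FROM source_table");
{code}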



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31028) Provide different compression methods for per level

2023-02-12 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-31028:
-

 Summary: Provide different compression methods for per level
 Key: FLINK-31028
 URL: https://issues.apache.org/jira/browse/FLINK-31028
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Affects Versions: table-store-0.3.0
Reporter: Jun Zhang
 Fix For: table-store-0.4.0


Different compression methods are provided for different levels.

For level 0, because the amount of data in this level is not large, we do not 
want to use compression, in exchange for better write performance. For normal 
levels, we use lz4. For the last level, access is generally less frequent and 
the data volume is large, so we hope to use gzip to reduce the space size.
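A hedged sketch of the proposed per-level policy (the codec names follow the
description above; how it would be configured is left open):

{code:java}
// Hypothetical policy: no compression at level 0, lz4 for middle levels,
// gzip for the cold, large last level.
static String compressionForLevel(int level, int maxLevel) {
    if (level == 0) {
        return "none"; // favor write performance
    }
    if (level == maxLevel) {
        return "gzip"; // rarely read, large: favor space
    }
    return "lz4";
}
{code}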



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31011) upgrade hiverunner version

2023-02-10 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-31011:
-

 Summary: upgrade hiverunner version
 Key: FLINK-31011
 URL: https://issues.apache.org/jira/browse/FLINK-31011
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: table-store-0.3.1
Reporter: Jun Zhang
 Fix For: table-store-0.4.0


The current HiveRunner test framework (version 4) has some bugs; for example, 
when we rename a table it does not rename the location. I tested that the 
latest version (6) is fine, so we should upgrade HiveRunner to the latest 
version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30871) add bloom filter for orc

2023-02-01 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-30871:
-

 Summary: add bloom filter for orc 
 Key: FLINK-30871
 URL: https://issues.apache.org/jira/browse/FLINK-30871
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: Jun Zhang
 Fix For: table-store-0.4.0



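As a hedged illustration of the idea: ORC's standard writer options already
expose per-column bloom filters, so something like the following could be
surfaced through the table store's format configuration (how table store would
expose or prefix these keys is an assumption):

{code:java}
// Hypothetical sketch: plain ORC writer options for bloom filters.
java.util.Map<String, String> orcOptions = new java.util.HashMap<>();
orcOptions.put("orc.bloom.filter.columns", "id,name"); // columns to index
orcOptions.put("orc.bloom.filter.fpp", "0.05");        // false positive rate
{code}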



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30608) support rename table

2023-01-09 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-30608:
-

 Summary: support rename table 
 Key: FLINK-30608
 URL: https://issues.apache.org/jira/browse/FLINK-30608
 Project: Flink
  Issue Type: Improvement
Reporter: Jun Zhang
 Fix For: table-store-0.4.0



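A hedged sketch of the target statement (this is standard Flink DDL; the ask is
presumably to support it for table store tables, assuming a TableEnvironment
`tEnv`):

{code:java}
// Hypothetical usage of the proposed command.
tEnv.executeSql("ALTER TABLE old_table_name RENAME TO new_table_name");
{code}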



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30595) support create table like

2023-01-06 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-30595:
-

 Summary: support create table like
 Key: FLINK-30595
 URL: https://issues.apache.org/jira/browse/FLINK-30595
 Project: Flink
  Issue Type: Improvement
Reporter: Jun Zhang
 Fix For: table-store-0.4.0


support CREATE TABLE LIKE 
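A hedged sketch of the statement shape (CREATE TABLE ... LIKE is existing Flink
DDL; the table names and the 'bucket' option are hypothetical, assuming a
TableEnvironment `tEnv`):

{code:java}
// Hypothetical usage: derive a table store table from an existing one.
tEnv.executeSql("CREATE TABLE derived WITH ('bucket' = '4') LIKE base");
{code}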



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-23955) submit flink sql job error when flink HA on yarn is configured

2021-08-24 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-23955:
-

 Summary: submit flink sql job error when flink HA on yarn is 
configured
 Key: FLINK-23955
 URL: https://issues.apache.org/jira/browse/FLINK-23955
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.13.0
Reporter: Jun Zhang
 Fix For: 1.14.0


1. When I configured flink HA, like this:
{code:java}
high-availability: zookeeper
high-availability.storageDir: hdfs://xxx/flink/ha/
high-availability.zookeeper.quorum: x:2181
high-availability.zookeeper.path.root: /flink
{code}
2. I start a flink session cluster.

3. I submit a flink sql job and set
{code:java}
set execution.target = yarn-per-job;
{code}
and I get the error:

{code:java}
2021-08-25 10:40:39,500 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
   [] - Found Web Interface master3:38052 of application 
'application_1629858010528_0002'.
2021-08-25 10:40:42,447 WARN  
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
exception occurred when fetching query results
java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
error., ]
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
~[?:1.8.0_291]
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) 
~[?:1.8.0_291]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:128)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:355)
 ~[iceberg-flink-runtime-77ea34e.dirty.jar:?]
at 
org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:143)
 ~[iceberg-flink-runtime-77ea34e.dirty.jar:?]
at 
org.apache.iceberg.flink.source.RowDataRewriter.rewriteDataForTasks(RowDataRewriter.java:91)
 ~[iceberg-flink-runtime-77ea34e.dirty.jar:?]
at 
org.apache.iceberg.flink.actions.RewriteDataFilesAction.rewriteDataForTasks(RewriteDataFilesAction.java:56)
 ~[iceberg-flink-runtime-77ea34e.dirty.jar:?]
at 
org.apache.iceberg.actions.BaseRewriteDataFilesAction.execute(BaseRewriteDataFilesAction.java:240)
 ~[iceberg-flink-runtime-77ea34e.dirty.jar:?]
at flink.test.RewriteTable.main(RewriteTable.java:52) 
~[iceberg-projects-1.0-SNAPSHOT.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_291]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_291]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_291]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_291]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-clients_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-clients_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-clients_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) 
~[flink-clients_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) 
~[flink-clients_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) 
~[flink-clients_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) 
~[flink-clients_2.12-1.13.0.jar:1.13.0]
at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_291]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_291]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
 [flink-shaded-hadoop2-uber-2.8.3-1.8.3.jar:2.8.3-1.8.3]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 [flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) 
[flink-clients_2.12-1.13.0.jar:1.13.0]
Caused by: 

[jira] [Created] (FLINK-22975) Specify port or range for k8s service

2021-06-11 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-22975:
-

 Summary: Specify port or range for k8s service
 Key: FLINK-22975
 URL: https://issues.apache.org/jira/browse/FLINK-22975
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Affects Versions: 1.13.1
Reporter: Jun Zhang
 Fix For: 1.14.0


When we deploy a flink program on k8s, the service port is randomly generated. 
This random port may not be accessible due to the company's network policy, so 
I think we should let users specify the port or port range that is exposed 
externally, similar to the '--service-node-port-range' parameter.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22591) add sub interface MapColumnVector for ColumnVector

2021-05-07 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-22591:
-

 Summary: add sub interface MapColumnVector  for ColumnVector
 Key: FLINK-22591
 URL: https://issues.apache.org/jira/browse/FLINK-22591
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.13.0
Reporter: Jun Zhang
 Fix For: 1.14.0


For complex types, the org.apache.flink.table.data.vector.ColumnVector 
interface has two sub-interfaces, RowColumnVector and ArrayColumnVector; I 
think we should add another sub-interface: MapColumnVector.
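A hedged sketch of what the new sub-interface could look like, mirroring the
existing ones (the method name and MapData return type are assumptions):

{code:java}
// Hypothetical sketch, in the style of RowColumnVector / ArrayColumnVector.
public interface MapColumnVector extends ColumnVector {

    /** Returns the map value at the given row index. */
    MapData getMap(int i);
}
{code}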




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21286) Support BUCKET for flink sql CREATE TABLE

2021-02-04 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-21286:
-

 Summary: Support  BUCKET for flink sql CREATE TABLE
 Key: FLINK-21286
 URL: https://issues.apache.org/jira/browse/FLINK-21286
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.12.1
Reporter: Jun Zhang
 Fix For: 1.13.0


Support BUCKET for flink CREATE TABLE; refer to the hive syntax:
{code:java}
 [CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] 
INTO num_buckets BUCKETS]
{code}
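For illustration, a hedged sketch of how the clause might look in a flink DDL
(hypothetical; the hive form above is the reference, and the connector options
are elided, assuming a TableEnvironment `tEnv`):

{code:java}
// Hypothetical DDL sketch of the proposed clause.
tEnv.executeSql(
    "CREATE TABLE user_events (user_id INT, name STRING) "
        + "CLUSTERED BY (user_id) SORTED BY (name ASC) INTO 8 BUCKETS");
{code}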



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21285) Support MERGE INTO for flink sql

2021-02-04 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-21285:
-

 Summary: Support  MERGE INTO for flink sql
 Key: FLINK-21285
 URL: https://issues.apache.org/jira/browse/FLINK-21285
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.12.1
Reporter: Jun Zhang
 Fix For: 1.13.0


Support MERGE INTO for flink sql; refer to the hive syntax:
{code:java}
MERGE INTO <target table> AS T USING <source expression/table> AS S
ON <boolean expression>
WHEN MATCHED [AND <boolean expression>] THEN UPDATE SET <set clause list>
WHEN MATCHED [AND <boolean expression>] THEN DELETE
WHEN NOT MATCHED [AND <boolean expression>] THEN INSERT VALUES <value list>
{code}
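A concrete hedged example of the statement shape (table and column names are
hypothetical, assuming a TableEnvironment `tEnv`):

{code:java}
// Hypothetical example of the proposed statement.
tEnv.executeSql(
    "MERGE INTO target AS T USING updates AS S "
        + "ON T.id = S.id "
        + "WHEN MATCHED THEN UPDATE SET T.name = S.name "
        + "WHEN NOT MATCHED THEN INSERT VALUES (S.id, S.name)");
{code}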



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21283) Support sql extension for flink sql

2021-02-04 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-21283:
-

 Summary: Support sql extension for flink sql 
 Key: FLINK-21283
 URL: https://issues.apache.org/jira/browse/FLINK-21283
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.12.1
Reporter: Jun Zhang
 Fix For: 1.13.0


I think we should add a sql extension mechanism for flink sql so that users 
can customize sql parsing, sql optimization, etc. We can refer to the [spark 
sql extension|https://issues.apache.org/jira/browse/SPARK-18127].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21282) Support UPDATE for flink sql

2021-02-04 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-21282:
-

 Summary: Support  UPDATE for flink sql
 Key: FLINK-21282
 URL: https://issues.apache.org/jira/browse/FLINK-21282
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Affects Versions: 1.12.1
Reporter: Jun Zhang
 Fix For: 1.13.0


Support UPDATE for flink sql; the syntax would look like this:
{code:java}
UPDATE tablename SET column = value [, column = value ...] [WHERE expression]
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21281) Support DELETE FROM for flink sql

2021-02-04 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-21281:
-

 Summary: Support  DELETE FROM for flink sql
 Key: FLINK-21281
 URL: https://issues.apache.org/jira/browse/FLINK-21281
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Affects Versions: 1.12.1
Reporter: Jun Zhang
 Fix For: 1.13.0


Support DELETE FROM for flink sql; the syntax would look like this:
{code:java}
DELETE FROM tablename [WHERE expression]
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21198) Add show create table command support for flink sql client

2021-01-28 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-21198:
-

 Summary: Add show create table command support for flink sql client
 Key: FLINK-21198
 URL: https://issues.apache.org/jira/browse/FLINK-21198
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Affects Versions: 1.12.0
Reporter: Jun Zhang
 Fix For: 1.13.0


I think we should add a SHOW CREATE TABLE command for the flink sql client, so 
that we can view the table schema.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21197) DESC command in sql-client should show the column comment

2021-01-28 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-21197:
-

 Summary: DESC command in sql-client should show the column comment
 Key: FLINK-21197
 URL: https://issues.apache.org/jira/browse/FLINK-21197
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Affects Versions: 1.12.0
Reporter: Jun Zhang
 Fix For: 1.13.0


Like hive, I think the DESC/DESCRIBE command in the sql client should show the 
column comment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20977) can not use `use` command to switch database

2021-01-14 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-20977:
-

 Summary: can not use `use` command to switch database 
 Key: FLINK-20977
 URL: https://issues.apache.org/jira/browse/FLINK-20977
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.12.0
Reporter: Jun Zhang
 Fix For: 1.13.0


I have a database named mod. When I use `use mod` to switch to the db, the 
system throws an exception; surrounding it with backticks does not help either.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20809) the limit push down invalid when use filter

2020-12-29 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-20809:
-

 Summary: the limit push down invalid when use filter 
 Key: FLINK-20809
 URL: https://issues.apache.org/jira/browse/FLINK-20809
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.12.0
Reporter: Jun Zhang
 Fix For: 1.13.0


When I use flink sql to query a hive table, like this:
{code:java}
select * from hive_table where id = 1 limit 1
{code}
 
When the sql contains query conditions in the where clause, I found that the 
limit push down does not take effect.

Looking at the comment in the source code, I think it should be pushed down; 
is this a bug?

[the comment 
|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.java#L64]




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20767) add nested field support for SupportsFilterPushDown

2020-12-24 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-20767:
-

 Summary: add nested field support for SupportsFilterPushDown
 Key: FLINK-20767
 URL: https://issues.apache.org/jira/browse/FLINK-20767
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.12.0
Reporter: Jun Zhang
 Fix For: 1.13.0


I think we should add the nested field support for SupportsFilterPushDown



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19896) Support first-n-rows deduplication in the Deduplicate operator

2020-10-30 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-19896:
-

 Summary: Support first-n-rows deduplication in the Deduplicate 
operator
 Key: FLINK-19896
 URL: https://issues.apache.org/jira/browse/FLINK-19896
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.12.0, 1.11.3
Reporter: Jun Zhang
 Fix For: 1.11.2


Currently the Deduplicate operator only supports first-row deduplication 
(ordered by proc-time). In the scenario of first-n-rows deduplication, the 
planner has to resort to the Rank operator. However, the Rank operator is less 
efficient than Deduplicate in terms of state consumption.

This issue proposes to extend DeduplicateKeepFirstRowFunction to support 
first-n-rows deduplication.
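For reference, a first-n-rows query of the shape in question, using the
standard ROW_NUMBER pattern that currently goes through the Rank operator
(table and column names are hypothetical, assuming a TableEnvironment `tEnv`):

{code:java}
// Hypothetical first-3-rows-per-key query.
tEnv.executeSql(
    "SELECT id, log_ts FROM ("
        + " SELECT *, ROW_NUMBER() OVER ("
        + "   PARTITION BY id ORDER BY proctime ASC) AS row_num"
        + " FROM events)"
        + " WHERE row_num <= 3");
{code}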



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19358) when submit job on application mode with HA,the jobid will be 0000000000

2020-09-22 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-19358:
-

 Summary: when submit job on application mode with HA,the jobid 
will be 00
 Key: FLINK-19358
 URL: https://issues.apache.org/jira/browse/FLINK-19358
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.11.0
Reporter: Jun Zhang
 Fix For: 1.12.0


When submitting a flink job in application mode with HA, the flink job id will 
be all zeros. When I have many jobs, they all have the same job id, which 
leads to checkpoint errors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19254) Invalid UTF-8 start byte exception

2020-09-15 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-19254:
-

 Summary: Invalid UTF-8 start byte exception 
 Key: FLINK-19254
 URL: https://issues.apache.org/jira/browse/FLINK-19254
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.11.0
Reporter: Jun Zhang
 Fix For: 1.12.0


When reading non-UTF-8 data, JsonRowDeserializationSchema throws an exception:
{code:java}

Caused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: 
Invalid UTF-8 start byte xxx 
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18691) add HiveCatalog Construction method with HiveConf

2020-07-23 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-18691:
-

 Summary: add HiveCatalog Construction method with HiveConf
 Key: FLINK-18691
 URL: https://issues.apache.org/jira/browse/FLINK-18691
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Affects Versions: 1.11.1
Reporter: Jun Zhang
 Fix For: 1.12.0


Currently HiveCatalog has two public constructors. They both need a 
hiveConfDir variable, which is the path of the local hive configuration files. 
But when we use application mode to submit a job, the job is submitted on the 
master node of the cluster, and there may be no hive configuration there, so 
we can not get a local hive conf path. We should therefore add a public 
constructor that takes a HiveConf, which is convenient for users.
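A hedged sketch of how the proposed constructor might be used (the exact
signature is an assumption, modeled on the existing hiveConfDir-based
constructors):

{code:java}
// Hypothetical usage of the proposed HiveConf-based constructor.
HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.uris", "thrift://metastore-host:9083");
HiveCatalog catalog = new HiveCatalog("myhive", "default", hiveConf, "2.3.4");
{code}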



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18549) flink 1.11 can not commit partition automatically

2020-07-09 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-18549:
-

 Summary: flink 1.11 can not commit partition automatically
 Key: FLINK-18549
 URL: https://issues.apache.org/jira/browse/FLINK-18549
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.11.0
Reporter: Jun Zhang
 Fix For: 1.11.1


I use the sql of flink 1.11 to read from kafka and write to hdfs, and I found 
that the partition cannot be committed automatically. This is my complete code.

My checkpoint interval is 10s, so I think a _SUCCESS file should appear under 
each hdfs partition every 10s, but in fact there is none.

{code:java}
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(10000); // 10s checkpoint interval, as described above
bsEnv.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);

String sqlSource = "CREATE TABLE source_kafka (\n" +
        "  appName STRING,\n" +
        "  appVersion STRING,\n" +
        "  uploadTime STRING\n" +
        ") WITH (\n" +
        "  'connector.type' = 'kafka',\n" +
        "  'connector.version' = '0.10',\n" +
        "  'connector.topic' = 'test_topic',\n" +
        "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
        "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
        "  'connector.properties.group.id' = 'testGroup',\n" +
        "  'format.type' = 'json',\n" +
        "  'update-mode' = 'append' )";

tEnv.executeSql(sqlSource);

String sql = "CREATE TABLE fs_table (\n" +
        "  appName STRING,\n" +
        "  appVersion STRING,\n" +
        "  uploadTime STRING,\n" +
        "  dt STRING,\n" +
        "  h STRING\n" +
        ") PARTITIONED BY (dt, h) WITH (\n" +
        "  'connector' = 'filesystem',\n" +
        "  'path' = 'hdfs://localhost/tmp/',\n" +
        "  'sink.partition-commit.policy.kind' = 'success-file',\n" +
        "  'format' = 'orc'\n" +
        ")";
tEnv.executeSql(sql);

String insertSql = "insert into fs_table SELECT appName, appVersion, uploadTime, " +
        "DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";

tEnv.executeSql(insertSql);
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18479) can not commit partition when set partition time

2020-07-02 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-18479:
-

 Summary: can not commit partition when set partition time
 Key: FLINK-18479
 URL: https://issues.apache.org/jira/browse/FLINK-18479
 Project: Flink
  Issue Type: Bug
  Components: FileSystems
Affects Versions: 1.11.0
Reporter: Jun Zhang
 Fix For: 1.11.1


When we write streaming data to a filesystem and select 'partition time', the 
partition can not be committed when the write finishes.

 
{code:java}

LocalDateTime partTime = extractor.extract(
  partitionKeys, extractPartitionValues(new Path(partition)));
if (watermark > toMills(partTime) + commitDelay) {
   needCommit.add(partition);
   iter.remove();
}
{code}
When we set a non-UTC zone and commit the partition, the method 'toMills' 
returns the UTC millis. For example, in UTC/GMT+08:00 the watermark will 
always be less than toMills, so we can never commit the partition.

If we use local time instead of UTC, it works both in the UTC zone and in 
other zones.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16818) Optimize data skew when flink write data to hive dynamic partition table

2020-03-26 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-16818:
-

 Summary: Optimize data skew when flink write data to hive dynamic 
partition table
 Key: FLINK-16818
 URL: https://issues.apache.org/jira/browse/FLINK-16818
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Affects Versions: 1.10.0
Reporter: Jun Zhang
 Fix For: 1.11.0


I read the source table data of hive through flink sql, and then write to the 
target table of hive. The target table is a partitioned table. When the data 
of one partition is particularly large, data skew occurs, resulting in a 
particularly long execution time.

With the default configuration and the same sql, hive on spark takes five 
minutes while flink takes about 40 minutes.

example:

 
{code:java}
-- the schema of myparttable
CREATE TABLE myparttable (
  name string,
  age int
) PARTITIONED BY (
  type string,
  day string
);

INSERT OVERWRITE myparttable SELECT name, age, type, day FROM sourcetable;
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16799) add hive partition limit when read from hive

2020-03-26 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-16799:
-

 Summary: add hive partition limit when read from hive
 Key: FLINK-16799
 URL: https://issues.apache.org/jira/browse/FLINK-16799
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Affects Versions: 1.10.0
Reporter: Jun Zhang
 Fix For: 1.11.0


Add a partition limit when reading from hive: a query will not be executed if 
it attempts to fetch more partitions per table than the configured limit. This 
avoids full table scans.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16731) Support show partitions table command in sql client

2020-03-23 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-16731:
-

 Summary: Support  show partitions table command in sql client
 Key: FLINK-16731
 URL: https://issues.apache.org/jira/browse/FLINK-16731
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Client
Affects Versions: 1.10.0
Reporter: Jun Zhang
 Fix For: 1.11.0


Add a SHOW PARTITIONS TABLE command in the sql client to show the partition 
information of a partitioned table.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16709) add a set command to set job name when submit job on sql client

2020-03-22 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-16709:
-

 Summary: add a set command to set job name when submit job on sql 
client
 Key: FLINK-16709
 URL: https://issues.apache.org/jira/browse/FLINK-16709
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Affects Versions: 1.10.0
Reporter: Jun Zhang
 Fix For: 1.11.0


When we submit a sql job in the sql client, the default job name is sessionid 
+ sql, and the job name cannot be specified. When the sql is very long, for 
example selecting 100 columns, this is unfriendly to display on the web UI, 
and when there are many jobs it is not easy to find a particular one. So we 
should add a command 'set execution.job-name = jobname' which sets the job 
name of the submitted job.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16646) flink read orc file throw a NullPointerException

2020-03-18 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-16646:
-

 Summary: flink read orc file throw a NullPointerException
 Key: FLINK-16646
 URL: https://issues.apache.org/jira/browse/FLINK-16646
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.10.0
Reporter: Jun Zhang
 Fix For: 1.11.0


When I use OrcRowInputFormat to read multiple orc files, the system throws a 
NullPointerException.

The code looks like this:

{code:java}
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
String path = "file:///tmp/dir";
String schema = ...; // schema elided
OrcRowInputFormat orcRowInputFormat = new OrcRowInputFormat(
        path,
        schema,
        new org.apache.hadoop.conf.Configuration());
DataStream<Row> dataStream = environment.createInput(orcRowInputFormat);
dataStream.writeAsText("file:///tmp/aaa", FileSystem.WriteMode.OVERWRITE);
environment.execute();
{code}
 

the exception is 

 
{code:java}
Caused by: java.lang.NullPointerException
    at org.apache.flink.orc.shim.OrcShimV200.computeProjectionMask(OrcShimV200.java:188)
    at org.apache.flink.orc.shim.OrcShimV200.createRecordReader(OrcShimV200.java:120)
    at org.apache.flink.orc.OrcSplitReader.<init>(OrcSplitReader.java:73)
    at org.apache.flink.orc.OrcRowSplitReader.<init>(OrcRowSplitReader.java:50)
    at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:102)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:315)
{code}
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16623) add the shorthand 'desc' for describe on sql client

2020-03-16 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-16623:
-

 Summary: add the shorthand 'desc'  for describe on sql client
 Key: FLINK-16623
 URL: https://issues.apache.org/jira/browse/FLINK-16623
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Affects Versions: 1.10.0
Reporter: Jun Zhang
 Fix For: 1.11.0


To get the table schema in the sql client, we can only use the describe 
command, not the shorthand desc, but the desc command is supported in many sql 
clients, such as spark, hive, mysql, etc. We should add the desc command to 
the flink sql client.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16539) sql client set param error

2020-03-11 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-16539:
-

 Summary: sql client set param error
 Key: FLINK-16539
 URL: https://issues.apache.org/jira/browse/FLINK-16539
 Project: Flink
  Issue Type: Bug
  Components: Command Line Client
Affects Versions: 1.10.0
Reporter: Jun Zhang
 Fix For: 1.11.0


When setting int type parameters in the sql client, such as
set execution.parallelism = 10;
the system threw an exception:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Property 'parallelism' must be a integer value but was: 10
    at org.apache.flink.table.descriptors.DescriptorProperties.validateComparable(DescriptorProperties.java:1572)
    at org.apache.flink.table.descriptors.DescriptorProperties.validateInt(DescriptorProperties.java:944)
    at org.apache.flink.table.descriptors.DescriptorProperties.validateInt(DescriptorProperties.java:937)
    at org.apache.flink.table.client.config.entries.ExecutionEntry.validate(ExecutionEntry.java:140)
    at org.apache.flink.table.client.config.entries.ConfigEntry.<init>(ConfigEntry.java:39)
    ... 11 more
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14739) add failOnCastException Configuration to Json FormatDescriptor

2019-11-13 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-14739:
-

 Summary: add failOnCastException Configuration to Json 
FormatDescriptor
 Key: FLINK-14739
 URL: https://issues.apache.org/jira/browse/FLINK-14739
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.9.1
Reporter: Jun Zhang
 Fix For: 1.9.2


When flink reads data from kafka (format is json), the schema is defined, 
similar to the following DDL:
{code:java}
CREATE TABLE kafka_source (
  intotime VARCHAR,
  userinfo ROW
) WITH (
  'connector.type' = 'kafka',
  'format.type' = 'json',
  ...
)
{code}
But when flink encounters bad data, such as a userinfo value that is a string, 
the program throws the following exception and then fails:
{code:java}
Caused by: java.lang.ClassCastException: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode 
cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson. 
databind.node.ObjectNode
{code}
I want to find the wrong data and do not want the program to fail, so I want 
to add a json configuration option, just like 
org.apache.flink.table.descriptors.Json#failOnMissingField, which the user can 
configure.
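A hedged sketch of the proposed option next to the existing one
(failOnCastException is the proposal here, not an existing API):

{code:java}
// Hypothetical descriptor usage, mirroring the existing failOnMissingField.
new Json()
    .failOnMissingField(false)
    .failOnCastException(false); // proposed: tolerate records that fail to cast
{code}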



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-10170) Support map types in descriptor-based Table API

2018-08-19 Thread Jun Zhang (JIRA)
Jun Zhang created FLINK-10170:
-

 Summary: Support map types in descriptor-based Table API
 Key: FLINK-10170
 URL: https://issues.apache.org/jira/browse/FLINK-10170
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Affects Versions: 1.6.1, 1.7.0
Reporter: Jun Zhang
Assignee: Jun Zhang
 Fix For: 1.6.0


Since 1.6 the recommended way of creating a source/sink table is using 
connector/format/schema descriptors. However, when declaring map types in the 
schema descriptor, the following exception is thrown:

{quote}org.apache.flink.table.api.TableException: A string representation for 
array types is not supported yet.{quote}
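For reference, a hedged sketch of a schema declaration that triggers this (the
field names are hypothetical, and the exact Types helpers are assumed from the
1.6 Table API):

{code:java}
// Hypothetical schema descriptor declaring a map-typed field.
new Schema()
    .field("id", Types.INT())
    .field("properties", Types.MAP(Types.STRING(), Types.STRING()))
{code}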



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10079) Automatically resolve and register sink table name from external catalogs

2018-08-07 Thread Jun Zhang (JIRA)
Jun Zhang created FLINK-10079:
-

 Summary: Automatically resolve and register sink table name from 
external catalogs
 Key: FLINK-10079
 URL: https://issues.apache.org/jira/browse/FLINK-10079
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Affects Versions: 1.6.0
Reporter: Jun Zhang
Assignee: Jun Zhang
 Fix For: 1.6.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10064) Fixed a typo in ExternalCatalogTable

2018-08-05 Thread Jun Zhang (JIRA)
Jun Zhang created FLINK-10064:
-

 Summary: Fixed a typo in ExternalCatalogTable
 Key: FLINK-10064
 URL: https://issues.apache.org/jira/browse/FLINK-10064
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL
Affects Versions: 1.6.0
Reporter: Jun Zhang
Assignee: Jun Zhang
 Fix For: 1.6.0


isTableSink should return isSink, not isSource; I suppose it was a small typo:
{code:java}
def isTableSink: Boolean = {
 isSource
}{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10061) Fix unsupported configuration in KafkaTableSink

2018-08-05 Thread Jun Zhang (JIRA)
Jun Zhang created FLINK-10061:
-

 Summary: Fix unsupported configuration in KafkaTableSink
 Key: FLINK-10061
 URL: https://issues.apache.org/jira/browse/FLINK-10061
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.6.0
Reporter: Jun Zhang
Assignee: Jun Zhang
 Fix For: 1.6.0


When using KafkaTableSink in "table.writeToSink()", the following exception is 
thrown:
{code:java}
"java.lang.UnsupportedOperationException: Reconfiguration of this sink is not 
supported."{code}
 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9444) KafkaAvroTableSource failed to work for map fields

2018-05-26 Thread Jun Zhang (JIRA)
Jun Zhang created FLINK-9444:


 Summary: KafkaAvroTableSource failed to work for map fields
 Key: FLINK-9444
 URL: https://issues.apache.org/jira/browse/FLINK-9444
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL
Affects Versions: 1.6.0
Reporter: Jun Zhang
 Fix For: 1.6.0


Once an Avro schema has map fields, an exception is thrown when registering 
the KafkaAvroTableSource, complaining like:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 
Map of table field 'event' does not match with type 
GenericType of the field 'event' of the TableSource return type.
 at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
 at 
org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
 at 
org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
 at 
org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
 at 
org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
 at 
org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
 at 
org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9384) KafkaAvroTableSource failed to work due to type mismatch

2018-05-16 Thread Jun Zhang (JIRA)
Jun Zhang created FLINK-9384:


 Summary: KafkaAvroTableSource failed to work due to type mismatch
 Key: FLINK-9384
 URL: https://issues.apache.org/jira/browse/FLINK-9384
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.6.0
Reporter: Jun Zhang
 Fix For: 1.6.0


An exception was thrown when using KafkaAvroTableSource as follows:

Exception in thread "main" org.apache.flink.table.api.TableException: 
TableSource of type 
org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSource returned a 
DataStream of type GenericType that does not match 
with the type Row(id: Integer, name: String, age: Integer, event: 
GenericType) declared by the TableSource.getReturnType() method. 
Please validate the implementation of the TableSource.
 at 
org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:100)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
 at 
org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:279)
 at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
 at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
 at org.apache.flink.quickstart.StreamingJobAvro.main(StreamingJobAvro.java:85)

 

It is caused by a discrepancy between the type returned by the TableSource and 
the type of the DataStream it produces. I have already fixed it; would someone 
please review the patch and see if it could be merged.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)