[jira] [Commented] (FLINK-21999) The logic about whether Checkpoint is enabled.
[ https://issues.apache.org/jira/browse/FLINK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17314236#comment-17314236 ]

ZhangWei commented on FLINK-21999:
--

[~trohrmann] In the current implementation, when the checkpoint interval is Long.MaxValue, DefaultExecutionGraphBuilder still assumes that checkpointing is enabled, and the ExecutionGraph returned by DefaultExecutionGraphBuilder#buildGraph has the related checkpoint configuration set. That step instantiates a CheckpointCoordinator. So when I change the "checkpointing enabled" condition to also require that the checkpoint interval is not Long.MaxValue, the CheckpointCoordinator is no longer instantiated, and OperatorCoordinatorSchedulerTest fails because the CheckpointCoordinator is null.

In short, the current implementation always instantiates the CheckpointCoordinator, whether or not the interval equals Long.MaxValue. I am not sure what the expected behavior is.

In addition, I found CheckpointCoordinator#isPeriodicCheckpointingConfigured. What does "periodic" mean here, and what is the difference between periodic and non-periodic? Does periodic mean regular checkpoints, and non-periodic mean that checkpointing is disabled?

The failed CI test results can be found here: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16033=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0

> The logic about whether Checkpoint is enabled.
> --
>
> Key: FLINK-21999
> URL: https://issues.apache.org/jira/browse/FLINK-21999
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Reporter: ZhangWei
> Assignee: ZhangWei
> Priority: Major
> Labels: pull-request-available
>
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder#isCheckpointingEnabled
> assumes checkpointing is enabled whenever JobCheckpointingSettings is not null. That is
> not enough: we must also check the checkpoint interval, as JobGraph#isCheckpointingEnabled
> does. The interval should lie within [MINIMAL_CHECKPOINT_TIME, Long.MaxValue).
> In the current implementation, when we do not set the checkpoint interval and leave it at
> the default value -1, the interval is changed to Long.MaxValue. Thus
> DefaultExecutionGraphBuilder#isCheckpointingEnabled returns true, which is
> not correct.
> In addition, different classes consider checkpointing enabled for
> different interval ranges:
> 1. CheckpointConfig -> (0, Long.MaxValue]
> 2. JobGraph -> (0, Long.MaxValue)
> This is not consistent. The correct range is [MINIMAL_CHECKPOINT_TIME,
> Long.MaxValue).

-- This message was sent by Atlassian Jira (v8.3.4#803005)
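The three interval ranges listed in the quoted description can be compared directly on a few boundary values. This is a minimal sketch: the predicates are written from the ranges as stated in the issue, not copied from Flink, and MINIMAL_CHECKPOINT_TIME = 10 ms is an assumed value.

```java
public class CheckpointIntervalRanges {

    // Assumption: illustrative value only; Flink defines its own constant,
    // which may differ between versions.
    static final long MINIMAL_CHECKPOINT_TIME = 10L;

    // CheckpointConfig-style range as described in the issue: (0, Long.MaxValue].
    static boolean enabledPerCheckpointConfig(long interval) {
        return interval > 0;
    }

    // JobGraph-style range as described in the issue: (0, Long.MaxValue).
    static boolean enabledPerJobGraph(long interval) {
        return interval > 0 && interval < Long.MAX_VALUE;
    }

    // Range proposed in the issue: [MINIMAL_CHECKPOINT_TIME, Long.MaxValue).
    static boolean enabledPerProposal(long interval) {
        return interval >= MINIMAL_CHECKPOINT_TIME && interval < Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        long[] intervals = {-1L, 5L, 10L, 60_000L, Long.MAX_VALUE};
        for (long interval : intervals) {
            System.out.printf("%d -> config=%b jobGraph=%b proposal=%b%n",
                    interval,
                    enabledPerCheckpointConfig(interval),
                    enabledPerJobGraph(interval),
                    enabledPerProposal(interval));
        }
    }
}
```

Among these values, Long.MaxValue is the only one where the CheckpointConfig-style check disagrees with both of the others, while intervals below MINIMAL_CHECKPOINT_TIME (such as 5) separate the proposed range from the two existing checks.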
[jira] [Commented] (FLINK-22055) RPC main thread executor may schedule commands with wrong time unit of delay
[ https://issues.apache.org/jira/browse/FLINK-22055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17312196#comment-17312196 ]

ZhangWei commented on FLINK-22055:
--

[~xintongsong] I am est08zw. Please assign the ticket to me, thanks!

> RPC main thread executor may schedule commands with wrong time unit of delay
>
> Key: FLINK-22055
> URL: https://issues.apache.org/jira/browse/FLINK-22055
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.12.2, 1.13.0
> Reporter: Xintong Song
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.13.0, 1.12.3
>
> There is a typo in {{RpcEndpoint#MainThreadExecutor#schedule(callable, delay, unit)}}
> that may cause the command to be scheduled with the wrong time unit.
>
> The issue is only Major priority because it is currently not causing any actual
> damage.
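The issue does not spell out the typo itself. The sketch below only illustrates, with hypothetical method names, why a schedule(callable, delay, unit) method has to interpret the delay in the caller-supplied TimeUnit: ignoring the unit silently changes the effective delay. The "faulty" variant is an invented example of this class of mistake, not the actual Flink code.

```java
import java.util.concurrent.TimeUnit;

public class DelayConversion {

    // Correct: interpret the delay in the unit the caller passed in.
    static long delayMillis(long delay, TimeUnit unit) {
        return unit.toMillis(delay);
    }

    // Hypothetical faulty variant: the caller's unit is ignored and the raw
    // number is treated as milliseconds. This is NOT the actual Flink code.
    static long delayMillisIgnoringUnit(long delay, TimeUnit unit) {
        return TimeUnit.MILLISECONDS.toMillis(delay);
    }

    public static void main(String[] args) {
        System.out.println(delayMillis(5, TimeUnit.SECONDS));             // 5000
        System.out.println(delayMillisIgnoringUnit(5, TimeUnit.SECONDS)); // 5
    }
}
```

A delay of "5 seconds" becomes 5 ms in the faulty variant, a thousandfold difference that would not necessarily show up as a test failure.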
[jira] [Created] (FLINK-22049) Simplify calculating the starting index of the local key-group range in InternalTimerServiceImpl constructor.
ZhangWei created FLINK-22049:

Summary: Simplify calculating the starting index of the local key-group range in InternalTimerServiceImpl constructor.
Key: FLINK-22049
URL: https://issues.apache.org/jira/browse/FLINK-22049
Project: Flink
Issue Type: Improvement
Components: API / DataStream
Reporter: ZhangWei

In org.apache.flink.streaming.api.operators.InternalTimerServiceImpl, the current implementation finds the starting index of the local key-group range by iterating over the KeyGroupRange and searching for the minimum value. This is unnecessary and wasteful: a KeyGroupRange is monotonically increasing, so its minimum is simply its startKeyGroup, which is exactly what we want.

It is even worse when we set a very large max parallelism (e.g., on the order of 10k) and a small parallelism (e.g., on the order of 10). Each KeyGroupRange may then contain about 1k key groups, and we iterate over these 1k increasing numbers just to find the minimum.

So if the KeyGroupRange is not EMPTY_KEY_GROUP_RANGE (i.e., KeyGroupRange#getNumberOfKeyGroups() > 0), we can just use its startKeyGroup; if it is EMPTY_KEY_GROUP_RANGE, we use Integer.MAX_VALUE. I think this avoids a lot of unnecessary work and saves time.
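The proposed change can be sketched side by side with the current scan. SimpleKeyGroupRange below is a hypothetical, simplified stand-in for Flink's KeyGroupRange (an inclusive range [start, end], empty when end < start); the method names are also invented for illustration.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

public class KeyGroupStartIndex {

    // Hypothetical, simplified stand-in for Flink's KeyGroupRange:
    // the inclusive range [start, end]; end < start means the range is empty.
    static final class SimpleKeyGroupRange implements Iterable<Integer> {
        static final SimpleKeyGroupRange EMPTY = new SimpleKeyGroupRange(0, -1);

        private final int start;
        private final int end;

        SimpleKeyGroupRange(int start, int end) {
            this.start = start;
            this.end = end;
        }

        int getStartKeyGroup() {
            return start;
        }

        int getNumberOfKeyGroups() {
            return end < start ? 0 : end - start + 1;
        }

        @Override
        public Iterator<Integer> iterator() {
            return new Iterator<Integer>() {
                private int next = start;

                @Override
                public boolean hasNext() {
                    return next <= end;
                }

                @Override
                public Integer next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return next++;
                }
            };
        }
    }

    // Approach described in the issue: scan every key group for the minimum (O(n)).
    static int startIdxByScan(SimpleKeyGroupRange range) {
        int min = Integer.MAX_VALUE;
        for (int keyGroup : range) {
            min = Math.min(min, keyGroup);
        }
        return min;
    }

    // Proposed approach: O(1), with Integer.MAX_VALUE for an empty range.
    static int startIdxDirect(SimpleKeyGroupRange range) {
        return range.getNumberOfKeyGroups() > 0 ? range.getStartKeyGroup() : Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        SimpleKeyGroupRange range = new SimpleKeyGroupRange(1000, 1999);
        System.out.println(startIdxByScan(range) + " " + startIdxDirect(range)); // 1000 1000
        System.out.println(startIdxDirect(SimpleKeyGroupRange.EMPTY));           // 2147483647
    }
}
```

Both methods agree on non-empty and empty ranges; the direct version just avoids touching each of the (here 1000) key groups.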
[jira] [Commented] (FLINK-21999) The logic about whether Checkpoint is enabled.
[ https://issues.apache.org/jira/browse/FLINK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17311443#comment-17311443 ]

ZhangWei commented on FLINK-21999:
--

[~trohrmann] Glad to take it!

> The logic about whether Checkpoint is enabled.
> --
>
> Key: FLINK-21999
> URL: https://issues.apache.org/jira/browse/FLINK-21999
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Reporter: ZhangWei
> Priority: Major
>
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder#isCheckpointingEnabled
> assumes checkpointing is enabled whenever JobCheckpointingSettings is not null. That is
> not enough: we must also check the checkpoint interval, as JobGraph#isCheckpointingEnabled
> does. The interval should lie within [MINIMAL_CHECKPOINT_TIME, Long.MaxValue).
> In the current implementation, when we do not set the checkpoint interval and leave it at
> the default value -1, the interval is changed to Long.MaxValue. Thus
> DefaultExecutionGraphBuilder#isCheckpointingEnabled returns true, which is
> not correct.
> In addition, different classes consider checkpointing enabled for
> different interval ranges:
> 1. CheckpointConfig -> (0, Long.MaxValue]
> 2. JobGraph -> (0, Long.MaxValue)
> This is not consistent. The correct range is [MINIMAL_CHECKPOINT_TIME,
> Long.MaxValue).
[jira] [Created] (FLINK-21999) The logic about whether Checkpoint is enabled.
ZhangWei created FLINK-21999:

Summary: The logic about whether Checkpoint is enabled.
Key: FLINK-21999
URL: https://issues.apache.org/jira/browse/FLINK-21999
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Reporter: ZhangWei

org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder#isCheckpointingEnabled assumes checkpointing is enabled whenever JobCheckpointingSettings is not null. That is not enough: we must also check the checkpoint interval, as JobGraph#isCheckpointingEnabled does. The interval should lie within [MINIMAL_CHECKPOINT_TIME, Long.MaxValue).

In the current implementation, when we do not set the checkpoint interval and leave it at the default value -1, the interval is changed to Long.MaxValue. Thus DefaultExecutionGraphBuilder#isCheckpointingEnabled returns true, which is not correct.

In addition, different classes consider checkpointing enabled for different interval ranges:
1. CheckpointConfig -> (0, Long.MaxValue]
2. JobGraph -> (0, Long.MaxValue)

This is not consistent. The correct range is [MINIMAL_CHECKPOINT_TIME, Long.MaxValue).
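The proposed check can be sketched as follows. Settings is a hypothetical, simplified stand-in for JobCheckpointingSettings that reproduces the behavior described above (an unset interval of -1 ends up as Long.MaxValue); MINIMAL_CHECKPOINT_TIME = 10 ms is an assumed value, and neither method is Flink's actual code.

```java
public class CheckpointingEnabledCheck {

    // Assumption: illustrative value; Flink defines its own MINIMAL_CHECKPOINT_TIME.
    static final long MINIMAL_CHECKPOINT_TIME = 10L;

    // Hypothetical, simplified stand-in for JobCheckpointingSettings.
    static final class Settings {
        final long checkpointInterval;

        Settings(long requestedInterval) {
            // Mirrors the behavior described in the issue: an unset interval (-1)
            // ends up as Long.MAX_VALUE.
            this.checkpointInterval = requestedInterval == -1L ? Long.MAX_VALUE : requestedInterval;
        }
    }

    // Behavior described in the issue: non-null settings are treated as "enabled".
    static boolean isCheckpointingEnabledCurrent(Settings settings) {
        return settings != null;
    }

    // Proposed: settings present AND interval in [MINIMAL_CHECKPOINT_TIME, Long.MAX_VALUE).
    static boolean isCheckpointingEnabledProposed(Settings settings) {
        return settings != null
                && settings.checkpointInterval >= MINIMAL_CHECKPOINT_TIME
                && settings.checkpointInterval < Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        Settings unset = new Settings(-1L);
        Settings periodic = new Settings(60_000L);
        System.out.println(isCheckpointingEnabledCurrent(unset));     // true
        System.out.println(isCheckpointingEnabledProposed(unset));    // false
        System.out.println(isCheckpointingEnabledProposed(periodic)); // true
    }
}
```

With the unset interval, the current check reports checkpointing as enabled while the proposed one does not, which is exactly the discrepancy the issue reports.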
[jira] [Created] (FLINK-21943) Redundant checkNotNull
ZhangWei created FLINK-21943:

Summary: Redundant checkNotNull
Key: FLINK-21943
URL: https://issues.apache.org/jira/browse/FLINK-21943
Project: Flink
Issue Type: Improvement
Components: API / Core
Affects Versions: 1.12.2
Reporter: ZhangWei

In org.apache.flink.api.dag.Transformation#setResources(ResourceSpec minResources, ResourceSpec preferredResources), we have already checked that the parameters are not null by calling OperatorValidationUtils#validateMinAndPreferredResources, so it is not necessary to check them again in setResources(); we can just assign the parameters.
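The validate-then-assign pattern proposed here can be sketched with hypothetical stand-ins: Spec plays the role of ResourceSpec, and validateMinAndPreferred plays the role that the issue attributes to OperatorValidationUtils#validateMinAndPreferredResources (it only rejects nulls here; the real method may check more).

```java
import java.util.Objects;

public class ValidateThenAssign {

    // Hypothetical stand-in for ResourceSpec.
    static final class Spec {
        final double cpuCores;

        Spec(double cpuCores) {
            this.cpuCores = cpuCores;
        }
    }

    // Hypothetical validator: rejects null arguments before anything is assigned.
    static void validateMinAndPreferred(Spec min, Spec preferred) {
        Objects.requireNonNull(min, "The min resources must not be null.");
        Objects.requireNonNull(preferred, "The preferred resources must not be null.");
    }

    private Spec minResources;
    private Spec preferredResources;

    void setResources(Spec min, Spec preferred) {
        validateMinAndPreferred(min, preferred);
        // Both arguments are already known to be non-null here, so a second
        // null check before the assignment would be redundant.
        this.minResources = min;
        this.preferredResources = preferred;
    }

    Spec getMinResources() {
        return minResources;
    }

    Spec getPreferredResources() {
        return preferredResources;
    }

    public static void main(String[] args) {
        ValidateThenAssign t = new ValidateThenAssign();
        t.setResources(new Spec(1.0), new Spec(2.0));
        System.out.println(t.getMinResources().cpuCores); // 1.0
        try {
            t.setResources(null, new Spec(2.0));
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // The min resources must not be null.
        }
    }
}
```

Because validation runs before any field is written, a rejected call leaves the previously assigned resources untouched.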
[jira] [Created] (FLINK-21754) Exception message does not point to the right path.
ZhangWei created FLINK-21754:

Summary: Exception message does not point to the right path.
Key: FLINK-21754
URL: https://issues.apache.org/jira/browse/FLINK-21754
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 1.12.2
Reporter: ZhangWei

In org.apache.flink.configuration.GlobalConfiguration, lines 126-133 check whether the conf/flink-conf.yaml file exists, not whether the conf directory exists. So on line 131 the exception message should use yamlConfigFile.getAbsolutePath() instead of confDirFile.getAbsolutePath().
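A minimal sketch of the fix, assuming a simplified locateConfigFile helper (hypothetical, not GlobalConfiguration's actual method): the existence check and the error message refer to the same File object, so the message reports the path that was actually checked.

```java
import java.io.File;

public class ConfigFileCheck {

    // File name taken from the issue text (conf/flink-conf.yaml).
    static final String FLINK_CONF_FILENAME = "flink-conf.yaml";

    // Hypothetical helper: resolves the YAML file inside the given conf directory.
    static File locateConfigFile(File confDirFile) {
        File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
        if (!yamlConfigFile.exists()) {
            // Report the path that was actually checked (the YAML file),
            // not the path of the enclosing conf directory.
            throw new IllegalArgumentException(
                    "The Flink config file '" + yamlConfigFile
                            + "' (" + yamlConfigFile.getAbsolutePath() + ") does not exist.");
        }
        return yamlConfigFile;
    }

    public static void main(String[] args) {
        File missingDir = new File(System.getProperty("java.io.tmpdir"), "no-such-flink-conf-dir");
        try {
            locateConfigFile(missingDir);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```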
[jira] [Updated] (FLINK-20700) Default value type does not match the return type of method input()
[ https://issues.apache.org/jira/browse/FLINK-20700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ZhangWei updated FLINK-20700:
-
Description: In org.apache.flink.table.annotation.FunctionHint, the return type of input() is an array of DataTypeHint, while the default value is @DataTypeHint(), which is just a single annotation. So the method definition should be as below; *{}* should be added.
DataTypeHint[] input() default \{@DataTypeHint()};

was: In org.apache.flink.table.annotation.FunctionHint, the return type of input() is an array of DataTypeHint, while the default value is @DataTypeHint(), which is just a single annotation. So the method definition should be as below; *{}* should be added.
DataTypeHint[] input() default {@DataTypeHint()};

> Default value type does not match the return type of method input()
> ---
>
> Key: FLINK-20700
> URL: https://issues.apache.org/jira/browse/FLINK-20700
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.12.0
> Reporter: ZhangWei
> Priority: Trivial
>
> In org.apache.flink.table.annotation.FunctionHint, the return type of input()
> is an array of DataTypeHint, while the default value is @DataTypeHint(), which is
> just a single annotation.
> So the method definition should be as below; *{}* should be added.
> DataTypeHint[] input() default \{@DataTypeHint()};
[jira] [Created] (FLINK-20700) Default value type does not match the return type of method input()
ZhangWei created FLINK-20700:

Summary: Default value type does not match the return type of method input()
Key: FLINK-20700
URL: https://issues.apache.org/jira/browse/FLINK-20700
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.12.0
Reporter: ZhangWei

In org.apache.flink.table.annotation.FunctionHint, the return type of input() is an array of DataTypeHint, while the default value is @DataTypeHint(), which is just a single annotation. So the method definition should be as below; *{}* should be added.
DataTypeHint[] input() default {@DataTypeHint()};
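A small, self-contained way to check what the two spellings produce at runtime, using hypothetical annotation types (TypeHint, HintWithoutBraces, HintWithBraces) in place of DataTypeHint and FunctionHint. Note that the Java Language Specification's "commensurate" rule for annotation element values accepts a single element value for an array-typed element, so both defaults should compile and yield a one-element array; the braces make the array explicit rather than change the value.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

public class AnnotationArrayDefault {

    // Hypothetical stand-in for DataTypeHint.
    @Retention(RetentionPolicy.RUNTIME)
    @interface TypeHint {
        String value() default "";
    }

    // Default written without braces, as the issue describes for FunctionHint#input().
    @Retention(RetentionPolicy.RUNTIME)
    @interface HintWithoutBraces {
        TypeHint[] input() default @TypeHint();
    }

    // Default written with braces, as proposed in the issue.
    @Retention(RetentionPolicy.RUNTIME)
    @interface HintWithBraces {
        TypeHint[] input() default {@TypeHint()};
    }

    @HintWithoutBraces
    @HintWithBraces
    static final class Annotated {}

    public static void main(String[] args) {
        System.out.println(Annotated.class.getAnnotation(HintWithoutBraces.class).input().length); // 1
        System.out.println(Annotated.class.getAnnotation(HintWithBraces.class).input().length);    // 1
    }
}
```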
[jira] [Closed] (FLINK-14668) LocalExecutor#getOrCreateExecutionContext not working as expected
[ https://issues.apache.org/jira/browse/FLINK-14668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangwei closed FLINK-14668.

Resolution: Fixed

> LocalExecutor#getOrCreateExecutionContext not working as expected
> -
>
> Key: FLINK-14668
> URL: https://issues.apache.org/jira/browse/FLINK-14668
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
> Affects Versions: 1.9.0, 1.9.1
> Reporter: zhangwei
> Priority: Critical
> Fix For: 1.11.0
>
> ExecutionContext creates a [copy|https://github.com/apache/flink/blob/b0a9afdd24fb70131b1e80d46d0ca101235a4a36/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java#L134]
> of the SessionContext in its constructor, but not all members of SessionContext
> override equals, so
> {code:java}
> executionContext.getSessionContext().equals(session)
> {code}
> is always false.
[jira] [Commented] (FLINK-14668) LocalExecutor#getOrCreateExecutionContext not working as expected
[ https://issues.apache.org/jira/browse/FLINK-14668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969876#comment-16969876 ]

zhangwei commented on FLINK-14668:
--

I read it wrong. I'm sorry.
[jira] [Updated] (FLINK-14668) LocalExecutor#getOrCreateExecutionContext not working as expected
[ https://issues.apache.org/jira/browse/FLINK-14668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangwei updated FLINK-14668:
-
Description: ExecutionContext creates a [copy|https://github.com/apache/flink/blob/b0a9afdd24fb70131b1e80d46d0ca101235a4a36/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java#L134] of the SessionContext in its constructor, but not all members of SessionContext override equals, so
{code:java}
executionContext.getSessionContext().equals(session)
{code}
is always false.

was: `ExecutionContext` creates a [copy](https://github.com/apache/flink/blob/b0a9afdd24fb70131b1e80d46d0ca101235a4a36/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java#L134) of the `SessionContext` in its constructor, but not all members of `SessionContext` override `equals`, so `executionContext.getSessionContext().equals(session)` is always false.
[jira] [Created] (FLINK-14668) LocalExecutor#getOrCreateExecutionContext not working as expected
zhangwei created FLINK-14668:

Summary: LocalExecutor#getOrCreateExecutionContext not working as expected
Key: FLINK-14668
URL: https://issues.apache.org/jira/browse/FLINK-14668
Project: Flink
Issue Type: Bug
Components: Table SQL / Client
Affects Versions: 1.9.1, 1.9.0
Reporter: zhangwei
Fix For: 1.11.0

`ExecutionContext` creates a [copy](https://github.com/apache/flink/blob/b0a9afdd24fb70131b1e80d46d0ca101235a4a36/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java#L134) of the `SessionContext` in its constructor, but not all members of `SessionContext` override `equals`, so `executionContext.getSessionContext().equals(session)` is always false.
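The effect described above can be reproduced in isolation. In this sketch, Session is a hypothetical, simplified stand-in for SessionContext whose equals delegates to a member object; PlainMember keeps Object's identity-based equals, while ValueMember overrides equals/hashCode. Copying the session (including its member) then compares unequal unless the member type compares by value. None of these classes are Flink's.

```java
import java.util.Objects;

public class SessionContextEquality {

    // Hypothetical member type that does NOT override equals (identity only).
    static final class PlainMember {
        final String value;

        PlainMember(String value) {
            this.value = value;
        }
    }

    // Hypothetical member type with value-based equals/hashCode.
    static final class ValueMember {
        final String value;

        ValueMember(String value) {
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof ValueMember && ((ValueMember) o).value.equals(value);
        }

        @Override
        public int hashCode() {
            return value.hashCode();
        }
    }

    // Hypothetical, simplified session context whose equals delegates to its member.
    static final class Session {
        final String name;
        final Object member;

        Session(String name, Object member) {
            this.name = name;
            this.member = member;
        }

        // Simulates storing a copy of the session: the member is copied as well.
        Session copy() {
            Object memberCopy = member instanceof ValueMember
                    ? new ValueMember(((ValueMember) member).value)
                    : new PlainMember(((PlainMember) member).value);
            return new Session(name, memberCopy);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Session)) {
                return false;
            }
            Session other = (Session) o;
            return name.equals(other.name) && Objects.equals(member, other.member);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, member);
        }
    }

    static boolean copyEqualsOriginal(boolean valueBasedMember) {
        Object member = valueBasedMember ? new ValueMember("x") : new PlainMember("x");
        Session original = new Session("default", member);
        return original.copy().equals(original);
    }

    public static void main(String[] args) {
        System.out.println(copyEqualsOriginal(false)); // false: member falls back to identity
        System.out.println(copyEqualsOriginal(true));  // true: member compares by value
    }
}
```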
[jira] [Closed] (FLINK-14211) Add jobManager address configuration for SqlClient
[ https://issues.apache.org/jira/browse/FLINK-14211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangwei closed FLINK-14211.

Resolution: Incomplete

> Add jobManager address configuration for SqlClient
> --
>
> Key: FLINK-14211
> URL: https://issues.apache.org/jira/browse/FLINK-14211
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Client
> Affects Versions: 1.9.0
> Reporter: zhangwei
> Priority: Trivial
>
> Add a jobmanager option to the deployment configuration to allow the SQL client to
> submit jobs to a remote Flink cluster:
> {code:java}
> deployment:
>   # general cluster communication timeout in ms
>   response-timeout: 5000
>   # (optional) address from cluster to gateway
>   gateway-address: ""
>   # (optional) port from cluster to gateway
>   gateway-port: 0
>   # (optional) jobmanager address
>   jobmanager: 127.0.0.1:8081
> {code}
[jira] [Commented] (FLINK-14211) Add jobManager address configuration for SqlClient
[ https://issues.apache.org/jira/browse/FLINK-14211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16945567#comment-16945567 ]

zhangwei commented on FLINK-14211:
--

The SQL client does not add the address option (addressOption), so a jobmanager setting cannot take effect in the SQL client.

CliFrontend registers both the general options and the run options of every custom command line:
[https://github.com/apache/flink/blob/a110cec19c5770fadfc460a8ef4d351a3593190a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L119-L136]
{code:java}
public CliFrontend(
        Configuration configuration,
        List<CustomCommandLine<?>> customCommandLines) {
    this.configuration = Preconditions.checkNotNull(configuration);
    this.customCommandLines = Preconditions.checkNotNull(customCommandLines);

    FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

    this.customCommandLineOptions = new Options();

    for (CustomCommandLine customCommandLine : customCommandLines) {
        // both general options and run options are registered here
        customCommandLine.addGeneralOptions(customCommandLineOptions);
        customCommandLine.addRunOptions(customCommandLineOptions);
    }

    this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
    this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}
{code}

The LocalExecutor of the SQL client, however, only collects the run options:
[https://github.com/apache/flink/blob/a110cec19c5770fadfc460a8ef4d351a3593190a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L620-L628]
{code:java}
private static Options collectCommandLineOptions(List<CustomCommandLine<?>> commandLines) {
    final Options customOptions = new Options();
    for (CustomCommandLine customCommandLine : commandLines) {
        // only run options are registered; general options are never added
        customCommandLine.addRunOptions(customOptions);
    }
    return CliFrontendParser.mergeOptions(
            CliFrontendParser.getRunCommandOptions(),
            customOptions);
}
{code}
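The difference between the two registration loops can be modeled without Flink. In this sketch, FakeCommandLine is a hypothetical stand-in for a custom command line that contributes one "general" option (named jobmanager here) and one "run" option (named example-run-option); plain string sets replace Options. An option that only comes from addGeneralOptions is simply never registered when only addRunOptions is called.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class OptionRegistrationSketch {

    // Hypothetical stand-in for a custom command line that contributes a
    // "general" option and a "run" option separately.
    static final class FakeCommandLine {
        void addGeneralOptions(Set<String> options) {
            options.add("jobmanager");
        }

        void addRunOptions(Set<String> options) {
            options.add("example-run-option");
        }
    }

    // Pattern seen in the CliFrontend constructor: general AND run options are registered.
    static Set<String> frontendOptions(List<FakeCommandLine> commandLines) {
        Set<String> options = new LinkedHashSet<>();
        for (FakeCommandLine commandLine : commandLines) {
            commandLine.addGeneralOptions(options);
            commandLine.addRunOptions(options);
        }
        return options;
    }

    // Pattern seen in LocalExecutor#collectCommandLineOptions: only run options are registered.
    static Set<String> sqlClientOptions(List<FakeCommandLine> commandLines) {
        Set<String> options = new LinkedHashSet<>();
        for (FakeCommandLine commandLine : commandLines) {
            commandLine.addRunOptions(options);
        }
        return options;
    }

    public static void main(String[] args) {
        List<FakeCommandLine> commandLines = Collections.singletonList(new FakeCommandLine());
        System.out.println(frontendOptions(commandLines));  // [jobmanager, example-run-option]
        System.out.println(sqlClientOptions(commandLines)); // [example-run-option]
    }
}
```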
[jira] [Created] (FLINK-14211) Add jobManager address configuration for SqlClient
zhangwei created FLINK-14211:

Summary: Add jobManager address configuration for SqlClient
Key: FLINK-14211
URL: https://issues.apache.org/jira/browse/FLINK-14211
Project: Flink
Issue Type: Improvement
Components: Table SQL / Client
Affects Versions: 1.9.0
Reporter: zhangwei

Add a jobmanager option to the deployment configuration to allow the SQL client to submit jobs to a remote Flink cluster:
{code:java}
deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0
  # (optional) jobmanager address
  jobmanager: 127.0.0.1:8081
{code}
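To make the proposed jobmanager: 127.0.0.1:8081 entry concrete, here is a minimal sketch of turning such a value into a host and port. The JobManagerAddressParser class and its parse method are hypothetical and not part of Flink; only standard JDK calls are used, and IPv6 literals are not handled specially.

```java
import java.net.InetSocketAddress;

public class JobManagerAddressParser {

    // Hypothetical helper: splits a "host:port" value such as the proposed
    // jobmanager entry into host and port.
    static InetSocketAddress parse(String value) {
        int colon = value.lastIndexOf(':');
        if (colon <= 0 || colon == value.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got '" + value + "'");
        }
        String host = value.substring(0, colon);
        // NumberFormatException (a subclass of IllegalArgumentException) for a non-numeric port.
        int port = Integer.parseInt(value.substring(colon + 1));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        // Unresolved: no DNS lookup happens here.
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        InetSocketAddress address = parse("127.0.0.1:8081");
        System.out.println(address.getHostString() + " " + address.getPort()); // 127.0.0.1 8081
    }
}
```

Keeping the address unresolved defers name resolution to whichever client component actually connects to the cluster.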