[jira] [Commented] (FLINK-14053) blink planner dense_rank corner case bug
[ https://issues.apache.org/jira/browse/FLINK-14053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928289#comment-16928289 ]

Jingsong Lee commented on FLINK-14053:
--------------------------------------

[~jackylau] Thanks for reporting this bug.
Yes, DenseRankAggFunction should behave the same as RankAggFunction. You can abstract the common parts into RankLikeAggFunctionBase so that the two functions share their logic.
Feel free to submit a PR; I can review it.

> blink planner dense_rank corner case bug
> ----------------------------------------
>
>                 Key: FLINK-14053
>                 URL: https://issues.apache.org/jira/browse/FLINK-14053
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.0
>            Reporter: jackylau
>            Priority: Major
>             Fix For: 1.10.0
>
> sql:
> val rank =
>   """
>     |SELECT
>     |  gradeId,
>     |  classId,
>     |  stuId,
>     |  score,
>     |  dense_rank() OVER (PARTITION BY gradeId, classId ORDER BY score asc) as dense_rank_num
>     |FROM student
>     |
>   """.stripMargin
> sample data:
> row("grade2", "class2", "0006", 90),
> row("grade1", "class2", "0007", 90),
> row("grade1", "class1", "0001", 95),
> row("grade1", "class1", "0002", 94),
> row("grade1", "class1", "0003", 97),
> row("grade1", "class1", "0004", 95),
> row("grade1", "class1", "0005", 0)
> dense_rank starts ranking from 0, but it should start from 1.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
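For reference, the expected DENSE_RANK behaviour described in the report can be illustrated outside of Flink. The following is only a minimal sketch, assuming the scores of one partition are already sorted by the ORDER BY key; the class `DenseRankSketch` and its method are hypothetical and are not part of the blink planner or of `DenseRankAggFunction`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DenseRankSketch {

    /**
     * Computes DENSE_RANK for the non-null scores of one partition,
     * assuming they are already sorted ascending.
     * Ranks start at 1 and increase by exactly 1 whenever the score changes.
     */
    public static List<Integer> denseRank(List<Integer> sortedScores) {
        List<Integer> ranks = new ArrayList<>();
        int rank = 0;
        boolean first = true;
        int previous = 0;
        for (int score : sortedScores) {
            // The first row moves the rank from 0 to 1; later rows only
            // advance the rank when the ordering key differs from the previous row.
            if (first || score != previous) {
                rank++;
            }
            ranks.add(rank);
            previous = score;
            first = false;
        }
        return ranks;
    }

    public static void main(String[] args) {
        // Scores of partition (grade1, class1) from the sample data, sorted ascending.
        List<Integer> scores = Arrays.asList(0, 94, 95, 95, 97);
        System.out.println(denseRank(scores)); // prints [1, 2, 3, 3, 4]
    }
}
```

With the sample data from the report, the two rows with score 95 share rank 3 and the lowest score gets rank 1 rather than 0.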
[GitHub] [flink] flinkbot edited a comment on issue #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
flinkbot edited a comment on issue #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#issuecomment-506306353

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks
Last check on commit 1bd21c6cd2d430b9827b903e80f31c55bcb04750 (Thu Sep 12 07:02:27 UTC 2019)

**Warnings:**
 * No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
 - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
 - `@flinkbot approve all` to approve all aspects
 - `@flinkbot approve-until architecture` to approve everything until `architecture`
 - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
 - `@flinkbot disapprove architecture` to remove an approval you gave earlier

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r323583514

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java ##
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the configuration.
+ */
+public class RestartBackoffTimeStrategyFactoryLoader {
+
+	private static final Logger LOG = LoggerFactory.getLogger(RestartBackoffTimeStrategyFactoryLoader.class);
+
+	private static final String CREATE_METHOD = "createFactory";
+
+	/**
+	 * Creates proper {@link RestartBackoffTimeStrategy.Factory}.
+	 * If new version restart strategy is specified, will directly use it.
+	 * Otherwise will decide based on legacy restart strategy configs.
+	 *
+	 * @param jobRestartStrategyConfiguration restart configuration given within the job graph
+	 * @param clusterConfiguration cluster(server-side) configuration, usually represented as jobmanager config
+	 * @param isCheckpointingEnabled if checkpointing was enabled for the job
+	 * @return new version restart strategy factory
+	 */
+	public static RestartBackoffTimeStrategy.Factory createRestartStrategyFactory(
+			final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration,
+			final Configuration clusterConfiguration,
+			final boolean isCheckpointingEnabled) throws Exception {
+
+		checkNotNull(jobRestartStrategyConfiguration);
+		checkNotNull(clusterConfiguration);
+
+		final String restartStrategyClassName = clusterConfiguration.getString(
+			RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME);
+
+		if (restartStrategyClassName != null) {
+			// create new restart strategy directly if it is specified in cluster config
+			return createRestartStrategyFactoryInternal(clusterConfiguration);
+		} else {
+			// adapt the legacy restart strategy configs as new restart strategy configs
+			final Configuration adaptedConfiguration = getAdaptedConfiguration(
+				jobRestartStrategyConfiguration,
+				clusterConfiguration,
+				isCheckpointingEnabled);
+
+			// create new restart strategy from the adapted config
+			return createRestartStrategyFactoryInternal(adaptedConfiguration);
+		}
+	}
+
+	/**
+	 * Decides the {@link RestartBackoffTimeStrategy} to use and its params based on legacy configs,
+	 * and records its class name and the params into a adapted configuration.
+	 *
+	 * The decision making is as follows:
+	 *
+	 * Use strategy of {@link RestartStrategies.Re
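The two-branch loading logic quoted above (use the explicitly configured new-style strategy, otherwise adapt the legacy settings, then create the factory through a static `createFactory` method found by reflection) can be sketched in isolation. This is only an illustration under stated assumptions, not the code from the pull request: `RestartFactoryLoaderSketch`, its nested types, the `Map`-based configuration and the key name are all hypothetical. The fallback to a no-restart factory for an unloadable class and the choice of fixed delay when checkpointing is enabled mirror what the test class in this pull request appears to expect; the choice for the non-checkpointed case is an assumption.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

public class RestartFactoryLoaderSketch {

    /** Hypothetical stand-in for RestartBackoffTimeStrategy.Factory. */
    public interface Factory {
        String describe();
    }

    /** Hypothetical strategy; exposes the static method that is looked up by name. */
    public static class FixedDelaySketchStrategy {
        public static Factory createFactory(Map<String, String> conf) {
            return () -> "fixed-delay";
        }
    }

    /** Hypothetical fallback strategy that never restarts. */
    public static class NoRestartSketchStrategy {
        public static Factory createFactory(Map<String, String> conf) {
            return () -> "no-restart";
        }
    }

    // Assumed key name; the real option is defined in RestartBackoffTimeStrategyOptions.
    public static final String CLASS_NAME_KEY = "sketch.restart-strategy.class-name";
    private static final String CREATE_METHOD = "createFactory";

    public static Factory load(Map<String, String> clusterConf, boolean checkpointingEnabled) {
        if (clusterConf.get(CLASS_NAME_KEY) != null) {
            // New-style strategy configured explicitly: use it directly.
            return createInternal(clusterConf);
        }
        // Otherwise adapt the legacy settings into a copy of the configuration.
        Map<String, String> adapted = new HashMap<>(clusterConf);
        Class<?> chosen = checkpointingEnabled
            ? FixedDelaySketchStrategy.class
            : NoRestartSketchStrategy.class; // assumption for the non-checkpointed case
        adapted.put(CLASS_NAME_KEY, chosen.getName());
        return createInternal(adapted);
    }

    private static Factory createInternal(Map<String, String> conf) {
        try {
            Class<?> clazz = Class.forName(conf.get(CLASS_NAME_KEY));
            Method method = clazz.getMethod(CREATE_METHOD, Map.class);
            if (Modifier.isStatic(method.getModifiers())) {
                Object result = method.invoke(null, conf);
                if (result instanceof Factory) {
                    return (Factory) result;
                }
            }
        } catch (ReflectiveOperationException e) {
            // Class or method missing, or the factory method threw: fall back below.
        }
        return NoRestartSketchStrategy.createFactory(conf);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(load(conf, true).describe()); // prints fixed-delay
        conf.put(CLASS_NAME_KEY, "does.not.Exist");
        System.out.println(load(conf, true).describe()); // prints no-restart
    }
}
```

Keeping the reflective lookup in one private method means both branches share the same failure handling, which is presumably why the quoted code routes both paths through `createRestartStrategyFactoryInternal`.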
[GitHub] [flink] zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r323583478

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java ##
@@ -0,0 +1,393 @@
[GitHub] [flink] zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r323583682

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java ##
@@ -0,0 +1,393 @@
[GitHub] [flink] GJL commented on a change in pull request #9013: [FLINK-13136] Fix documentation error about stopping job with restful api
GJL commented on a change in pull request #9013: [FLINK-13136] Fix documentation error about stopping job with restful api
URL: https://github.com/apache/flink/pull/9013#discussion_r323583397

## File path: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ##
@@ -375,7 +374,7 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
 	public void cancel(JobID jobID) throws Exception {
 		JobCancellationMessageParameters params = new JobCancellationMessageParameters();
 		params.jobPathParameter.resolve(jobID);
-		params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL));
+		params.terminationModeQueryParameter.resolve(Collections.singletonList("CANCEL"));

Review comment:
I liked using the enum `TerminationModeQueryParameter.TerminationMode` better. The deprecation of `TerminationModeQueryParameter` and `TerminationMode` was wrong and should be reverted. What should be deprecated is `TerminationMode.STOP` since `TerminationMode.CANCEL` is still valid for non-checkpointed and batch jobs.
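The point of the review comment, keeping a typed enum and deprecating only a single constant instead of passing raw strings, can be illustrated with a small self-contained sketch. The types below are simplified, hypothetical stand-ins and not Flink's actual `TerminationModeQueryParameter` API; the `mode` query key and the lower-case rendering are assumptions made for the example.

```java
import java.util.Collections;
import java.util.List;
import java.util.Locale;

public class TerminationModeSketch {

    /** Enum of modes; only STOP is deprecated, the enum itself stays usable. */
    public enum TerminationMode {
        CANCEL,
        @Deprecated
        STOP
    }

    /** Simplified, hypothetical stand-in for a typed REST query parameter. */
    public static class ModeQueryParameter {
        private List<TerminationMode> values = Collections.emptyList();

        /** Accepts only enum constants, so a misspelled mode fails at compile time. */
        public void resolve(List<TerminationMode> values) {
            this.values = values;
        }

        /** Renders the resolved values as a query string fragment. */
        public String toQueryString() {
            StringBuilder joined = new StringBuilder();
            for (TerminationMode mode : values) {
                if (joined.length() > 0) {
                    joined.append(',');
                }
                joined.append(mode.name().toLowerCase(Locale.ROOT));
            }
            return "mode=" + joined;
        }

        /** Parses a raw value; unknown names throw IllegalArgumentException. */
        public static TerminationMode parse(String raw) {
            return TerminationMode.valueOf(raw.toUpperCase(Locale.ROOT));
        }
    }

    public static void main(String[] args) {
        ModeQueryParameter parameter = new ModeQueryParameter();
        parameter.resolve(Collections.singletonList(TerminationMode.CANCEL));
        System.out.println(parameter.toQueryString()); // prints mode=cancel
    }
}
```

With this shape the string form of each mode is derived from the enum in one place, so callers never repeat literals such as `"CANCEL"`, and marking only `STOP` as `@Deprecated` leaves `CANCEL` free of deprecation warnings.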
[GitHub] [flink] zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r323583982

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoaderTest.java ##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+/**
+ * Unit tests for {@link RestartBackoffTimeStrategyFactoryLoader}.
+ */
+public class RestartBackoffTimeStrategyFactoryLoaderTest extends TestLogger {
+
+	private static final RestartStrategies.RestartStrategyConfiguration DEFAULT_RESTART_STRATEGY_CONFIGURATION =
+		new RestartStrategies.FallbackRestartStrategyConfiguration();
+
+	@Test
+	public void testNewStrategySpecified() throws Exception {
+		// specify RestartBackoffTimeStrategy directly in cluster config
+		final Configuration conf = new Configuration();
+		conf.setString(
+			RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME,
+			TestRestartBackoffTimeStrategy.class.getName());
+
+		// the RestartStrategyConfiguration should not take effect as the loader will
+		// directly create the factory from the config of the new version strategy
+		final RestartBackoffTimeStrategy.Factory factory =
+			RestartBackoffTimeStrategyFactoryLoader.createRestartStrategyFactory(
+				new RestartStrategies.FailureRateRestartStrategyConfiguration(
+					1,
+					Time.milliseconds(1000),
+					Time.milliseconds(1000)),
+				conf,
+				true);
+
+		assertThat(
+			factory,
+			instanceOf(TestRestartBackoffTimeStrategy.TestRestartBackoffTimeStrategyFactory.class));
+	}
+
+	@Test
+	public void testInvalidNewStrategySpecified() throws Exception {
+		final Configuration conf = new Configuration();
+		conf.setString(
+			RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME,
+			InvalidTestRestartBackoffTimeStrategy.class.getName());
+
+		final RestartBackoffTimeStrategy.Factory factory =
+			RestartBackoffTimeStrategyFactoryLoader.createRestartStrategyFactory(
+				DEFAULT_RESTART_STRATEGY_CONFIGURATION,
+				conf,
+				true);
+
+		assertThat(
+			factory,
+			instanceOf(NoRestartBackoffTimeStrategy.NoRestartBackoffTimeStrategyFactory.class));
+	}
+
+	@Test
+	public void testNoStrategySpecifiedWhenCheckpointingEnabled() throws Exception {
+		final RestartBackoffTimeStrategy.Factory factory =
+			RestartBackoffTimeStrategyFactoryLoader.createRestartStrategyFactory(
+				DEFAULT_RESTART_STRATEGY_CONFIGURATION,
+				new Configuration(),
+				true);
+
+		assertThat(
+			factory,
+			instanceOf(FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory.class));
+	}
+
+	@Test
+	public void t
[GitHub] [flink] GJL commented on a change in pull request #9013: [FLINK-13136] Fix documentation error about stopping job with restful api
GJL commented on a change in pull request #9013: [FLINK-13136] Fix documentation error about stopping job with restful api
URL: https://github.com/apache/flink/pull/9013#discussion_r323583397

## File path: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ##
@@ -375,7 +374,7 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
 	public void cancel(JobID jobID) throws Exception {
 		JobCancellationMessageParameters params = new JobCancellationMessageParameters();
 		params.jobPathParameter.resolve(jobID);
-		params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL));
+		params.terminationModeQueryParameter.resolve(Collections.singletonList("CANCEL"));

Review comment:
I liked using the enum `TerminationModeQueryParameter.TerminationMode` better because now we have to repeat String constants. The deprecation of `TerminationModeQueryParameter` and `TerminationMode` was wrong and should be reverted. What should be deprecated is `TerminationMode.STOP` since `TerminationMode.CANCEL` is still valid for non-checkpointed and batch jobs.
[GitHub] [flink] GJL commented on a change in pull request #9013: [FLINK-13136] Fix documentation error about stopping job with restful api
GJL commented on a change in pull request #9013: [FLINK-13136] Fix documentation error about stopping job with restful api
URL: https://github.com/apache/flink/pull/9013#discussion_r323583397

## File path: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ##
@@ -375,7 +374,7 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
 	public void cancel(JobID jobID) throws Exception {
 		JobCancellationMessageParameters params = new JobCancellationMessageParameters();
 		params.jobPathParameter.resolve(jobID);
-		params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL));
+		params.terminationModeQueryParameter.resolve(Collections.singletonList("CANCEL"));

Review comment:
I liked using the enum `TerminationModeQueryParameter.TerminationMode` better because now we have to repeat String constants. The deprecation of `TerminationModeQueryParameter` and `TerminationMode` was wrong and should be reverted. What should be deprecated is `TerminationMode.STOP` since `TerminationMode.CANCEL` is still valid for non-checkpointed jobs and batch jobs.
[GitHub] [flink] flinkbot edited a comment on issue #9013: [FLINK-13136] Fix documentation error about stopping job with restful api
flinkbot edited a comment on issue #9013: [FLINK-13136] Fix documentation error about stopping job with restful api URL: https://github.com/apache/flink/pull/9013#issuecomment-509065631 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 097af583c8040c16da17b796f6e4060c270b5b1d (Thu Sep 12 07:04:29 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928298#comment-16928298 ]

lamber-ken commented on FLINK-11420:
------------------------------------

(y)

> Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException
> ------------------------------------
>
> Key: FLINK-11420
> URL: https://issues.apache.org/jira/browse/FLINK-11420
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.7.1
> Reporter: Jürgen Kreileder
> Assignee: Dawid Wysakowicz
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.7.3, 1.8.0
>
> Time Spent: 2h
> Remaining Estimate: 0h
>
> We frequently run into random ArrayIndexOutOfBounds exceptions when Flink
> tries to serialize Scala case classes containing a Map[String, Any] (Any
> being String, Long, Int, or Boolean) with the FsStateBackend. (This probably
> happens with any case class containing a type requiring Kryo, see this thread
> for instance:
> [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e])
> Disabling asynchronous snapshots seems to work around the problem, so maybe
> something is not thread-safe in CaseClassSerializer.
> Our objects look like this:
> {code}
> case class Event(timestamp: Long, [...], content: Map[String, Any])
> case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any])
> {code}
> I've looked at a few of the exceptions in a debugger. It always happens when
> serializing the right-hand side of a tuple from EnrichedEvent -> Event ->
> content, e.g. 13 from ("foo", 13) or false from ("bar", false).
> Stacktrace:
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0
>   at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>   at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>   at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69)
>   at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
>   at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>   at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>   at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>   at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
>   at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
>   at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>   at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>   at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> {code}

--
This message was sent by Atlassian Jira (v8.3.2#803003)
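The reporter's guess that something is not thread-safe can be illustrated with a generic sketch of the usual remedy: give each thread its own copy of a stateful helper instead of sharing one instance. This is not Flink or Kryo code and makes no claim about the actual root cause of FLINK-11420; `StatefulCopier`, its scratch stack, and `copyOnOwnInstance` are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class DuplicatePerThreadSketch {

    /** Hypothetical stateful copier; its scratch stack must not be shared between threads. */
    static final class StatefulCopier {
        private final Deque<Integer> scratch = new ArrayDeque<>();

        int copy(int value) {
            scratch.push(value);   // scratch state held for the duration of the copy
            return scratch.pop();  // unsafe if another thread pushes or pops in between
        }

        /** Returns a fresh instance with its own scratch state. */
        StatefulCopier duplicate() {
            return new StatefulCopier();
        }
    }

    /** Runs the copy on a second thread, but on a duplicate rather than the shared instance. */
    static int copyOnOwnInstance(StatefulCopier shared, int value) {
        StatefulCopier own = shared.duplicate();
        int[] result = new int[1];
        Thread worker = new Thread(() -> result[0] = own.copy(value));
        worker.start();
        try {
            worker.join(); // join() makes the worker's write to result visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(copyOnOwnInstance(new StatefulCopier(), 13));
    }
}
```

Because each thread only touches its own instance, the scratch state never needs locking; the cost is one extra object per thread.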
[jira] [Assigned] (FLINK-14053) blink planner dense_rank corner case bug
[ https://issues.apache.org/jira/browse/FLINK-14053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu reassigned FLINK-14053:
------------------------------

Assignee: jackylau

> blink planner dense_rank corner case bug
> ----------------------------------------
>
> Key: FLINK-14053
> URL: https://issues.apache.org/jira/browse/FLINK-14053
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.9.0
> Reporter: jackylau
> Assignee: jackylau
> Priority: Major
> Fix For: 1.10.0
>
> sql:
> val rank =
>   """
>     |SELECT
>     |  gradeId,
>     |  classId,
>     |  stuId,
>     |  score,
>     |  dense_rank() OVER (PARTITION BY gradeId, classId ORDER BY score asc) as dense_rank_num
>     |FROM student
>     |
>   """.stripMargin
> sample data:
> row("grade2", "class2", "0006", 90),
> row("grade1", "class2", "0007", 90),
> row("grade1", "class1", "0001", 95),
> row("grade1", "class1", "0002", 94),
> row("grade1", "class1", "0003", 97),
> row("grade1", "class1", "0004", 95),
> row("grade1", "class1", "0005", 0)
> The dense_rank starts ranking from 0, but it should start from 1.

--
This message was sent by Atlassian Jira (v8.3.2#803003)
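To make the expected result concrete, here is a small sketch of dense ranking over a single partition, using the `grade1`/`class1` scores from the sample data in the issue. It is plain Java, not the planner's `DenseRankAggFunction`; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class DenseRankSketch {

    /**
     * Sorts the scores ascending and returns the dense rank of each sorted value.
     * Ranks start at 1, ties share a rank, and no rank is skipped after a tie.
     */
    static List<Integer> denseRanksAscending(List<Integer> scores) {
        List<Integer> sorted = new ArrayList<>(scores);
        Collections.sort(sorted);
        List<Integer> ranks = new ArrayList<>();
        int rank = 0;
        Integer previous = null;
        for (Integer score : sorted) {
            if (previous == null || !score.equals(previous)) {
                rank++; // a new distinct value moves to the next rank
            }
            ranks.add(rank);
            previous = score;
        }
        return ranks;
    }

    public static void main(String[] args) {
        // Scores of partition (grade1, class1) from the issue's sample data.
        System.out.println(denseRanksAscending(Arrays.asList(95, 94, 97, 95, 0)));
        // prints [1, 2, 3, 3, 4]
    }
}
```

The sorted scores are 0, 94, 95, 95, 97, so the first row gets rank 1 rather than 0, and the two rows with score 95 share rank 3.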
[GitHub] [flink] libenchao closed pull request #9650: [hotfix][blink-planner] Fix return type for FROM_BASE64 builtin function
libenchao closed pull request #9650: [hotfix][blink-planner] Fix return type for FROM_BASE64 builtin function URL: https://github.com/apache/flink/pull/9650 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9650: [hotfix][blink-planner] Fix return type for FROM_BASE64 builtin function
flinkbot edited a comment on issue #9650: [hotfix][blink-planner] Fix return type for FROM_BASE64 builtin function URL: https://github.com/apache/flink/pull/9650#issuecomment-529428588 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 699536e117e058414474792baed09edffc1094fb (Thu Sep 12 07:25:53 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323591961 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/FailingExecutionVertexOperationsDecorator.java ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; + +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Allows to fail ExecutionVertex operations for testing. 
+ */ +public class FailingExecutionVertexOperationsDecorator implements ExecutionVertexOperations { + + private final ExecutionVertexOperations delegate; + + private boolean failDeploy; + + private boolean failCancel; + + public FailingExecutionVertexOperationsDecorator(final ExecutionVertexOperations delegate) { + this.delegate = checkNotNull(delegate); + } + + @Override + public void deploy(final ExecutionVertex executionVertex, final DeploymentOption deploymentOption) throws JobException { + if (failDeploy) { + throw new RuntimeException("Expected"); + } else { + delegate.deploy(executionVertex, deploymentOption); + } + } + + @Override + public CompletableFuture cancel(final ExecutionVertex executionVertex) { + if (failCancel) { + return FutureUtils.completedExceptionally(new RuntimeException("Expected")); + } else { + return delegate.cancel(executionVertex); + } + } + + public void enableFailDeploy() { + failDeploy = true; + } + + public void disableFailDeploy() { + failDeploy = false; + } + + public void enableFailCancel() { + failCancel = true; + } + + public void disableFailCancel() { + failCancel = false; + } Review comment: I don't understand. Do you want to declare `failCancel` `final` or set the default value to `false`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
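The pattern under review, a decorator that can be switched into a failing mode for tests, can be sketched independently of Flink as follows. `Operations`, `FailingDecorator`, and the `String`-based `deploy` signature are hypothetical simplifications, not the interfaces from the pull request. The sketch also shows the point behind the reviewer's question: a Java `boolean` field is `false` unless assigned, so an explicit `= false` initializer is redundant, and the flag cannot be `final` because the enable/disable methods reassign it.

```java
import java.util.Objects;

final class FailingDecoratorSketch {

    /** Hypothetical minimal operation interface used only by this sketch. */
    interface Operations {
        String deploy(String vertex);
    }

    /** Delegates to another implementation unless failure injection is switched on. */
    static final class FailingDecorator implements Operations {
        private final Operations delegate;
        private boolean failDeploy; // defaults to false; not final because tests toggle it

        FailingDecorator(Operations delegate) {
            this.delegate = Objects.requireNonNull(delegate);
        }

        @Override
        public String deploy(String vertex) {
            if (failDeploy) {
                throw new RuntimeException("Expected");
            }
            return delegate.deploy(vertex);
        }

        void enableFailDeploy() {
            failDeploy = true;
        }

        void disableFailDeploy() {
            failDeploy = false;
        }
    }

    public static void main(String[] args) {
        FailingDecorator ops = new FailingDecorator(vertex -> "deployed " + vertex);
        System.out.println(ops.deploy("v1")); // prints "deployed v1"
    }
}
```

A test would typically call `enableFailDeploy()`, assert that the scheduler under test handles the exception, and then call `disableFailDeploy()` to let a retry succeed.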
[GitHub] [flink] flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#issuecomment-529928149 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 166b8777fcc896f3f6ef7008133af35dbb554204 (Thu Sep 12 07:27:56 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323593378 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker; +import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import 
org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link DefaultScheduler}. + */ +public class DefaultSchedulerTest extends TestLogger { + + private static final int TIMEOUT_MS = 1000; + + private static final JobID TEST_JOB_ID = new JobID(); + + private ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor(); + + private ExecutorService executor; + + private ScheduledExecutorService scheduledExecutorService; + + private Configuration configuration; + + private SubmissionTrackingTaskManagerGateway testTaskManagerGateway; + + private TestRestartBackoffTimeStrategy testRestartBackoffTimeStrategy; + + private FailingExecutionVertexOperationsDecorator testExecutionVertexOperations; + + private SimpleSlotProvider slotProvider; + + private ExecutionVertexVersioner executionVertexVersioner; +
[GitHub] [flink] flinkbot edited a comment on issue #9478: [FLINK-13766][task] Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext
flinkbot edited a comment on issue #9478: [FLINK-13766][task] Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext URL: https://github.com/apache/flink/pull/9478#issuecomment-522374042 ## CI report: * 1530600eaf36324966f343c277437e48c2416dc2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/123653758) * 8d08aaa1fedf7338ef3de48991e841f9baf7018b : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/123700944) * 9791697c559fa39cd9f7a1c574420d62a1193545 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124210361) * fbd14e2320371a3cd6615deee0498d477644b954 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124297960) * 04f75585056f083e57aa036d5fccf14286c073ee : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124687473) * cc68c22f6cba85de6ef7babad5a42b9bf187e59f : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124691082) * 6a276b78132bf3de6691a073134865e9c7632fcf : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124692424) * 84806d71cfe3ba5d2f0c1fc8136dcd1fd64251cf : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124693853) * b74f4e456ccad744aedff59da951bfc099ce13c8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124700117) * f84935334f191b73b5766550743de3193ccb1ada : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124701767) * 61e0d65b905bc86dda95b8d81e9163177434adce : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124706860) * 9cdfb712682d5f07c0249bebc873ebd5879d1ec1 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124712485) * e1ab467f585b289eb02c6bcf206b6f73d71d1c51 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124714601) * 2da198414e1421f855d035741207c0d00aa946f3 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124718316) * 4aba3ff64ff2a4f2518150bc7d4ca37f42adf1fb : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124726416) * 
53fbb0b3e1b094ef82b8f3192146aeb4a915dda7 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124845339) * eec2a79613ec30b174e2e133c4e3844c5e27f443 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124857651) * 27f1c1d0224a43754beee2e6d4637de8d942db16 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125408360) * 2225da63d943df06eacceb57c7d77de7aa90dfc1 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125527049) * d99301f4c6704710c479647a14d170080cea79b2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125545740) * 52ec180a141ebc5d26519f8bc3ac4892ad054f8a : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125550692) * 96d5c224e6a3258a84b9e943a93bb12dff93253f : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/126558705) * a3e09500022ee31f5f1fb8c406a8f939a2d5f87c : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/126559774) * eed3ad980b2a46c563d28f2158404f29b0d2018b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/126565945) * 3eed308317f40f75f919e4085e51b404eee41abe : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/127126432) * 2bcc989a0c7a57e70df40aee2e2289b8cf34dac0 : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#discussion_r323595466

## File path: flink-core/src/main/java/org/apache/flink/configuration/RestartBackoffTimeStrategyOptions.java ##

@@ -46,7 +57,7 @@
     .withDescription("Time interval in milliseconds for measuring failure rate.");

 /**
- * Backoff time between two consecutive restart attempts in FailureRateRestartBackoffTimeStrategy.
+ * Backoff time (milli-seconds) between two consecutive restart attempts in FailureRateRestartBackoffTimeStrategy.
  */
 @PublicEvolving
 public static final ConfigOption RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_FAILURE_RATE_BACKOFF_TIME = ConfigOptions

Review comment:
Good question. Previously the configs for restart strategies were quite a mess in ConfigConstants, and I wanted to deprecate them. But with FLIP-61 the configs are better organized in RestartStrategyOptions.java. I think we can now use the configs from RestartStrategyOptions.java directly. I'm also thinking of renaming RestartBackoffTimeStrategy to RestartStrategy once we have removed the legacy RestartStrategy. I will open a hotfix commit to migrate these options and their usages in the related strategies.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9675: [FLINK-13953] [runtime] Facilitate enabling new scheduler in MiniCluster Tests
flinkbot edited a comment on issue #9675: [FLINK-13953] [runtime] Facilitate enabling new scheduler in MiniCluster Tests URL: https://github.com/apache/flink/pull/9675#issuecomment-530686777 ## CI report: * d15f9751632d4c7897f68a0f8829d5facbdfe14e : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/127138398) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-13417) Bump Zookeeper to 3.5.5
[ https://issues.apache.org/jira/browse/FLINK-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928316#comment-16928316 ] TisonKun commented on FLINK-13417: -- [~StephanEwen] I figured out that this is because, since 3.5.3, ZooKeeper disables four letter words by default, and HBase uses them to wait for the server to come up. We can set the property in Flink test scope to enable four letter words and work around this issue. I have sent the fix and am still digging into whether there are other issues. references: [1] https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_4lw [2] https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_clusterOptions (look up {{zookeeper.4lw.commands.whitelist}}) > Bump Zookeeper to 3.5.5 > --- > > Key: FLINK-13417 > URL: https://issues.apache.org/jira/browse/FLINK-13417 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: Konstantin Knauf >Priority: Blocker > Fix For: 1.10.0 > > > Users might want to secure their Zookeeper connection via SSL. > This requires a Zookeeper version >= 3.5.1. We might as well try to bump it > to 3.5.5, which is the latest version. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Comment Edited] (FLINK-13417) Bump Zookeeper to 3.5.5
[ https://issues.apache.org/jira/browse/FLINK-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928316#comment-16928316 ] TisonKun edited comment on FLINK-13417 at 9/12/19 7:43 AM: --- [~StephanEwen] I figured out that this is because, since 3.5.3, ZooKeeper disables four letter words by default, and HBase uses them to wait for the server to come up. We can set the property in Flink test scope to enable four letter words and work around this issue. I have patched the fix in a private branch and am still digging into whether there are other issues. references: [1] https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_4lw [2] https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_clusterOptions (look up {{zookeeper.4lw.commands.whitelist}}) was (Author: tison): [~StephanEwen] I figured out that this is because, since 3.5.3, ZooKeeper disables four letter words by default, and HBase uses them to wait for the server to come up. We can set the property in Flink test scope to enable four letter words and work around this issue. I have sent the fix and am still digging into whether there are other issues. references: [1] https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_4lw [2] https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_clusterOptions (look up {{zookeeper.4lw.commands.whitelist}}) > Bump Zookeeper to 3.5.5 > --- > > Key: FLINK-13417 > URL: https://issues.apache.org/jira/browse/FLINK-13417 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: Konstantin Knauf >Priority: Blocker > Fix For: 1.10.0 > > > Users might want to secure their Zookeeper connection via SSL. > This requires a Zookeeper version >= 3.5.1. We might as well try to bump it > to 3.5.5, which is the latest version. -- This message was sent by Atlassian Jira (v8.3.2#803003)
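The workaround TisonKun describes in the comment above (setting the whitelist property in test scope) could look roughly like the sketch below. The property name `zookeeper.4lw.commands.whitelist` is taken from the comment; the helper class `FourLetterWordWhitelist`, its methods, and the choice of `*` as the value are assumptions for illustration and are not taken from the actual fix.

```java
/**
 * Hypothetical test helper (not part of Flink, HBase or ZooKeeper) that
 * enables ZooKeeper four letter words through a JVM system property.
 */
final class FourLetterWordWhitelist {

    // Property name as referenced in the Jira comment.
    static final String WHITELIST_PROPERTY = "zookeeper.4lw.commands.whitelist";

    private FourLetterWordWhitelist() {
    }

    /**
     * Sets the whitelist (for example "*" or "stat,ruok") and returns the
     * previous value, which is null if the property was not set before.
     * Assumption: this runs before the embedded ZooKeeper server starts.
     */
    static String enable(String commands) {
        return System.setProperty(WHITELIST_PROPERTY, commands);
    }

    /** Restores the value returned by enable() so other tests are unaffected. */
    static void restore(String previous) {
        if (previous == null) {
            System.clearProperty(WHITELIST_PROPERTY);
        } else {
            System.setProperty(WHITELIST_PROPERTY, previous);
        }
    }
}
```

A test would typically call `enable("*")` in its setup and `restore(...)` in its teardown. Whether the real patch sets the property in code or through the build configuration is not stated in the thread.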
[GitHub] [flink] zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912#discussion_r323598464 ## File path: flink-core/src/main/java/org/apache/flink/configuration/RestartBackoffTimeStrategyOptions.java ## @@ -25,6 +25,16 @@ */ @PublicEvolving public class RestartBackoffTimeStrategyOptions { + + /** +* Class name of the RestartBackoffTimeStrategy implementation to use. +*/ + @PublicEvolving + public static final ConfigOption RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME = ConfigOptions + .key("restart-backoff-time-strategy.class-name") Review comment: Yes, it is a breaking change if a user was using a custom RestartStrategy. I will open a discussion to 1. notify users that existing custom RestartStrategy implementations no longer work in the new scheduler, and 2. discuss whether we should keep supporting custom RestartBackoffTimeStrategy implementations, either for migrating existing custom RestartStrategy implementations or for future use. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912#discussion_r323195327 ## File path: flink-core/src/main/java/org/apache/flink/configuration/RestartBackoffTimeStrategyOptions.java ## @@ -25,6 +25,16 @@ */ @PublicEvolving public class RestartBackoffTimeStrategyOptions { + + /** +* Class name of the RestartBackoffTimeStrategy implementation to use. +*/ + @PublicEvolving + public static final ConfigOption RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME = ConfigOptions + .key("restart-backoff-time-strategy.class-name") Review comment: With the introduction of the new scheduler, users will not be able to use custom implementations of the `RestartStrategy` interface. Strictly speaking this is a breaking change. However, it is not clear to me if there are any Flink users at all that require a custom `RestartStrategy`. I think it makes sense to start a discussion on the dev mailing list. Until it is clear that users require this level of customization, I think we should not provide users a way to write their own back off time strategy. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912#discussion_r323599815 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java ## @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the configuration. + */ +public class RestartBackoffTimeStrategyFactoryLoader { + + private static final Logger LOG = LoggerFactory.getLogger(RestartBackoffTimeStrategyFactoryLoader.class); + + private static final String CREATE_METHOD = "createFactory"; + + /** +* Creates proper {@link RestartBackoffTimeStrategy.Factory}. +* If new version restart strategy is specified, will directly use it. +* Otherwise will decide based on legacy restart strategy configs. 
+* +* @param jobRestartStrategyConfiguration restart configuration given within the job graph +* @param clusterConfiguration cluster(server-side) configuration, usually represented as jobmanager config +* @param isCheckpointingEnabled if checkpointing was enabled for the job +* @return new version restart strategy factory +*/ + public static RestartBackoffTimeStrategy.Factory createRestartStrategyFactory( + final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration, + final Configuration clusterConfiguration, + final boolean isCheckpointingEnabled) throws Exception { + + checkNotNull(jobRestartStrategyConfiguration); + checkNotNull(clusterConfiguration); + + final String restartStrategyClassName = clusterConfiguration.getString( + RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME); + + if (restartStrategyClassName != null) { + // create new restart strategy directly if it is specified in cluster config + return createRestartStrategyFactoryInternal(clusterConfiguration); + } else { + // adapt the legacy restart strategy configs as new restart strategy configs + final Configuration adaptedConfiguration = getAdaptedConfiguration( + jobRestartStrategyConfiguration, + clusterConfiguration, + isCheckpointingEnabled); + + // create new restart strategy from the adapted config + return createRestartStrategyFactoryInternal(adaptedConfiguration); + } + } + + /** +* Decides the {@link RestartBackoffTimeStrategy} to use and its params based on legacy configs, +* and records its class name and the params into a adapted configuration. +* +* The decision making is as follows: +* +* Use strategy of {@link RestartStrategies.Re
[GitHub] [flink] flinkbot edited a comment on issue #9478: [FLINK-13766][task] Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext
flinkbot edited a comment on issue #9478: [FLINK-13766][task] Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext URL: https://github.com/apache/flink/pull/9478#issuecomment-522374042 ## CI report: * 1530600eaf36324966f343c277437e48c2416dc2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/123653758) * 8d08aaa1fedf7338ef3de48991e841f9baf7018b : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/123700944) * 9791697c559fa39cd9f7a1c574420d62a1193545 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124210361) * fbd14e2320371a3cd6615deee0498d477644b954 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124297960) * 04f75585056f083e57aa036d5fccf14286c073ee : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124687473) * cc68c22f6cba85de6ef7babad5a42b9bf187e59f : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124691082) * 6a276b78132bf3de6691a073134865e9c7632fcf : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124692424) * 84806d71cfe3ba5d2f0c1fc8136dcd1fd64251cf : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124693853) * b74f4e456ccad744aedff59da951bfc099ce13c8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124700117) * f84935334f191b73b5766550743de3193ccb1ada : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124701767) * 61e0d65b905bc86dda95b8d81e9163177434adce : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124706860) * 9cdfb712682d5f07c0249bebc873ebd5879d1ec1 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124712485) * e1ab467f585b289eb02c6bcf206b6f73d71d1c51 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124714601) * 2da198414e1421f855d035741207c0d00aa946f3 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124718316) * 4aba3ff64ff2a4f2518150bc7d4ca37f42adf1fb : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124726416) * 
53fbb0b3e1b094ef82b8f3192146aeb4a915dda7 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124845339) * eec2a79613ec30b174e2e133c4e3844c5e27f443 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124857651) * 27f1c1d0224a43754beee2e6d4637de8d942db16 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125408360) * 2225da63d943df06eacceb57c7d77de7aa90dfc1 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125527049) * d99301f4c6704710c479647a14d170080cea79b2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125545740) * 52ec180a141ebc5d26519f8bc3ac4892ad054f8a : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125550692) * 96d5c224e6a3258a84b9e943a93bb12dff93253f : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/126558705) * a3e09500022ee31f5f1fb8c406a8f939a2d5f87c : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/126559774) * eed3ad980b2a46c563d28f2158404f29b0d2018b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/126565945) * 3eed308317f40f75f919e4085e51b404eee41abe : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/127126432) * 2bcc989a0c7a57e70df40aee2e2289b8cf34dac0 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/127144350) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912#discussion_r323600583 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java ## @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the configuration. + */ +public class RestartBackoffTimeStrategyFactoryLoader { + + private static final Logger LOG = LoggerFactory.getLogger(RestartBackoffTimeStrategyFactoryLoader.class); + + private static final String CREATE_METHOD = "createFactory"; + + /** +* Creates proper {@link RestartBackoffTimeStrategy.Factory}. +* If new version restart strategy is specified, will directly use it. +* Otherwise will decide based on legacy restart strategy configs. 
+* +* @param jobRestartStrategyConfiguration restart configuration given within the job graph +* @param clusterConfiguration cluster(server-side) configuration, usually represented as jobmanager config +* @param isCheckpointingEnabled if checkpointing was enabled for the job +* @return new version restart strategy factory +*/ + public static RestartBackoffTimeStrategy.Factory createRestartStrategyFactory( + final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration, + final Configuration clusterConfiguration, + final boolean isCheckpointingEnabled) throws Exception { + + checkNotNull(jobRestartStrategyConfiguration); + checkNotNull(clusterConfiguration); + + final String restartStrategyClassName = clusterConfiguration.getString( + RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME); + + if (restartStrategyClassName != null) { + // create new restart strategy directly if it is specified in cluster config + return createRestartStrategyFactoryInternal(clusterConfiguration); + } else { + // adapt the legacy restart strategy configs as new restart strategy configs + final Configuration adaptedConfiguration = getAdaptedConfiguration( + jobRestartStrategyConfiguration, + clusterConfiguration, + isCheckpointingEnabled); + + // create new restart strategy from the adapted config + return createRestartStrategyFactoryInternal(adaptedConfiguration); + } + } + + /** +* Decides the {@link RestartBackoffTimeStrategy} to use and its params based on legacy configs, +* and records its class name and the params into a adapted configuration. +* +* The decision making is as follows: +* +* Use strategy of {@link RestartStrategies.Restart
[GitHub] [flink] GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
GJL commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912#discussion_r323601246 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java ## @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the configuration. + */ +public class RestartBackoffTimeStrategyFactoryLoader { Review comment: Since there are static members only, I'd make this class final with a private constructor. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
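GJL's suggestion above (a final class with a private constructor, since all members are static) can be sketched as follows. This is a minimal illustration and not the PR's code: the class name `LoaderSketch` and the accessor `createMethodName()` are hypothetical, and only the `createFactory` constant is copied from the quoted diff.

```java
/**
 * Hypothetical stand-in for a static-only utility class such as
 * RestartBackoffTimeStrategyFactoryLoader.
 */
final class LoaderSketch {

    // Constant copied from the quoted diff.
    private static final String CREATE_METHOD = "createFactory";

    // Private constructor: the class only has static members, so it
    // should never be instantiated; "final" also prevents subclassing.
    private LoaderSketch() {
    }

    /** Hypothetical accessor, present only so the sketch has a static member to call. */
    static String createMethodName() {
        return CREATE_METHOD;
    }
}
```

Callers keep using the static methods as before; the only visible effect is that `new LoaderSketch()` and `extends LoaderSketch` no longer compile outside the class.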
[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)
[ https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928342#comment-16928342 ] TisonKun commented on FLINK-10333: -- I retract the statement that {{LeaderServer}} is a prerequisite for new high-availability services. As we discussed on the mailing list, we should narrow the scope of each step. Let's recap the big picture in this thread. We'd like to introduce a mechanism that ensures new state is committed to ZooKeeper only if the contender is the leader, and we choose a transactional store implementation for the ZooKeeper scenario. I will break down the implementation steps as follows. First, re-implement {{ZooKeeperLeaderElectionService}} as described in the design document. All interfaces remain compatible, except that we may change the layout of znodes (let's defer this discussion until a dedicated subtask is created). Second and onward, we separately replace the access points to ZooKeeper (abstractly, high-availability storage), such as JobGraphStore, CheckpointStore and so on, with new leader election services that can return a transactional store. If you agree with this approach, I will create the first subtask and describe in detail what we do and what we gain. > Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, > CompletedCheckpoints) > - > > Key: FLINK-10333 > URL: https://issues.apache.org/jira/browse/FLINK-10333 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Attachments: screenshot-1.png > > > While going over the ZooKeeper based stores > ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, > {{ZooKeeperCompletedCheckpointStore}}) and the underlying > {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were > introduced with past incremental changes. 
> * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} > or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization > problems will either lead to removing the Znode or not > * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of > exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case > of a failure) > * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be > better to move {{RetrievableStateStorageHelper}} out of it for a better > separation of concerns > * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even > if it is locked. This should not happen since it could leave another system > in an inconsistent state (imagine a changed {{JobGraph}} which restores from > an old checkpoint) > * Redundant but also somewhat inconsistent put logic in the different stores > * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} > which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}} > * Getting rid of the {{SubmittedJobGraphListener}} would be helpful > These problems made me think how reliable these components actually work. > Since these components are very important, I propose to refactor them. -- This message was sent by Atlassian Jira (v8.3.2#803003)
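The "commit new state only if the contender is leader" idea from the comment above can be illustrated with a small in-memory sketch. This is not Flink's or ZooKeeper's API: `LeaderGuardedStore`, `grantLeadership`, `revokeLeadership` and `commitIfLeader` are hypothetical names, and a plain `HashMap` stands in for the znode tree. The only point it tries to show is that the leadership check and the write happen as one atomic step.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical in-memory stand-in for a high-availability store.
 * A real implementation would run the check and the write inside
 * one ZooKeeper transaction; here a monitor lock plays that role.
 */
class LeaderGuardedStore {
    private final Map<String, byte[]> data = new HashMap<>();
    private String currentLeader; // null while nobody holds leadership

    synchronized void grantLeadership(String contenderId) {
        currentLeader = contenderId;
    }

    synchronized void revokeLeadership() {
        currentLeader = null;
    }

    /**
     * Writes the value only if the caller is the current leader.
     * Returns true if the write happened, false if it was rejected.
     */
    synchronized boolean commitIfLeader(String contenderId, String path, byte[] value) {
        if (contenderId == null || !contenderId.equals(currentLeader)) {
            return false; // stale or non-leader contender: state stays untouched
        }
        data.put(path, value.clone());
        return true;
    }

    synchronized Optional<byte[]> read(String path) {
        byte[] value = data.get(path);
        return value == null ? Optional.empty() : Optional.of(value.clone());
    }

    public static void main(String[] args) {
        LeaderGuardedStore store = new LeaderGuardedStore();
        store.grantLeadership("jm-1");
        System.out.println(store.commitIfLeader("jm-1", "/jobgraphs/a", new byte[] {1}));
        System.out.println(store.commitIfLeader("jm-2", "/jobgraphs/a", new byte[] {2}));
    }
}
```

In this sketch a contender that has lost leadership cannot overwrite a stored entry, which is the property the proposed transactional store is meant to provide for JobGraphStore and CheckpointStore access.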
[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323608610 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -258,6 +260,8 @@ private SchedulingTopology schedulingTopology; + private TaskFailureListener taskFailureListener = null; Review comment: Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323608753 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -75,10 +137,281 @@ public DefaultScheduler( slotRequestTimeout, shuffleMaster, partitionTracker); + + this.restartBackoffTimeStrategy = restartBackoffTimeStrategy; + this.slotRequestTimeout = slotRequestTimeout; + this.slotProvider = slotProvider; + this.delayExecutor = delayExecutor; + this.userCodeLoader = userCodeLoader; + this.schedulingStrategyFactory = checkNotNull(schedulingStrategyFactory); + this.failoverStrategyFactory = checkNotNull(failoverStrategyFactory); + this.executionVertexOperations = checkNotNull(executionVertexOperations); + this.executionVertexVersioner = executionVertexVersioner; + this.conditionalFutureHandlerFactory = new ConditionalFutureHandlerFactory(executionVertexVersioner); Review comment: Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323608957 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/FailingExecutionVertexOperationsDecorator.java ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; + +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Allows to fail ExecutionVertex operations for testing. + */ +public class FailingExecutionVertexOperationsDecorator implements ExecutionVertexOperations { Review comment: Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323608993 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker; +import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import 
org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link DefaultScheduler}. + */ +public class DefaultSchedulerTest extends TestLogger { + + private static final int TIMEOUT_MS = 1000; + + private static final JobID TEST_JOB_ID = new JobID(); + + private ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor(); + + private ExecutorService executor; + + private ScheduledExecutorService scheduledExecutorService; + + private Configuration configuration; + + private SubmissionTrackingTaskManagerGateway testTaskManagerGateway; + + private TestRestartBackoffTimeStrategy testRestartBackoffTimeStrategy; + + private FailingExecutionVertexOperationsDecorator testExecutionVertexOperations; + + private SimpleSlotProvider slotProvider; + + private ExecutionVertexVersioner executionVertexVersioner; +
[GitHub] [flink] flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#issuecomment-529928149 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit ce99124c9c341b7717bbd26088ab4b5ef5b032db (Thu Sep 12 08:11:42 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323609192 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -845,6 +853,10 @@ else if (current == CREATED || current == SCHEDULED) { } private void scheduleConsumer(ExecutionVertex consumerVertex) { + if (!vertex.isLegacyScheduling()) { Review comment: Fixed. But in static methods, we cannot use `isLegacyScheduling()`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323609649 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java ## @@ -71,7 +71,7 @@ this.slotNumber = slotNumber; this.allocationId = Preconditions.checkNotNull(allocationId); this.slotRequestId = Preconditions.checkNotNull(slotRequestId); - this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId); + this.slotSharingGroupId = slotSharingGroupId; Review comment: Also `slotOwner` is missing it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-14066) bug of building pyflink in master and 1.9.0 version
Xu Yang created FLINK-14066: --- Summary: bug of building pyflink in master and 1.9.0 version Key: FLINK-14066 URL: https://issues.apache.org/jira/browse/FLINK-14066 Project: Flink Issue Type: Bug Components: Build System Affects Versions: 1.9.0, 1.10.0 Environment: windows 10 enterprise x64 powershell x64 flink source master and 1.9.0 version jdk-8u202 maven-3.2.5 Reporter: Xu Yang Attachments: setup.py While building pyflink... After we build Flink from source, a folder named "target" is generated. Then, following the documentation, running "cd flink-python; python3 setup.py sdist bdist_wheel" fails. Root cause: in the setup.py file, line 75, "FLINK_HOME = os.path.abspath("../build-target")", the program can't find the folder "build-target"; the Flink build generated a folder named "target" instead, so the error occurs. The right way: in ../flink-python/setup.py line 75, modify the code as follows: FLINK_HOME = os.path.abspath("../target") -- This message was sent by Atlassian Jira (v8.3.2#803003)
[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323609649 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java ## @@ -71,7 +71,7 @@ this.slotNumber = slotNumber; this.allocationId = Preconditions.checkNotNull(allocationId); this.slotRequestId = Preconditions.checkNotNull(slotRequestId); - this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId); + this.slotSharingGroupId = slotSharingGroupId; Review comment: ~~Also `slotOwner` is missing it~~ Edit: `slotOwner` should have null check This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
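The distinction drawn in this review comment (a nullable `slotSharingGroupId` versus a `slotOwner` that should be null-checked) can be sketched as follows. This is an illustration only, not the code of `TestingLogicalSlot`: the class `SlotSketch` and its `String` fields are hypothetical, and `java.util.Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull`.

```java
import java.util.Objects;

/** Hypothetical stand-in for a test slot; field types are simplified to String. */
class SlotSketch {
    private final String allocationId;
    private final String slotOwner;
    private final String slotSharingGroupId; // intentionally allowed to be null

    SlotSketch(String allocationId, String slotOwner, String slotSharingGroupId) {
        // Required collaborators fail fast at construction time.
        this.allocationId = Objects.requireNonNull(allocationId, "allocationId");
        this.slotOwner = Objects.requireNonNull(slotOwner, "slotOwner");
        // Optional value: stored as-is, callers must handle null.
        this.slotSharingGroupId = slotSharingGroupId;
    }

    String getAllocationId() {
        return allocationId;
    }

    String getSlotOwner() {
        return slotOwner;
    }

    String getSlotSharingGroupId() {
        return slotSharingGroupId;
    }
}
```

With this shape, a missing owner surfaces as a `NullPointerException` in the constructor rather than later when the slot is released, while a missing sharing group remains a valid state.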
[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323611275 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java ## @@ -71,7 +71,7 @@ this.slotNumber = slotNumber; this.allocationId = Preconditions.checkNotNull(allocationId); this.slotRequestId = Preconditions.checkNotNull(slotRequestId); - this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId); + this.slotSharingGroupId = slotSharingGroupId; Review comment: Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#issuecomment-529928149 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 9a6afcb65af3c4c59381e77fa93b2df73d2f2216 (Thu Sep 12 08:17:49 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323613449 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -0,0 +1,644 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.runtime.jobmaster.SerializedInputSplit; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.function.FunctionUtils; + +import org
[GitHub] [flink] flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#issuecomment-529930381 ## CI report: * 5bfec29d38218b1bd5236163a7f2dd2571afa8b2 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/126616886) * 166b8777fcc896f3f6ef7008133af35dbb554204 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/126619729) * 9a6afcb65af3c4c59381e77fa93b2df73d2f2216 : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#issuecomment-529928149 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 09b61ef86b135960b2d21c6cf8d5f510684137ad (Thu Sep 12 08:23:56 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] tillrohrmann commented on issue #9672: [FLINK-14047][rest] Let JobConfigHandler replace sensitive values from user configuration
tillrohrmann commented on issue #9672: [FLINK-14047][rest] Let JobConfigHandler replace sensitive values from user configuration URL: https://github.com/apache/flink/pull/9672#issuecomment-530723361 I would treat this as a separate/follow-up issue @jiasheng55. At the moment, people can work around the problem by prepending `password` or `secret` to the key name and cutting it off again when reading the value. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9672: [FLINK-14047][rest] Let JobConfigHandler replace sensitive values from user configuration
flinkbot edited a comment on issue #9672: [FLINK-14047][rest] Let JobConfigHandler replace sensitive values from user configuration URL: https://github.com/apache/flink/pull/9672#issuecomment-530352346 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit f2186517eb748a6e637104983c4c978f9f3804b4 (Thu Sep 12 08:32:06 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[jira] [Updated] (FLINK-14054) Enable checkpointing via job configuration
[ https://issues.apache.org/jira/browse/FLINK-14054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-14054:
----------------------------------
    Component/s: Runtime / Checkpointing

> Enable checkpointing via job configuration
> ------------------------------------------
>
> Key: FLINK-14054
> URL: https://issues.apache.org/jira/browse/FLINK-14054
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing, Runtime / Configuration
> Reporter: Jun Qin
> Priority: Major
>
> Currently, checkpointing can only be enabled via the job code; see the
> following quote from the Flink
> [checkpointing|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing]
> doc:
> {quote}By default, checkpointing is disabled. To enable checkpointing, call
> {{enableCheckpointing(n)}} on the {{StreamExecutionEnvironment}}, where _n_
> is the checkpoint interval in milliseconds.
> {quote}
> This makes it difficult to enable checkpointing after the job code has been
> released: one has to change and rebuild the job code.
> In addition, making checkpointing configurable is of interest not only to
> developers but also to operations teams:
> * They may want to enable checkpointing in production but disable it in test
> (e.g., to save storage space).
> * They may want to run with and without checkpointing to evaluate the
> impact on job behaviour and performance.
> Therefore, this request. Thanks.

--
This message was sent by Atlassian Jira (v8.3.2#803003)
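Until such an option exists, a job can read the interval from external configuration itself. The following is only a minimal sketch of that idea in plain Java: the property key `job.checkpoint.interval.ms` and the class `CheckpointIntervalConfig` are hypothetical names invented for illustration, not Flink options or classes. The only Flink call it refers to (in a comment) is `enableCheckpointing(n)` from the documentation quoted in the issue.

```java
import java.util.OptionalLong;
import java.util.Properties;

class CheckpointIntervalConfig {
    // Hypothetical key name chosen for this sketch; not an existing Flink option.
    static final String INTERVAL_KEY = "job.checkpoint.interval.ms";

    /** Returns the configured interval in milliseconds, or empty if checkpointing stays disabled. */
    static OptionalLong checkpointInterval(Properties props) {
        String raw = props.getProperty(INTERVAL_KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return OptionalLong.empty();
        }
        long interval = Long.parseLong(raw.trim());
        if (interval <= 0) {
            throw new IllegalArgumentException(INTERVAL_KEY + " must be positive, got " + interval);
        }
        return OptionalLong.of(interval);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(INTERVAL_KEY, "60000");
        // In real job code one would write something like:
        //   checkpointInterval(props).ifPresent(n -> env.enableCheckpointing(n));
        // where env is the job's StreamExecutionEnvironment.
        checkpointInterval(props).ifPresent(n -> System.out.println("would enable checkpointing every " + n + " ms"));
    }
}
```

With this pattern, operations teams could switch checkpointing on or off per environment by editing a properties file, which is the use case the issue describes, although it still requires every job to include such code.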
[jira] [Commented] (FLINK-14066) bug of building pyflink in master and 1.9.0 version
[ https://issues.apache.org/jira/browse/FLINK-14066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928369#comment-16928369 ]

Dian Fu commented on FLINK-14066:
---------------------------------

Hi [~coldmoon777], PyFlink is currently not supported on Windows, so I'm afraid there may be other issues besides this one. For example, a Windows counterpart of the pyflink-gateway-server.sh script is needed to run on Windows (there is a ticket for this, FLINK-12717).

> bug of building pyflink in master and 1.9.0 version
> ---------------------------------------------------
>
> Key: FLINK-14066
> URL: https://issues.apache.org/jira/browse/FLINK-14066
> Project: Flink
> Issue Type: Bug
> Components: Build System
> Affects Versions: 1.9.0, 1.10.0
> Environment: windows 10 enterprise x64
> powershell x64
> flink source master and 1.9.0 version
> jdk-8u202
> maven-3.2.5
> Reporter: Xu Yang
> Priority: Blocker
> Labels: beginner, build
> Attachments: setup.py
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> While building pyflink...
> After we build Flink from the Flink source code, a folder named "target" is
> generated.
> Then, following the documentation, we run "cd flink-python; python3 setup.py
> sdist bdist_wheel", and an error occurs.
> Root cause: in the setup.py file, line 75, "FLINK_HOME =
> os.path.abspath("../build-target")", the program can't find the folder
> "build-target"; the Flink build generated a folder named "target" instead.
> That is how the error happens.
>
> The right way:
> in ../flink-python/setup.py line 75, modify the code as follows:
> FLINK_HOME = os.path.abspath("../target")

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-14030) Nonequivalent conversion happens in Table planner
[ https://issues.apache.org/jira/browse/FLINK-14030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928370#comment-16928370 ]

Leonard Xu commented on FLINK-14030:
------------------------------------

The root cause is that Calcite's simplification logic for IS NULL expressions converts *"f(operand0, operand1) IS NULL"* to *"operand0 IS NULL OR operand1 IS NULL"* when the *Policy* of the RexNode's *SqlKind* is *ANY*.

{code:java}
//org.apache.calcite.rex.RexSimplify.java
private RexNode simplifyIsNull(RexNode a) {
  // Simplify the argument first,
  // call ourselves recursively to see whether we can make more progress.
  // For example, given
  // "(CASE WHEN FALSE THEN 1 ELSE 2) IS NULL" we first simplify the
  // argument to "2", and only then we can simplify "2 IS NULL" to "FALSE".
  a = simplify(a, UNKNOWN);
  if (!a.getType().isNullable() && isSafeExpression(a)) {
    return rexBuilder.makeLiteral(false);
  }
  if (RexUtil.isNull(a)) {
    return rexBuilder.makeLiteral(true);
  }
  if (a.getKind() == SqlKind.CAST) {
    return null;
  }
  switch (Strong.policy(a.getKind())) {
  case NOT_NULL:
    return rexBuilder.makeLiteral(false);
  case ANY:
    // "f" is a strong operator, so "f(operand0, operand1) IS NULL" simplifies
    // to "operand0 IS NULL OR operand1 IS NULL"
    final List<RexNode> operands = new ArrayList<>();
    for (RexNode operand : ((RexCall) a).getOperands()) {
      final RexNode simplified = simplifyIsNull(operand);
      if (simplified == null) {
        operands.add(
            rexBuilder.makeCall(SqlStdOperatorTable.IS_NULL, operand));
      } else {
        operands.add(simplified);
      }
    }
    return RexUtil.composeDisjunction(rexBuilder, operands, false);
  case AS_IS:
  default:
    return null;
  }
}{code}

Unfortunately, most of the arithmetic SqlKinds are currently assigned *Policy.ANY*.

{code:java}
//org.apache.calcite.plan.Strong.java
public static Policy policy(SqlKind kind) {
  return MAP.getOrDefault(kind, Policy.AS_IS);
}

map.put(SqlKind.PLUS, Policy.ANY);
map.put(SqlKind.PLUS_PREFIX, Policy.ANY);
map.put(SqlKind.MINUS, Policy.ANY);
map.put(SqlKind.MINUS_PREFIX, Policy.ANY);
map.put(SqlKind.TIMES, Policy.ANY);
map.put(SqlKind.DIVIDE, Policy.ANY);

 * that operator evaluates to null. */
public enum Policy {
  /** This kind of expression is never null. No need to look at its arguments,
   * if it has any. */
  NOT_NULL,
  /** This kind of expression has its own particular rules about whether it
   * is null. */
  CUSTOM,
  /** This kind of expression is null if and only if at least one of its
   * arguments is null. */
  ANY,
  /** This kind of expression may be null. There is no way to rewrite. */
  AS_IS,
}{code}

Both Flink SQL and the Flink Table API call this simplification logic. It seems difficult to fix this issue elegantly, since Calcite treats this simplification as normal behavior. Maybe we should find a better way to fix this later. Do you have any suggestions? [~lzljs3620320] [~godfreyhe] Thanks.

> Nonequivalent conversion happens in Table planner
> -------------------------------------------------
>
> Key: FLINK-14030
> URL: https://issues.apache.org/jira/browse/FLINK-14030
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.9.0
> Reporter: Leonard Xu
> Priority: Critical
> Fix For: 1.9.1
>
> The *testAllApis()* unit tests fail because the planner converts
> *[ifThenElse(isNull(plus(f0, f1)), 'null', 'not null')]*
> to *[CASE(OR(IS NULL($0), IS NULL($1)), _UTF-16LE'null', _UTF-16LE'not null')]*,
> which is not an equivalent conversion. The result of the expression 'f0 + 'f1
> should be null when the result overflows, even if both operands are non-null.
> It is easy to reproduce as follows:
> testAllApis(
>   'f0 + 'f1,
>   "f1 + f1",
>   "f1 + f1",
>   "null") // the result should be null because of overflow
> override def testData: Row = {
>   val testData = new Row(2)
>   testData.setField(0, BigDecimal("1e10").bigDecimal)
>   testData.setField(1, BigDecimal("0").bigDecimal)
>   testData
> }
> override def typeInfo: RowTypeInfo = {
>   new RowTypeInfo(
>     /* 0 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 10)),
>     /* 1 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 28))
>   )
> }

--
This message was sent by Atlassian Jira (v8.3.2#803003)
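To make the non-equivalence concrete, here is a small self-contained Java sketch. It does not use Calcite or Flink. It models decimal addition with a hypothetical `plus` helper that returns null on overflow, under the simplifying assumption that the sum is typed DECIMAL(38, 28); the planner's real result-type derivation may differ. The class and method names are invented for illustration.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

class IsNullSimplificationCounterexample {
    // Assumed result type of the sum in this sketch: DECIMAL(38, 28).
    static final int PRECISION = 38;
    static final int SCALE = 28;

    /** Hypothetical model of SQL decimal addition: null in, null out, and null on overflow. */
    static BigDecimal plus(BigDecimal a, BigDecimal b) {
        if (a == null || b == null) {
            return null;
        }
        BigDecimal sum = a.add(b).setScale(SCALE, RoundingMode.HALF_UP);
        // The value overflows if it needs more digits than the type allows.
        return sum.precision() > PRECISION ? null : sum;
    }

    /** The original predicate: (a + b) IS NULL. */
    static boolean isNullOriginal(BigDecimal a, BigDecimal b) {
        return plus(a, b) == null;
    }

    /** The simplified predicate: a IS NULL OR b IS NULL. */
    static boolean isNullSimplified(BigDecimal a, BigDecimal b) {
        return a == null || b == null;
    }

    public static void main(String[] args) {
        // Same operand values as the reproduction in the issue: 1e10 and 0.
        BigDecimal f0 = new BigDecimal("1e10");
        BigDecimal f1 = new BigDecimal("0");
        System.out.println("original:   " + isNullOriginal(f0, f1));
        System.out.println("simplified: " + isNullSimplified(f0, f1));
    }
}
```

In this model, 1e10 needs 11 integer digits plus 28 fractional digits, which exceeds 38, so the sum is null although neither operand is. The two predicates therefore disagree on exactly the input from the reproduction, while they agree whenever no overflow occurs.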
[GitHub] [flink] flinkbot edited a comment on issue #9478: [FLINK-13766][task] Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext
flinkbot edited a comment on issue #9478: [FLINK-13766][task] Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext URL: https://github.com/apache/flink/pull/9478#issuecomment-522374042 ## CI report: * 1530600eaf36324966f343c277437e48c2416dc2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/123653758) * 8d08aaa1fedf7338ef3de48991e841f9baf7018b : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/123700944) * 9791697c559fa39cd9f7a1c574420d62a1193545 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124210361) * fbd14e2320371a3cd6615deee0498d477644b954 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124297960) * 04f75585056f083e57aa036d5fccf14286c073ee : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124687473) * cc68c22f6cba85de6ef7babad5a42b9bf187e59f : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124691082) * 6a276b78132bf3de6691a073134865e9c7632fcf : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124692424) * 84806d71cfe3ba5d2f0c1fc8136dcd1fd64251cf : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124693853) * b74f4e456ccad744aedff59da951bfc099ce13c8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124700117) * f84935334f191b73b5766550743de3193ccb1ada : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124701767) * 61e0d65b905bc86dda95b8d81e9163177434adce : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124706860) * 9cdfb712682d5f07c0249bebc873ebd5879d1ec1 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124712485) * e1ab467f585b289eb02c6bcf206b6f73d71d1c51 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124714601) * 2da198414e1421f855d035741207c0d00aa946f3 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124718316) * 4aba3ff64ff2a4f2518150bc7d4ca37f42adf1fb : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124726416) * 
53fbb0b3e1b094ef82b8f3192146aeb4a915dda7 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124845339) * eec2a79613ec30b174e2e133c4e3844c5e27f443 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124857651) * 27f1c1d0224a43754beee2e6d4637de8d942db16 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125408360) * 2225da63d943df06eacceb57c7d77de7aa90dfc1 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125527049) * d99301f4c6704710c479647a14d170080cea79b2 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125545740) * 52ec180a141ebc5d26519f8bc3ac4892ad054f8a : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125550692) * 96d5c224e6a3258a84b9e943a93bb12dff93253f : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/126558705) * a3e09500022ee31f5f1fb8c406a8f939a2d5f87c : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/126559774) * eed3ad980b2a46c563d28f2158404f29b0d2018b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/126565945) * 3eed308317f40f75f919e4085e51b404eee41abe : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/127126432) * 2bcc989a0c7a57e70df40aee2e2289b8cf34dac0 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/127144350) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Comment Edited] (FLINK-14030) Nonequivalent conversion happens in Table planner
[ https://issues.apache.org/jira/browse/FLINK-14030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928370#comment-16928370 ]

Leonard Xu edited comment on FLINK-14030 at 9/12/19 8:37 AM:
-------------------------------------------------------------

The root cause is that Calcite's simplification logic for IS NULL expressions converts *"f(operand0, operand1) IS NULL"* to *"operand0 IS NULL OR operand1 IS NULL"* when the *Policy* of the RexNode's *SqlKind* is *ANY*.

{code:java}
//org.apache.calcite.rex.RexSimplify.java
private RexNode simplifyIsNull(RexNode a) {
  // Simplify the argument first,
  // call ourselves recursively to see whether we can make more progress.
  // For example, given
  // "(CASE WHEN FALSE THEN 1 ELSE 2) IS NULL" we first simplify the
  // argument to "2", and only then we can simplify "2 IS NULL" to "FALSE".
  a = simplify(a, UNKNOWN);
  if (!a.getType().isNullable() && isSafeExpression(a)) {
    return rexBuilder.makeLiteral(false);
  }
  if (RexUtil.isNull(a)) {
    return rexBuilder.makeLiteral(true);
  }
  if (a.getKind() == SqlKind.CAST) {
    return null;
  }
  switch (Strong.policy(a.getKind())) {
  case NOT_NULL:
    return rexBuilder.makeLiteral(false);
  case ANY:
    // "f" is a strong operator, so "f(operand0, operand1) IS NULL" simplifies
    // to "operand0 IS NULL OR operand1 IS NULL"
    final List<RexNode> operands = new ArrayList<>();
    for (RexNode operand : ((RexCall) a).getOperands()) {
      final RexNode simplified = simplifyIsNull(operand);
      if (simplified == null) {
        operands.add(
            rexBuilder.makeCall(SqlStdOperatorTable.IS_NULL, operand));
      } else {
        operands.add(simplified);
      }
    }
    return RexUtil.composeDisjunction(rexBuilder, operands, false);
  case AS_IS:
  default:
    return null;
  }
}{code}

Unfortunately, most of the arithmetic SqlKinds are currently assigned *Policy.ANY*.

{code:java}
//org.apache.calcite.plan.Strong.java
public static Policy policy(SqlKind kind) {
  return MAP.getOrDefault(kind, Policy.AS_IS);
}

map.put(SqlKind.PLUS, Policy.ANY);
map.put(SqlKind.PLUS_PREFIX, Policy.ANY);
map.put(SqlKind.MINUS, Policy.ANY);
map.put(SqlKind.MINUS_PREFIX, Policy.ANY);
map.put(SqlKind.TIMES, Policy.ANY);
map.put(SqlKind.DIVIDE, Policy.ANY);

 * that operator evaluates to null. */
public enum Policy {
  /** This kind of expression is never null. No need to look at its arguments,
   * if it has any. */
  NOT_NULL,
  /** This kind of expression has its own particular rules about whether it
   * is null. */
  CUSTOM,
  /** This kind of expression is null if and only if at least one of its
   * arguments is null. */
  ANY,
  /** This kind of expression may be null. There is no way to rewrite. */
  AS_IS,
}{code}

Both Flink SQL and the Flink Table API call this simplification logic. It seems difficult to fix this issue elegantly, since Calcite treats this simplification as normal behavior. Maybe we should find a better way to fix this later. Do you have any suggestions? [~lzljs3620320] [~godfreyhe] Thanks.
[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)
[ https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928373#comment-16928373 ]

Till Rohrmann commented on FLINK-10333:
---------------------------------------

Sounds good to me. One last clarification: the plan is to add a completely new {{HighAvailabilityServices}} implementation without touching the existing implementations, right?

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)
> -------------------------------------------------------------------------------------
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.5.3, 1.6.0, 1.7.0
> Reporter: Till Rohrmann
> Priority: Major
> Attachments: screenshot-1.png
>
> While going over the ZooKeeper based stores
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}},
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were
> introduced with past incremental changes.
> * Depending on whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}}
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be
> better to move {{RetrievableStateStorageHelper}} out of it for a better
> separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even
> if it is locked. This should not happen since it could leave another system
> in an inconsistent state (imagine a changed {{JobGraph}} which restores from
> an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}}
> which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
> * Getting rid of the {{SubmittedJobGraphListener}} would be helpful
> These problems made me wonder how reliably these components actually work.
> Since these components are very important, I propose to refactor them.

--
This message was sent by Atlassian Jira (v8.3.2#803003)
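The second bullet (locks not released when {{#getAllAndLock}} fails) describes a general all-or-nothing problem. The sketch below illustrates one common way to handle it, rolling back already acquired locks on failure. It is not the code of {{ZooKeeperStateHandleStore}} and not a proposed patch: the `Lockable` interface, `DemoLock` and `lockAll` are hypothetical names invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LockAllOrNothing {
    /** Hypothetical minimal lock abstraction; not a Flink or ZooKeeper API. */
    interface Lockable {
        void lock() throws Exception;
        void release();
    }

    /** Locks every entry, or releases everything acquired so far and rethrows. */
    static <T extends Lockable> List<T> lockAll(List<T> entries) throws Exception {
        List<T> acquired = new ArrayList<>();
        try {
            for (T entry : entries) {
                entry.lock();
                acquired.add(entry);
            }
            return acquired;
        } catch (Exception e) {
            // Roll back in reverse order so no lock is left behind.
            for (int i = acquired.size() - 1; i >= 0; i--) {
                acquired.get(i).release();
            }
            throw e;
        }
    }

    /** In-memory lock used only to demonstrate the rollback. */
    static final class DemoLock implements Lockable {
        final boolean failOnLock;
        boolean held;

        DemoLock(boolean failOnLock) {
            this.failOnLock = failOnLock;
        }

        @Override
        public void lock() throws Exception {
            if (failOnLock) {
                throw new Exception("simulated lock failure");
            }
            held = true;
        }

        @Override
        public void release() {
            held = false;
        }
    }

    public static void main(String[] args) {
        DemoLock first = new DemoLock(false);
        DemoLock second = new DemoLock(true);
        try {
            lockAll(Arrays.asList(first, second));
        } catch (Exception e) {
            System.out.println("failed: " + e.getMessage() + ", first still held: " + first.held);
        }
    }
}
```

The point of the design is that a caller either gets all locks or none, so a failure halfway through cannot leave entries locked by a component that no longer knows about them.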
[GitHub] [flink] flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#issuecomment-529930381 ## CI report: * 5bfec29d38218b1bd5236163a7f2dd2571afa8b2 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/126616886) * 166b8777fcc896f3f6ef7008133af35dbb554204 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/126619729) * 9a6afcb65af3c4c59381e77fa93b2df73d2f2216 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/127150468) * 09b61ef86b135960b2d21c6cf8d5f510684137ad : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on issue #9672: [FLINK-14047][rest] Let JobConfigHandler replace sensitive values from user configuration
tillrohrmann commented on issue #9672: [FLINK-14047][rest] Let JobConfigHandler replace sensitive values from user configuration URL: https://github.com/apache/flink/pull/9672#issuecomment-530729132 Merging this PR now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann closed pull request #9672: [FLINK-14047][rest] Let JobConfigHandler replace sensitive values from user configuration
tillrohrmann closed pull request #9672: [FLINK-14047][rest] Let JobConfigHandler replace sensitive values from user configuration URL: https://github.com/apache/flink/pull/9672 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9672: [FLINK-14047][rest] Let JobConfigHandler replace sensitive values from user configuration
flinkbot edited a comment on issue #9672: [FLINK-14047][rest] Let JobConfigHandler replace sensitive values from user configuration URL: https://github.com/apache/flink/pull/9672#issuecomment-530352346 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit f2186517eb748a6e637104983c4c978f9f3804b4 (Thu Sep 12 08:48:23 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[jira] [Closed] (FLINK-14047) Hide secret values when displaying user configuration/global job parameters in web UI
[ https://issues.apache.org/jira/browse/FLINK-14047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann closed FLINK-14047.
---------------------------------
    Fix Version/s: 1.10.0
       Resolution: Implemented

Implemented via
e6db3ebd24a9a6cf9b5cec66d11d78a9ca030966
7b862d3e690099bd0a845431622e0a5f505ab162
dd31f8327b8670cd4855f1d4b454acb99d68f27d

> Hide secret values when displaying user configuration/global job parameters in web UI
> -------------------------------------------------------------------------------------
>
> Key: FLINK-14047
> URL: https://issues.apache.org/jira/browse/FLINK-14047
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / REST, Runtime / Web Frontend
> Affects Versions: 1.8.1, 1.9.0, 1.10.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.10.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> A user requested the feature to hide secret values when displaying the user
> configuration/global job parameters in the web UI. Ideally, there is a way to
> denote keys which contain secret values so that the {{JobConfigHandler}}
> excludes them from the response.
> For the cluster level configuration Flink supports a similar functionality.
> The {{ClusterConfigHandler}} replaces the values of all keys which contain
> one of the strings specified in {{GlobalConfiguration.SENSITIVE_KEYS}}, which
> is currently defined as {{["password", "secret"]}}, with
> {{GlobalConfiguration.HIDDEN_CONTENT}}. That way Flink hides the sensitive
> information.

--
This message was sent by Atlassian Jira (v8.3.2#803003)
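The masking rule described in the issue can be sketched in a few lines of plain Java. This is an illustration only, not the code merged for this ticket: the class `SensitiveValueMasker` is a hypothetical name, the `HIDDEN_CONTENT` value is a placeholder (the issue does not state the real value of {{GlobalConfiguration.HIDDEN_CONTENT}}), and the case-insensitive match is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

class SensitiveValueMasker {
    // The two strings the issue lists for GlobalConfiguration.SENSITIVE_KEYS.
    static final List<String> SENSITIVE_KEYS = Arrays.asList("password", "secret");
    // Placeholder value for this sketch; the real HIDDEN_CONTENT is not given in the issue.
    static final String HIDDEN_CONTENT = "<hidden>";

    /** True if the key contains one of the sensitive strings (case-insensitive by assumption). */
    static boolean isSensitive(String key) {
        String lower = key.toLowerCase(Locale.ROOT);
        for (String marker : SENSITIVE_KEYS) {
            if (lower.contains(marker)) {
                return true;
            }
        }
        return false;
    }

    /** Returns a copy of the configuration with the values of sensitive keys replaced. */
    static Map<String, String> mask(Map<String, String> config) {
        Map<String, String> masked = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String value = isSensitive(entry.getKey()) ? HIDDEN_CONTENT : entry.getValue();
            masked.put(entry.getKey(), value);
        }
        return masked;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("db.password", "hunter2");
        config.put("secret.api.token", "abc123");
        config.put("parallelism", "4");
        System.out.println(mask(config));
    }
}
```

Because the match is a substring test on the key, a user parameter whose name does not contain `password` or `secret` is shown in clear text. That is why the workaround mentioned in the discussion of pull request #9672 is to prepend one of those strings to the key name.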
[GitHub] [flink] flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#issuecomment-529928149 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 9441f984cf179d8dc9212ffc59aea4b5ef922350 (Thu Sep 12 08:51:26 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323627300 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -1727,6 +1779,10 @@ void notifyExecutionChange( final ExecutionState newExecutionState, final Throwable error) { + if (!isLegacyScheduling()) { + return; + } Review comment: Yes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#issuecomment-529930381 ## CI report: * 5bfec29d38218b1bd5236163a7f2dd2571afa8b2 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/126616886) * 166b8777fcc896f3f6ef7008133af35dbb554204 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/126619729) * 9a6afcb65af3c4c59381e77fa93b2df73d2f2216 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/127150468) * 09b61ef86b135960b2d21c6cf8d5f510684137ad : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/127152805) * 9441f984cf179d8dc9212ffc59aea4b5ef922350 : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] yanghua commented on a change in pull request #9013: [FLINK-13136] Fix documentation error about stopping job with restful api
yanghua commented on a change in pull request #9013: [FLINK-13136] Fix documentation error about stopping job with restful api URL: https://github.com/apache/flink/pull/9013#discussion_r323628359 ## File path: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ## @@ -375,7 +374,7 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) public void cancel(JobID jobID) throws Exception { JobCancellationMessageParameters params = new JobCancellationMessageParameters(); params.jobPathParameter.resolve(jobID); - params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL)); + params.terminationModeQueryParameter.resolve(Collections.singletonList("CANCEL")); Review comment: Originally, I also thought an enum was a better choice than a string. However, once I removed `TerminationMode.STOP`, only one enum value, `CANCEL`, remained. I suspect that supporting multiple modes was the reason `TerminationMode` was defined in the first place, and I also saw the deprecation annotation, so I replaced the enum with a String value. That said, I am not against keeping `TerminationMode`. I will refactor the change soon. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9013: [FLINK-13136] Fix documentation error about stopping job with restful api
flinkbot edited a comment on issue #9013: [FLINK-13136] Fix documentation error about stopping job with restful api URL: https://github.com/apache/flink/pull/9013#issuecomment-509065631 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 097af583c8040c16da17b796f6e4060c270b5b1d (Thu Sep 12 08:57:30 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)
[ https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928384#comment-16928384 ] TisonKun commented on FLINK-10333: -- Yes, that's it. As for implementation details, one alternative is to add two new methods - {{void removeLeaderInfo()}} - {{LeaderStore getLeaderStore()}} to the {{LeaderElectionService}} interface and simply adjust the existing implementations to implement them. However, since the leader store is not in use yet, we can even defer the interface-level changes to the next step. In short, we *can* do this without touching the existing implementations. Let's move further details into the subtask :-) > Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, > CompletedCheckpoints) > - > > Key: FLINK-10333 > URL: https://issues.apache.org/jira/browse/FLINK-10333 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Attachments: screenshot-1.png > > > While going over the ZooKeeper based stores > ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, > {{ZooKeeperCompletedCheckpointStore}}) and the underlying > {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were > introduced with past incremental changes. > * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} > or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization > problems will either lead to removing the Znode or not > * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of > exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case > of a failure) > * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be > better to move {{RetrievableStateStorageHelper}} out of it for a better > separation of concerns > * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even > if it is locked. 
This should not happen since it could leave another system > in an inconsistent state (imagine a changed {{JobGraph}} which restores from > an old checkpoint) > * Redundant but also somewhat inconsistent put logic in the different stores > * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} > which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}} > * Getting rid of the {{SubmittedJobGraphListener}} would be helpful > These problems made me think how reliable these components actually work. > Since these components are very important, I propose to refactor them. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-14057) Add Remove Other Timers to TimerService
[ https://issues.apache.org/jira/browse/FLINK-14057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928385#comment-16928385 ] Aljoscha Krettek commented on FLINK-14057: -- It would probably be better to add a separate method for dropping timers instead of overloading that functionality on "add timer". > Add Remove Other Timers to TimerService > --- > > Key: FLINK-14057 > URL: https://issues.apache.org/jira/browse/FLINK-14057 > Project: Flink > Issue Type: Improvement >Reporter: Jesse Anderson >Priority: Major > > The TimerService service has the ability to add timers with > registerProcessingTimeTimer. This method can be called many times and have > different timer times. > If you want to add a new timer and delete other timers, you have to keep > track of all previous timer times and call deleteProcessingTimeTimer for each > time. This method forces you to keep track of all previous (unexpired) timers > for a key. > Instead, I suggest overloading registerProcessingTimeTimer with a second > boolean argument that will remove all previous timers and set the new timer. > Note: although I'm using registerProcessingTimeTimer, this applies to > registerEventTimeTimer as well. -- This message was sent by Atlassian Jira (v8.3.2#803003)
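The bookkeeping the reporter describes (remember every registered timer, delete each one, then register the new one) and the separate "drop timers" operation suggested in the comment can be sketched as below. This is an illustration under stated assumptions: `ProcessingTimers` is a hypothetical stand-in that only mirrors the two method names mentioned in the issue, `InMemoryTimers` is a toy implementation for demonstration, and `ReplacingTimerHelper` is not a Flink class. A real job would keep the tracked timestamps in keyed state, per key.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in exposing only the two methods named in the issue. */
interface ProcessingTimers {
    void registerProcessingTimeTimer(long time);
    void deleteProcessingTimeTimer(long time);
}

/** Toy implementation that just records which timers are currently registered. */
final class InMemoryTimers implements ProcessingTimers {
    final Set<Long> active = new TreeSet<>();

    @Override
    public void registerProcessingTimeTimer(long time) {
        active.add(time);
    }

    @Override
    public void deleteProcessingTimeTimer(long time) {
        active.remove(time);
    }
}

/** Hypothetical helper: tracks registered timers so they can be dropped in one call. */
final class ReplacingTimerHelper {
    private final ProcessingTimers timers;
    private final Set<Long> tracked = new HashSet<>();

    ReplacingTimerHelper(ProcessingTimers timers) {
        this.timers = timers;
    }

    /** Registers a timer and remembers its timestamp. */
    void register(long time) {
        timers.registerProcessingTimeTimer(time);
        tracked.add(time);
    }

    /** Separate "drop" operation, as suggested in the comment, instead of a boolean overload. */
    void deleteAll() {
        for (long time : tracked) {
            timers.deleteProcessingTimeTimer(time);
        }
        tracked.clear();
    }

    /** Drops every tracked timer, then registers the new one. */
    void replaceAllWith(long time) {
        deleteAll();
        register(time);
    }
}
```

Keeping `deleteAll()` as its own method means `register` never has surprising side effects, which is the design concern raised in the comment. The sketch does not prune timers that have already fired; deleting them again is harmless here.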
[GitHub] [flink] GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
GJL commented on a change in pull request #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#discussion_r323631332 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; + +import java.util.concurrent.CompletableFuture; + +class DefaultExecutionVertexOperations implements ExecutionVertexOperations { + + @Override + public void deploy(final ExecutionVertex executionVertex, final DeploymentOption deploymentOption) throws JobException { + executionVertex.setSendScheduleOrUpdateConsumerMessage(deploymentOption.sendScheduleOrUpdateConsumerMessage()); Review comment: You are right, that's a bug. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] yanghua commented on issue #9667: [FLINK-13619] Update FlinkKafkaProducerMigrationOperatorTest to restore from 1.9 savepoint
yanghua commented on issue #9667: [FLINK-13619] Update FlinkKafkaProducerMigrationOperatorTest to restore from 1.9 savepoint URL: https://github.com/apache/flink/pull/9667#issuecomment-530736092 cc @tillrohrmann This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9667: [FLINK-13619] Update FlinkKafkaProducerMigrationOperatorTest to restore from 1.9 savepoint
flinkbot edited a comment on issue #9667: [FLINK-13619] Update FlinkKafkaProducerMigrationOperatorTest to restore from 1.9 savepoint URL: https://github.com/apache/flink/pull/9667#issuecomment-529947799 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit d56a6ff167bccb8797766ee6419c3516c1bc37c5 (Thu Sep 12 09:06:42 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[jira] [Created] (FLINK-14067) Decouple PlanExecutor from JSON plan generation
Aljoscha Krettek created FLINK-14067: Summary: Decouple PlanExecutor from JSON plan generation Key: FLINK-14067 URL: https://issues.apache.org/jira/browse/FLINK-14067 Project: Flink Issue Type: Sub-task Components: API / DataSet, Client / Job Submission Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek {{PlanExecutor}} has a method {{getOptimizerPlanAsJSON()}} that is used by DataSet environments to get a JSON version of the execution plan. To ease future work and to make it more maintainable we should get rid of that method and instead have a dedicated utility for generating JSON plans that the environments can use. (The only reason this method is on the executor is because only {{flink-clients}} via {{flink-optimizer}} has the required components to derive a JSON plan.) -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-11767) Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8
[ https://issues.apache.org/jira/browse/FLINK-11767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928392#comment-16928392 ] Till Rohrmann commented on FLINK-11767: --- Any progress on this issue [~tzulitai]? > Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8 > - > > Key: FLINK-11767 > URL: https://issues.apache.org/jira/browse/FLINK-11767 > Project: Flink > Issue Type: Sub-task > Components: API / Type Serialization System, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Update {{TypeSerializerSnapshotMigrationTestBase}} and subclasses to cover > restoring from Flink 1.8. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14068) Use Java's Duration instead of Flink's Time
TisonKun created FLINK-14068: Summary: Use Java's Duration instead of Flink's Time Key: FLINK-14068 URL: https://issues.apache.org/jira/browse/FLINK-14068 Project: Flink Issue Type: Sub-task Components: API / DataStream, Runtime / Configuration, Runtime / Coordination Reporter: TisonKun Fix For: 2.0.0 As discussed on the mailing list [here|https://lists.apache.org/x/thread.html/90ad2f1d7856cfe5bdc8f7dd678c626be96eeaeeb736e98f31660039@%3Cdev.flink.apache.org%3E], the community has reached a consensus to use Java's Duration to represent a "time interval" instead of Flink's Time. Specifically, Flink has two {{Time}} classes: {{org.apache.flink.api.common.time.Time}} and {{org.apache.flink.streaming.api.windowing.time.Time}}. The latter has already been deprecated and superseded by the former. Now we also want to deprecate the former and drop it in 2.0.0 (we don't drop it right away because it is part of the {{@Public}} interfaces). -- This message was sent by Atlassian Jira (v8.3.2#803003)
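A migration of this kind usually comes down to converting a size-plus-unit value into a `java.time.Duration`. The sketch below shows that conversion with a hypothetical `LegacyTime` class standing in for a Flink-style `Time`; it is not the real `org.apache.flink.api.common.time.Time`, and the helper name `TimeToDuration` is likewise an assumption. Only `java.time.Duration` and `java.util.concurrent.TimeUnit` are real JDK APIs here.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a size-plus-unit time interval class. */
final class LegacyTime {
    final long size;
    final TimeUnit unit;

    LegacyTime(long size, TimeUnit unit) {
        this.size = size;
        this.unit = unit;
    }

    /** Interval length in milliseconds. */
    long toMilliseconds() {
        return unit.toMillis(size);
    }
}

/** Hypothetical bridge used while call sites move to java.time.Duration. */
final class TimeToDuration {
    /** Converts the legacy interval to a Duration with millisecond precision. */
    static Duration toDuration(LegacyTime time) {
        return Duration.ofMillis(time.toMilliseconds());
    }
}
```

For example, `TimeToDuration.toDuration(new LegacyTime(5, TimeUnit.SECONDS))` yields a value equal to `Duration.ofSeconds(5)`. Going through milliseconds loses sub-millisecond precision, which may or may not matter for the intervals in question.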
[GitHub] [flink] flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub URL: https://github.com/apache/flink/pull/9663#issuecomment-529930381 ## CI report: * 5bfec29d38218b1bd5236163a7f2dd2571afa8b2 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/126616886) * 166b8777fcc896f3f6ef7008133af35dbb554204 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/126619729) * 9a6afcb65af3c4c59381e77fa93b2df73d2f2216 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/127150468) * 09b61ef86b135960b2d21c6cf8d5f510684137ad : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/127152805) * 9441f984cf179d8dc9212ffc59aea4b5ef922350 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/127155093) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann closed pull request #9667: [FLINK-13619] Update FlinkKafkaProducerMigrationOperatorTest to restore from 1.9 savepoint
tillrohrmann closed pull request #9667: [FLINK-13619] Update FlinkKafkaProducerMigrationOperatorTest to restore from 1.9 savepoint URL: https://github.com/apache/flink/pull/9667 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-13619) Update FlinkKafkaProducerMigrationOperatorTest to restore from 1.9 savepoint
[ https://issues.apache.org/jira/browse/FLINK-13619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-13619. - Resolution: Done Done via 82ed1a64d1d1e5cde9ab0999804e93d38c9f9149 > Update FlinkKafkaProducerMigrationOperatorTest to restore from 1.9 savepoint > > > Key: FLINK-13619 > URL: https://issues.apache.org/jira/browse/FLINK-13619 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka, Tests >Affects Versions: 1.10.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Update {{FlinkKafkaProducerMigrationOperatorTest}} to restore from 1.9 > savepoint. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[GitHub] [flink] xintongsong opened a new pull request #9676: [FLINK-13981][runtime] Introduce config switch for enabling the new FLIP-49 task executor memory configurations.
xintongsong opened a new pull request #9676: [FLINK-13981][runtime] Introduce config switch for enabling the new FLIP-49 task executor memory configurations. URL: https://github.com/apache/flink/pull/9676 ## What is the purpose of the change This PR introduces a config switch for enabling the FLIP-49 task executor memory configurations. This is a temporary config option for development purposes. ## Brief change log - Introduce "taskmanager.enable-flip-49" for enabling FLIP-49 task executor memory configurations. ## Verifying this change This change is trivial and has no test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
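How such a switch might select between the two code paths can be sketched as follows. Only the key `taskmanager.enable-flip-49` comes from the PR description; the class `MemoryConfigSwitch`, the use of a plain `Map` instead of Flink's configuration classes, the default of `false`, and the returned labels are all assumptions made for illustration.

```java
import java.util.Map;

/** Hypothetical sketch of a boolean feature switch; not the code from the PR. */
final class MemoryConfigSwitch {
    // Key taken from the PR description.
    static final String ENABLE_FLIP_49_KEY = "taskmanager.enable-flip-49";
    // Assumption: the new code path is off unless explicitly enabled.
    static final boolean DEFAULT_ENABLED = false;

    /** Reads the switch from a flat key/value configuration. */
    static boolean useNewMemoryConfig(Map<String, String> conf) {
        String raw = conf.get(ENABLE_FLIP_49_KEY);
        return raw == null ? DEFAULT_ENABLED : Boolean.parseBoolean(raw.trim());
    }

    /** Chooses a code path; the labels are placeholders for the real logic. */
    static String selectCodePath(Map<String, String> conf) {
        return useNewMemoryConfig(conf) ? "flip-49" : "legacy";
    }
}
```

Routing every decision through a single method like `useNewMemoryConfig` keeps the legacy behaviour untouched while the switch is off and makes the temporary option easy to remove later.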
[jira] [Updated] (FLINK-13981) Introduce a switch for enabling the new task executor memory configurations
[ https://issues.apache.org/jira/browse/FLINK-13981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13981: --- Labels: pull-request-available (was: ) > Introduce a switch for enabling the new task executor memory configurations > --- > > Key: FLINK-13981 > URL: https://issues.apache.org/jira/browse/FLINK-13981 > Project: Flink > Issue Type: Sub-task >Reporter: Xintong Song >Assignee: Xintong Song >Priority: Major > Labels: pull-request-available > > Introduce a temporary config option as a switch between the current / new task > executor memory configuration code paths. This allows us to implement and > test the new code paths without affecting the existing code paths and behaviors. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[GitHub] [flink] flinkbot commented on issue #9676: [FLINK-13981][runtime] Introduce config switch for enabling the new FLIP-49 task executor memory configurations.
flinkbot commented on issue #9676: [FLINK-13981][runtime] Introduce config switch for enabling the new FLIP-49 task executor memory configurations. URL: https://github.com/apache/flink/pull/9676#issuecomment-530745271 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 43f4f046581a22a59e5fef74e535a2abdfe27eb1 (Thu Sep 12 09:30:26 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9483: [FLINK-13767][task] Migrate isFinished method from AvailabilityListener to AsyncDataInput
flinkbot edited a comment on issue #9483: [FLINK-13767][task] Migrate isFinished method from AvailabilityListener to AsyncDataInput
URL: https://github.com/apache/flink/pull/9483#issuecomment-522597807

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks
Last check on commit ed9f2ccf2b47da38109dcd6c9ec72e67482bcd7e (Thu Sep 12 09:47:28 UTC 2019)

**Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
[GitHub] [flink] flinkbot commented on issue #9676: [FLINK-13981][runtime] Introduce config switch for enabling the new FLIP-49 task executor memory configurations.
flinkbot commented on issue #9676: [FLINK-13981][runtime] Introduce config switch for enabling the new FLIP-49 task executor memory configurations. URL: https://github.com/apache/flink/pull/9676#issuecomment-530751409 ## CI report: * 43f4f046581a22a59e5fef74e535a2abdfe27eb1 : UNKNOWN This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9483: [FLINK-13767][task] Refactor StreamInputProcessor#processInput based on InputStatus
flinkbot edited a comment on issue #9483: [FLINK-13767][task] Refactor StreamInputProcessor#processInput based on InputStatus
URL: https://github.com/apache/flink/pull/9483#issuecomment-522602263

## CI report:
* 5d079442de5815edf896b85fa7aa3ca599975bb0 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/123734022)
* 725be7c2226608382c67d5b3d372886be737fff6 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124870650)
* eeb6f54f2316e5a6ef8f1cbed4f002c3ae37107e : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/12569)
* e8d22a8c88d9c4a3d10fe989761c54e2d56ba3bf : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/125557289)
* 9b8109a8586bcf8095eba8d24c6cd0dbb21d7810 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125559250)
* ac519615c6a2e28bb6544d2dce2dee43a03f3928 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/125831290)
* ed9f2ccf2b47da38109dcd6c9ec72e67482bcd7e : UNKNOWN

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
flinkbot edited a comment on issue #9663: [WIP][FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#issuecomment-529930381

## CI report:
* 5bfec29d38218b1bd5236163a7f2dd2571afa8b2 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/126616886)
* 166b8777fcc896f3f6ef7008133af35dbb554204 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/126619729)
* 9a6afcb65af3c4c59381e77fa93b2df73d2f2216 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/127150468)
* 09b61ef86b135960b2d21c6cf8d5f510684137ad : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/127152805)
* 9441f984cf179d8dc9212ffc59aea4b5ef922350 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/127155093)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9676: [FLINK-13981][runtime] Introduce config switch for enabling the new FLIP-49 task executor memory configurations.
flinkbot edited a comment on issue #9676: [FLINK-13981][runtime] Introduce config switch for enabling the new FLIP-49 task executor memory configurations. URL: https://github.com/apache/flink/pull/9676#issuecomment-530751409 ## CI report: * 43f4f046581a22a59e5fef74e535a2abdfe27eb1 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/127162251) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9483: [FLINK-13767][task] Refactor StreamInputProcessor#processInput based on InputStatus
zhijiangW commented on a change in pull request #9483: [FLINK-13767][task] Refactor StreamInputProcessor#processInput based on InputStatus
URL: https://github.com/apache/flink/pull/9483#discussion_r323659128

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java

@@ -121,18 +115,46 @@ public StreamTwoInputProcessor(
 	taskManagerConfig,
 	taskName);
 checkState(checkpointedInputGates.length == 2);
- this.input1 = new StreamTaskNetworkInput(checkpointedInputGates[0], inputSerializer1, ioManager, 0);
- this.input2 = new StreamTaskNetworkInput(checkpointedInputGates[1], inputSerializer2, ioManager, 1);
- this.statusWatermarkValve1 = new StatusWatermarkValve(
- 	unionedInputGate1.getNumberOfInputChannels(),
- 	new ForwardingValveOutputHandler(streamOperator, lock, streamStatusMaintainer, input1WatermarkGauge, 0));
- this.statusWatermarkValve2 = new StatusWatermarkValve(
- 	unionedInputGate2.getNumberOfInputChannels(),
- 	new ForwardingValveOutputHandler(streamOperator, lock, streamStatusMaintainer, input2WatermarkGauge, 1));
+ this.output1 = new StreamTaskNetworkOutput<>(
+ 	streamOperator,
+ 	(StreamRecord record) -> {

Review comment:
Some missing context: my previous version used the functional approach for processing all elements, including records, watermarks, and latency markers. @pnowojski advised against that approach because it makes debugging inconvenient, so I kept the functional approach only for record processing, which is the frequent path, because of possible JIT optimization concerns.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services
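The design choice in the review comment above (a function only for the frequent record path, ordinary methods for the rarer elements) might look roughly like the following. This is a minimal sketch, not the code from the pull request: `NetworkOutputSketch`, its `String` record type, and all of its method names are hypothetical stand-ins, and only `java.util` classes are used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an output that receives the elements of one input.
class NetworkOutputSketch {
    // Frequent path: record handling is injected as a function.
    private final Consumer<String> recordConsumer;
    // Rare path: watermarks are handled by an ordinary method instead of a function.
    private final List<Long> seenWatermarks = new ArrayList<>();

    NetworkOutputSketch(Consumer<String> recordConsumer) {
        this.recordConsumer = recordConsumer;
    }

    void emitRecord(String record) {
        recordConsumer.accept(record);
    }

    void emitWatermark(long timestamp) {
        seenWatermarks.add(timestamp);
    }

    List<Long> seenWatermarks() {
        return seenWatermarks;
    }

    public static void main(String[] args) {
        // Each input gets its own output with its own record function.
        List<String> firstInputRecords = new ArrayList<>();
        NetworkOutputSketch output1 = new NetworkOutputSketch(firstInputRecords::add);
        output1.emitRecord("a");
        output1.emitWatermark(42L);
        System.out.println(firstInputRecords + " " + output1.seenWatermarks());
    }
}
```

Whether this split actually helps JIT optimization is not something the sketch demonstrates; it only illustrates the shape of the trade-off discussed in the comment.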
[GitHub] [flink] flinkbot edited a comment on issue #9483: [FLINK-13767][task] Refactor StreamInputProcessor#processInput based on InputStatus
flinkbot edited a comment on issue #9483: [FLINK-13767][task] Refactor StreamInputProcessor#processInput based on InputStatus
URL: https://github.com/apache/flink/pull/9483#issuecomment-522597807

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks
Last check on commit ed9f2ccf2b47da38109dcd6c9ec72e67482bcd7e (Thu Sep 12 10:07:49 UTC 2019)

**Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
[GitHub] [flink] flinkbot edited a comment on issue #9483: [FLINK-13767][task] Refactor StreamInputProcessor#processInput based on InputStatus
flinkbot edited a comment on issue #9483: [FLINK-13767][task] Refactor StreamInputProcessor#processInput based on InputStatus
URL: https://github.com/apache/flink/pull/9483#issuecomment-522602263

## CI report:
* 5d079442de5815edf896b85fa7aa3ca599975bb0 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/123734022)
* 725be7c2226608382c67d5b3d372886be737fff6 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/124870650)
* eeb6f54f2316e5a6ef8f1cbed4f002c3ae37107e : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/12569)
* e8d22a8c88d9c4a3d10fe989761c54e2d56ba3bf : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/125557289)
* 9b8109a8586bcf8095eba8d24c6cd0dbb21d7810 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/125559250)
* ac519615c6a2e28bb6544d2dce2dee43a03f3928 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/125831290)
* ed9f2ccf2b47da38109dcd6c9ec72e67482bcd7e : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/127164915)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services
[jira] [Updated] (FLINK-13767) Refactor StreamInputProcessor#processInput based on InputStatus
[ https://issues.apache.org/jira/browse/FLINK-13767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-13767:
-
    Summary: Refactor StreamInputProcessor#processInput based on InputStatus (was: Migrate isFinished method from AvailabilityListener to AsyncDataInput)

> Refactor StreamInputProcessor#processInput based on InputStatus
> ---
>
> Key: FLINK-13767
> URL: https://issues.apache.org/jira/browse/FLINK-13767
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Network, Runtime / Task
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> AvailabilityListener is used in both AsyncDataInput and StreamTaskInput. We
> already introduced InputStatus for PushBasedAsyncDataInput#emitNext, so
> InputStatus#END_OF_INPUT has the same semantics as
> AvailabilityListener#isFinished.
> But AsyncDataInput, which is mainly used by the InputGate layer, still needs
> the isFinished() method at the moment. So we migrate this method from
> AvailabilityListener to AsyncDataInput, and refactor the
> StreamInputProcessor implementations to use InputStatus to judge the
> finished state.

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (FLINK-13767) Refactor StreamInputProcessor#processInput based on InputStatus
[ https://issues.apache.org/jira/browse/FLINK-13767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-13767:
-
    Description:
StreamInputProcessor#processInput could return InputStatus instead of the current boolean value, to stay consistent with PushingAsyncDataInput#emitNext.

For the implementation of StreamTwoInputProcessor#processInput, we could maintain the status of both inputs and judge them together with the next selected input index to determine the final, precise status. By doing so we could avoid invalid processInput calls, except for the first call.

In addition, AvailabilityProvider#isFinished duplicates the semantics of InputStatus#END_OF_INPUT for PushingAsyncDataInput, and it is now only meaningful for PullingAsyncDataInput. So we migrate the #isFinished method from AvailabilityProvider to PullingAsyncDataInput.

  was:
AvailabilityListener is used in both AsyncDataInput and StreamTaskInput. We already introduced InputStatus for PushBasedAsyncDataInput#emitNext, so InputStatus#END_OF_INPUT has the same semantics as AvailabilityListener#isFinished.

But AsyncDataInput, which is mainly used by the InputGate layer, still needs the isFinished() method at the moment. So we migrate this method from AvailabilityListener to AsyncDataInput, and refactor the StreamInputProcessor implementations to use InputStatus to judge the finished state.

> Refactor StreamInputProcessor#processInput based on InputStatus
> ---
>
> Key: FLINK-13767
> URL: https://issues.apache.org/jira/browse/FLINK-13767
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Network, Runtime / Task
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> StreamInputProcessor#processInput could return InputStatus instead of the
> current boolean value, to stay consistent with PushingAsyncDataInput#emitNext.
> For the implementation of StreamTwoInputProcessor#processInput, we could
> maintain the status of both inputs and judge them together with the next
> selected input index to determine the final, precise status. By doing so we
> could avoid invalid processInput calls, except for the first call.
> In addition, AvailabilityProvider#isFinished duplicates the semantics of
> InputStatus#END_OF_INPUT for PushingAsyncDataInput, and it is now only
> meaningful for PullingAsyncDataInput. So we migrate the #isFinished
> method from AvailabilityProvider to PullingAsyncDataInput.

--
This message was sent by Atlassian Jira (v8.3.2#803003)
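The idea in the description above, deriving one overall status from the status of two inputs, could be sketched as follows. This is only an illustration under stated assumptions, not the implementation from the pull request: the class `TwoInputStatusSketch` and the `combine` helper are hypothetical, the enum is a local stand-in (only `END_OF_INPUT` is named in the issue; the other two constant names are assumed here), and the sketch deliberately ignores the "next selected input index" that the issue also takes into account.

```java
// Hypothetical sketch: combining the status of two inputs into one result.
class TwoInputStatusSketch {
    // Local stand-in enum; only END_OF_INPUT appears in the issue text.
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    static InputStatus combine(InputStatus first, InputStatus second) {
        // The processor is finished only when both inputs are finished.
        if (first == InputStatus.END_OF_INPUT && second == InputStatus.END_OF_INPUT) {
            return InputStatus.END_OF_INPUT;
        }
        // If either input still has data, another processInput call is worthwhile.
        if (first == InputStatus.MORE_AVAILABLE || second == InputStatus.MORE_AVAILABLE) {
            return InputStatus.MORE_AVAILABLE;
        }
        // Otherwise no unfinished input has data right now, so the caller should wait.
        return InputStatus.NOTHING_AVAILABLE;
    }

    public static void main(String[] args) {
        System.out.println(combine(InputStatus.END_OF_INPUT, InputStatus.MORE_AVAILABLE));
        System.out.println(combine(InputStatus.END_OF_INPUT, InputStatus.NOTHING_AVAILABLE));
        System.out.println(combine(InputStatus.END_OF_INPUT, InputStatus.END_OF_INPUT));
    }
}
```

Returning a three-valued status rather than a boolean lets the caller distinguish "finished" from "nothing to do right now", which is the consistency with `emitNext` that the issue asks for.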