[jira] [Updated] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai updated FLINK-11087:
    Fix Version/s: 1.7.1, 1.6.3

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -------------------------------------------------------------
>
>                 Key: FLINK-11087
>                 URL: https://issues.apache.org/jira/browse/FLINK-11087
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.2, 1.7.0
>         Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>            Reporter: Edward Rojas
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>              Labels: Migration, State, broadcast
>             Fix For: 1.6.3, 1.7.1
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
> 	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> 	at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The broadcast state uses a MapState with StringSerializer as the key serializer and a custom JsonSerializer as the value serializer.
> There were no changes to the TypeSerializers used, only the version upgrade.
>
> With some debugging I see that at the moment of validating the compatibility of states in the DefaultOperatorStateBackend class, the "*registeredBroadcastStates*" map containing the data about the 'old' state holds the wrong association of key and value serializer: JsonSerializer appears as the key serializer and StringSerializer as the value serializer, when it should be the other way around.
>
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" class is responsible for this swap, here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
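The swap described above can be illustrated with a small, self-contained sketch (these are hypothetical stand-in classes, not Flink's actual meta-info readers): the legacy snapshot stores the two serializers as an ordered list, and a reader that re-associates them with their roles by index in the wrong order produces exactly the reported symptom, where the value serializer surfaces as the key serializer.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of the bug: the writer emits serializers in (key, value)
// order; a reader that assumes the reverse order swaps the roles, so the
// restore-time compatibility check compares the new key serializer against
// the old *value* serializer and fails.
public class SerializerOrderSketch {

    // Writer emits serializers in (key, value) order, as in the snapshot.
    static List<String> writeMetaInfo(String keySerializer, String valueSerializer) {
        return Arrays.asList(keySerializer, valueSerializer);
    }

    // Buggy reader: treats index 1 as the key and index 0 as the value.
    static String[] readSwapped(List<String> serializers) {
        return new String[] { serializers.get(1), serializers.get(0) };
    }

    // Fixed reader: preserves the writer's (key, value) order.
    static String[] readCorrect(List<String> serializers) {
        return new String[] { serializers.get(0), serializers.get(1) };
    }

    public static void main(String[] args) {
        List<String> snapshot = writeMetaInfo("StringSerializer", "JsonSerializer");

        String[] restored = readSwapped(snapshot);
        // JsonSerializer now appears as the key serializer, matching the
        // wrong association seen in registeredBroadcastStates.
        System.out.println("key=" + restored[0] + ", value=" + restored[1]);
    }
}
```

Since both readers consume the same bytes, the fix is purely a matter of restoring the writer's ordering when rebuilding the meta info.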
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711637#comment-16711637 ]

Chesnay Schepler commented on FLINK-11087:
    [~tzulitai] Can you amend the migration matrix in the [documentation|https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#compatibility-table]?
[jira] [Comment Edited] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711635#comment-16711635 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-11087 at 12/6/18 4:06 PM:
    Confirmed, this is indeed a bug, and the cause is what we've discussed above. This bug prevents Flink 1.6.x and 1.7.0 from restoring broadcast state from a 1.5.x savepoint.
    Setting this as a blocker for 1.6.4 and 1.7.1, providing a fix for this.
    Thanks for the investigation [~edRojas].

was (Author: tzulitai):
    Confirmed, this is indeed a bug, and the cause is what we've discussed above.
    Setting this as a blocker for 1.6.4 and 1.7.1, providing a fix for this.
    Thanks for the investigation [~edRojas].
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711635#comment-16711635 ]

Tzu-Li (Gordon) Tai commented on FLINK-11087:
    Confirmed, this is indeed a bug, and the cause is what we've discussed above.
    Setting this as a blocker for 1.6.4 and 1.7.1, providing a fix for this.
    Thanks for the investigation [~edRojas].
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711634#comment-16711634 ]

ASF GitHub Bot commented on FLINK-11048:
    zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
    URL: https://github.com/apache/flink/pull/7249#discussion_r239509248

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java

@@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig
 		}
 	}
 
-	@Override
-	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
-		StreamGraph streamGraph = getStreamGraph();
-		streamGraph.setJobName(jobName);
-		transformations.clear();
-		return executeRemotely(streamGraph, jarFiles);
+	protected List<URL> getJarFiles() throws ProgramInvocationException {
+		return jarFiles;
 	}
 
 	/**
 	 * Executes the remote job.
 	 *
-	 * @param streamGraph
-	 *            Stream Graph to execute
+	 * @param streamExecutionEnvironment
+	 *            Execution Environment with Stream Graph to execute
 	 * @param jarFiles
 	 *            List of jar file URLs to ship to the cluster
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
+	public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,

Review comment:
    This cannot be moved to the ClusterClient since it relies on Streaming API classes (StreamExecutionEnvironment, or with your suggestion the StreamGraph) that aren't available in `flink-clients`. Hence why there we are only working on the `JobGraph`. And we can't just add a dependency since `flink-streaming-java` already depends on `flink-clients`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Ability to programmatically execute streaming pipeline with savepoint restore
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-11048
>                 URL: https://issues.apache.org/jira/browse/FLINK-11048
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.7.0
>            Reporter: Thomas Weise
>            Assignee: Thomas Weise
>            Priority: Minor
>              Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, though the underlying ClusterClient does. Add an explicit "execute remotely" that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
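The shape of the refactoring under review can be sketched without any Flink dependencies (class and method names below are simplified stand-ins, not the real API): the instance `execute(...)` becomes a thin wrapper around a public static `executeRemotely(...)` that receives the environment explicitly, so downstream projects can trigger remote execution, e.g. with savepoint-restore settings, without subclassing.

```java
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical miniature of the pattern discussed in the PR: moving remote
// execution into a static entry point that takes the environment as an
// argument instead of relying on instance state.
public class RemoteEnvSketch {

    // Jar files to ship to the cluster; empty in this standalone sketch.
    protected List<URL> getJarFiles() {
        return Collections.emptyList();
    }

    // Instance method now just forwards to the static entry point.
    public String execute(String jobName) {
        return executeRemotely(this, jobName);
    }

    // Static variant: callers supply the environment explicitly, which is
    // what lets external code reuse it without overriding execute().
    public static String executeRemotely(RemoteEnvSketch environment, String jobName) {
        List<URL> jars = environment.getJarFiles();
        return "submitted '" + jobName + "' with " + jars.size() + " jar(s)";
    }

    public static void main(String[] args) {
        System.out.println(new RemoteEnvSketch().execute("wordcount"));
    }
}
```

The trade-off debated in the thread is where such a static method may live: it cannot move into `flink-clients`, because the streaming types it needs are only available in `flink-streaming-java`, which already depends on `flink-clients`.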
[GitHub] zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239509248

With regards,
Apache Git Services
[jira] [Assigned] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai reassigned FLINK-11087:
    Assignee: Tzu-Li (Gordon) Tai
[jira] [Commented] (FLINK-9740) Support group windows over intervals of months
[ https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711618#comment-16711618 ]

Timo Walther commented on FLINK-9740:
    I cannot see it in the JIRA yet, but in any case: in general we can do it similar to your proposal. Maybe we can also use the more efficient Java 8 time classes. This is a feature the DataStream API could also profit from. [~aljoscha], what is your opinion here? I guess monthly windows should be added to the DataStream API first?

> Support group windows over intervals of months
> ----------------------------------------------
>
>                 Key: FLINK-9740
>                 URL: https://issues.apache.org/jira/browse/FLINK-9740
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Timo Walther
>            Assignee: Renjie Liu
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, time-based group windows can be defined using intervals of milliseconds such as {{.window(Tumble over 10.minutes on 'rowtime as 'w)}}.
> For some use cases it might be useful to define windows of months (esp. in event time) that work even with leap years and other special time cases.
> The following should be supported in Table API & SQL:
> {{.window(Tumble over 1.month on 'rowtime as 'w)}}
> {{.window(Tumble over 1.quarter on 'rowtime as 'w)}}
> {{.window(Tumble over 1.year on 'rowtime as 'w)}}
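What makes month windows different from the existing millisecond intervals is that a "1 month" window has no fixed size, so the usual `timestamp - (timestamp % size)` start computation does not apply. A small sketch using the Java 8 time classes mentioned in the comment (this is illustrative standalone code, not the Flink window-assigner API; UTC is assumed for simplicity):

```java
import java.time.Instant;
import java.time.YearMonth;
import java.time.ZoneOffset;

// Assigns an epoch-millisecond event time to its containing calendar-month
// window [start, end). The window length varies: 28-31 days, and leap years
// are handled by java.time rather than by fixed arithmetic.
public class MonthWindowSketch {

    // Returns {startMillis, endMillis} of the UTC month window containing
    // the given timestamp.
    static long[] monthWindowOf(long epochMillis) {
        YearMonth month = YearMonth.from(
            Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC));
        long start = month.atDay(1).atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli();
        long end = month.plusMonths(1).atDay(1).atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli();
        return new long[] { start, end };
    }

    public static void main(String[] args) {
        // February 2020 (a leap year) yields a 29-day window.
        long feb2020 = java.time.LocalDate.of(2020, 2, 15)
            .atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli();
        long[] w = monthWindowOf(feb2020);
        System.out.println((w[1] - w[0]) / 86_400_000L); // 29
    }
}
```

A quarter or year window would work the same way, stepping the `YearMonth` by 3 or 12 months instead of 1.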
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711607#comment-16711607 ]

ASF GitHub Bot commented on FLINK-11048:
    mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
    URL: https://github.com/apache/flink/pull/7249#discussion_r239501341

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java

Review comment:
    StreamGraph is exposed anyway because there is `StreamExecutionEnvironment.getStreamGraph()`, though I see it is marked `@Internal`. I'm fine also with passing the environment.

    > IMO any use-case where usage of the StreamGraph is appropriate (for which tbh there are none, as the JobGraph would be a better choice anyway) is already a pretty low-level thing, in which case you can work directly against a ClusterClient.

    So you are suggesting to move the code here to `ClusterClient`? I think that would make sense.
[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited
[ https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711599#comment-16711599 ]

ASF GitHub Bot commented on FLINK-10945:
    zhuzhurk closed pull request #7252: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…
    URL: https://github.com/apache/flink/pull/7252

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3b3132b3d11..6def1dfa488 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -159,6 +159,9 @@
 	/** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */
 	private boolean failTaskOnCheckpointError = true;
 
+	/** The input dependency constraint to schedule tasks. */
+	private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
+
 	// --- User code values ---
 
 	private GlobalJobParameters globalJobParameters;
@@ -518,6 +521,30 @@ public ExecutionMode getExecutionMode() {
 		return executionMode;
 	}
 
+	/**
+	 * Sets the input dependency constraint for vertex scheduling. It indicates when a task
+	 * should be scheduled considering its inputs status.
+	 *
+	 * <p>The default constraint is {@link InputDependencyConstraint#ANY}.
+	 *
+	 * @param inputDependencyConstraint The input dependency constraint.
+	 */
+	public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+		this.inputDependencyConstraint = inputDependencyConstraint;
+	}
+
+	/**
+	 * Gets the input dependency constraint for vertex scheduling. It indicates when a task
+	 * should be scheduled considering its inputs status.
+	 *
+	 * <p>The default constraint is {@link InputDependencyConstraint#ANY}.
+	 *
+	 * @return The input dependency constraint of this job.
+	 */
+	public InputDependencyConstraint getInputDependencyConstraint() {
+		return inputDependencyConstraint;
+	}
+
 	/**
 	 * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
 	 * In some cases this might be preferable. For example, when using interfaces
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
new file mode 100644
index 000..c7a1e6a3519
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * This constraint indicates when a task should be scheduled considering its inputs status.
+ */
+public enum InputDependencyConstraint {
+
+	/**
+	 * Schedule the task if any input is consumable.
+	 */
+	ANY,
+
+	/**
+	 * Schedule the task if all the inputs are consumable.
+	 */
+	ALL
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 7c2b30db32a..15bdb36f1d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -101,7 +101,8 @@ public String
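The semantics of the enum added in this diff can be sketched in a self-contained way (the `isSchedulable` helper below is hypothetical and only for illustration; the real check lives in Flink's scheduling code, not in the enum):

```java
import java.util.Arrays;
import java.util.List;

// Under ANY a vertex becomes schedulable as soon as one input result is
// consumable; under ALL only once every input is. ALL is what prevents the
// resource deadlock described in FLINK-10945, at the cost of later starts.
public class InputConstraintSketch {

    enum InputDependencyConstraint { ANY, ALL }

    static boolean isSchedulable(InputDependencyConstraint constraint, List<Boolean> inputsConsumable) {
        if (constraint == InputDependencyConstraint.ANY) {
            return inputsConsumable.contains(Boolean.TRUE);
        }
        // ALL: no input may still be unconsumable.
        return !inputsConsumable.contains(Boolean.FALSE);
    }

    public static void main(String[] args) {
        // Task X has two BLOCKING inputs Y and Z; only Z has finished.
        List<Boolean> oneReady = Arrays.asList(false, true);
        System.out.println(isSchedulable(InputDependencyConstraint.ANY, oneReady)); // true
        System.out.println(isSchedulable(InputDependencyConstraint.ALL, oneReady)); // false
    }
}
```

With ANY (the default, preserving existing behaviour) X would be launched as soon as Z finishes and could then hold slots that Y still needs; with ALL, X waits for both Y and Z.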
[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239501341

With regards,
Apache Git Services
[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited
[ https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711598#comment-16711598 ]

ASF GitHub Bot commented on FLINK-10945:

zhuzhurk commented on issue #7252: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7252#issuecomment-444913214

> Please open the PR against the master branch.

Thanks Zentol for pointing out the fault. The new PR is https://github.com/apache/flink/pull/7255

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Avoid resource deadlocks for finite stream jobs when resources are limited
> --------------------------------------------------------------------------
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.7.1
> Reporter: Zhu Zhu
> Assignee: Zhu Zhu
> Priority: Major
> Labels: pull-request-available
>
> Currently, *resource deadlocks* can happen to finite stream jobs (or batch jobs) when resources are limited, in two cases:
> # Task Y is a pipelined downstream task of task X. When X takes all resources (slots), Y cannot acquire slots to start, so back pressure blocks X from finishing.
> # Task Y is an upstream task of task X. When X takes all resources (slots) and Y cannot start, X cannot finish because some of its inputs are not finished.
>
> We can avoid case 1 by setting all edges to BLOCKING to avoid pipelined back pressure. However, case 2 cannot be avoided, as X (the downstream task) will be launched when any of its input results is ready.
> In detail: say task X has BLOCKING upstream tasks Y and Z. X can be launched when Z finishes, even though task Y is not launched yet. This pre-launch behaviour can be beneficial when there are plenty of resources, as X can start processing data from Z before Y finishes its data processing. However, resource deadlocks may happen when resources are limited, e.g. in small sessions.
>
> I'd propose introducing a constraint named *InputDependencyConstraint* to control the scheduling of vertices. It has two values:
> # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
> # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>
> The design doc is here:
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing]

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
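The ANY/ALL semantics described above can be sketched as a tiny decision function. This is a hypothetical simplification for illustration, not Flink's actual scheduler code; the names `InputConstraintSketch`, `isSchedulable`, and `inputsConsumable` are invented here:

```java
import java.util.List;

// Simplified sketch of the proposed InputDependencyConstraint check:
// under ANY, a vertex is schedulable when at least one input result is
// consumable; under ALL, only when every input result is consumable.
public class InputConstraintSketch {

    enum InputDependencyConstraint { ANY, ALL }

    static boolean isSchedulable(InputDependencyConstraint constraint,
                                 List<Boolean> inputsConsumable) {
        switch (constraint) {
            case ANY:
                return inputsConsumable.stream().anyMatch(c -> c);
            case ALL:
                return inputsConsumable.stream().allMatch(c -> c);
            default:
                throw new IllegalStateException("unknown constraint");
        }
    }

    public static void main(String[] args) {
        // Task X with BLOCKING inputs Y (not yet finished) and Z (finished):
        // ANY would pre-launch X, while ALL defers X until Y also finishes.
        List<Boolean> inputs = List.of(false, true);
        System.out.println(isSchedulable(InputDependencyConstraint.ANY, inputs)); // true
        System.out.println(isSchedulable(InputDependencyConstraint.ALL, inputs)); // false
    }
}
```

Under limited resources, ALL avoids the case-2 deadlock precisely because X never claims slots while an upstream input is still unfinished.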
[GitHub] zhuzhurk closed pull request #7252: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…
zhuzhurk closed pull request #7252: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7252

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3b3132b3d11..6def1dfa488 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -159,6 +159,9 @@
 	/** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */
 	private boolean failTaskOnCheckpointError = true;
 
+	/** The input dependency constraint to schedule tasks. */
+	private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
+
 	// --- User code values
 
 	private GlobalJobParameters globalJobParameters;
@@ -518,6 +521,30 @@ public ExecutionMode getExecutionMode() {
 		return executionMode;
 	}
 
+	/**
+	 * Sets the input dependency constraint for vertex scheduling. It indicates when a task
+	 * should be scheduled considering its inputs status.
+	 *
+	 * The default constraint is {@link InputDependencyConstraint#ANY}.
+	 *
+	 * @param inputDependencyConstraint The input dependency constraint.
+	 */
+	public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+		this.inputDependencyConstraint = inputDependencyConstraint;
+	}
+
+	/**
+	 * Gets the input dependency constraint for vertex scheduling. It indicates when a task
+	 * should be scheduled considering its inputs status.
+	 *
+	 * The default constraint is {@link InputDependencyConstraint#ANY}.
+	 *
+	 * @return The input dependency constraint of this job.
+	 */
+	public InputDependencyConstraint getInputDependencyConstraint() {
+		return inputDependencyConstraint;
+	}
+
 	/**
 	 * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
 	 * In some cases this might be preferable. For example, when using interfaces
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
new file mode 100644
index 000..c7a1e6a3519
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * This constraint indicates when a task should be scheduled considering its inputs status.
+ */
+public enum InputDependencyConstraint {
+
+	/**
+	 * Schedule the task if any input is consumable.
+	 */
+	ANY,
+
+	/**
+	 * Schedule the task if all the inputs are consumable.
+	 */
+	ALL
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 7c2b30db32a..15bdb36f1d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -101,7 +101,8 @@ public String toString() {
 		final ResultPartitionLocation partitionLocation;
 
 		// The producing task needs to be RUNNING or already FINISHED
-		if (consumedPartition.isConsumable() &&
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711596#comment-16711596 ]

ASF GitHub Bot commented on FLINK-11048:

zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239497531

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java

@@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig
 		}
 	}
 
-	@Override
-	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
-		StreamGraph streamGraph = getStreamGraph();
-		streamGraph.setJobName(jobName);
-		transformations.clear();
-		return executeRemotely(streamGraph, jarFiles);
+	protected List<URL> getJarFiles() throws ProgramInvocationException {
+		return jarFiles;
 	}
 
 	/**
 	 * Executes the remote job.
 	 *
-	 * @param streamGraph
-	 *            Stream Graph to execute
+	 * @param streamExecutionEnvironment
+	 *            Execution Environment with Stream Graph to execute
 	 * @param jarFiles
 	 *            List of jar file URLs to ship to the cluster
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
+	public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,

Review comment: Using the environment is the simplest option and hides several internal classes (like the StreamGraph) that we shouldn't expose to users. Effectively this serves as an extension to the `RemoteStreamEnvironment` without baking it into a `RemoteStreamEnvironment` instance itself, which is fair imo. IMO any use-case where usage of the StreamGraph is appropriate (for which, tbh, there are none, as the JobGraph would be a better choice anyway) is already a pretty low-level thing, in which case you can work directly against a ClusterClient.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Ability to programmatically execute streaming pipeline with savepoint restore
> -----------------------------------------------------------------------------
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.7.0
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Minor
> Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from a savepoint, though the underlying ClusterClient does. Add an explicit "execute remotely" method that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited
[ https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711595#comment-16711595 ]

ASF GitHub Bot commented on FLINK-10945:

zhuzhurk opened a new pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource deadlocks in LAZY_FROM_SOURCES scheduling when resources are limited
URL: https://github.com/apache/flink/pull/7255

## What is the purpose of the change
This PR adds a job config, InputDependencyConstraint, which helps to avoid resource deadlocks in LAZY_FROM_SOURCES scheduling when resources are limited, as described in [FLINK-10945](https://issues.apache.org/jira/browse/FLINK-10945).

## Brief change log
- Add InputDependencyConstraint to ExecutionConfig
- Adjust the isConsumable interface in IntermediateResultPartition to match the actual definition of consumable data
- Change the current lazy scheduling logic (in Execution.scheduleOrUpdateConsumers(edges)) to schedule tasks only if the InputDependencyConstraint is satisfied (an interface ExecutionVertex.checkInputDependencyConstraints is added to serve this purpose)

## Verifying this change
This change added tests and can be verified as follows:
- Added IntermediateResultPartitionTest to validate IntermediateResultPartition changes
- Added ExecutionVertexInputConstraintTest to validate the constraint check logic in ExecutionVertex

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711581#comment-16711581 ] ASF GitHub Bot commented on FLINK-10887: jgrier edited a comment on issue #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster URL: https://github.com/apache/flink/pull/7099#issuecomment-444907282 @aljoscha @tweise Can you guys comment on the above generic aggregator proposal? I'd like to keep this moving forward. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add source watermark tracking to the JobMaster > -- > > Key: FLINK-10887 > URL: https://issues.apache.org/jira/browse/FLINK-10887 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Labels: pull-request-available > Original Estimate: 24h > Remaining Estimate: 24h > > We need to add a new RPC to the JobMaster such that the current watermark for > every source sub-task can be reported and the current global minimum/maximum > watermark can be retrieved so that each source can adjust their partition > read rates in an attempt to keep sources roughly aligned in event time. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
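The kind of aggregation the proposed JobMaster RPC would perform can be sketched in a few lines. This is an illustrative model, not Flink's API; the class `WatermarkAggregatorSketch` and its method names are invented here. Each source sub-task reports its current watermark, and the global minimum/maximum can be queried so sources can throttle their partition read rates to stay roughly aligned in event time:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker of per-subtask source watermarks (not Flink code).
public class WatermarkAggregatorSketch {

    private final Map<Integer, Long> watermarks = new HashMap<>();

    // A sub-task reports its current watermark; watermarks only advance,
    // so a stale (smaller) report is ignored.
    void report(int subtaskIndex, long watermark) {
        watermarks.merge(subtaskIndex, watermark, Math::max);
    }

    // Global minimum across all reporting sub-tasks: the "slowest" source.
    long globalMin() {
        return watermarks.values().stream().mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    }

    // Global maximum: the "fastest" source; a source far ahead of globalMin
    // could slow its partition reads to keep event time aligned.
    long globalMax() {
        return watermarks.values().stream().mapToLong(Long::longValue).max().orElse(Long.MIN_VALUE);
    }
}
```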
[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711578#comment-16711578 ] ASF GitHub Bot commented on FLINK-10887: jgrier commented on issue #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster URL: https://github.com/apache/flink/pull/7099#issuecomment-444907282 @aljoscha @tweise Can you guys comment on the above proposal? I'd like to keep this moving forward. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add source watermark tracking to the JobMaster > -- > > Key: FLINK-10887 > URL: https://issues.apache.org/jira/browse/FLINK-10887 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Labels: pull-request-available > Original Estimate: 24h > Remaining Estimate: 24h > > We need to add a new RPC to the JobMaster such that the current watermark for > every source sub-task can be reported and the current global minimum/maximum > watermark can be retrieved so that each source can adjust their partition > read rates in an attempt to keep sources roughly aligned in event time. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9740) Support group windows over intervals of months
[ https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xuqianjin updated FLINK-9740:
Attachment: (was: About [FLINK-9740] Support group windows over intervals of months.pdf)

> Support group windows over intervals of months
> ----------------------------------------------
>
> Key: FLINK-9740
> URL: https://issues.apache.org/jira/browse/FLINK-9740
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.5.0
> Reporter: Timo Walther
> Assignee: Renjie Liu
> Priority: Major
> Labels: pull-request-available
>
> Currently, time-based group windows can be defined using intervals of milliseconds, such as {{.window(Tumble over 10.minutes on 'rowtime as 'w)}}.
> For some use cases it might be useful to define windows of months (esp. in event time) that work even with leap years and other special time cases.
> The following should be supported in Table API & SQL:
> {{.window(Tumble over 1.month on 'rowtime as 'w)}}
> {{.window(Tumble over 1.quarter on 'rowtime as 'w)}}
> {{.window(Tumble over 1.year on 'rowtime as 'w)}}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
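The reason month windows cannot be expressed as fixed-millisecond intervals is that calendar months vary in length, including across leap years. A minimal sketch of month-aligned tumbling window boundaries using `java.time` (the class `MonthWindowSketch` and its helpers are hypothetical, not Flink's implementation):

```java
import java.time.LocalDate;

// Hypothetical computation of 1-month tumbling window boundaries.
public class MonthWindowSketch {

    // Start of the 1-month window containing the given date.
    static LocalDate windowStart(LocalDate d) {
        return d.withDayOfMonth(1);
    }

    // Exclusive end of that window; its length varies with the calendar.
    static LocalDate windowEnd(LocalDate d) {
        return d.withDayOfMonth(1).plusMonths(1);
    }

    public static void main(String[] args) {
        LocalDate inLeapFebruary = LocalDate.of(2020, 2, 15); // 2020 is a leap year
        System.out.println(windowStart(inLeapFebruary)); // 2020-02-01
        System.out.println(windowEnd(inLeapFebruary));   // 2020-03-01, 29 days later
    }
}
```

A window over `1.month` containing February 2020 spans 29 days, while the same window in 2019 spans 28, which is exactly what a fixed-millisecond interval cannot capture.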
[jira] [Issue Comment Deleted] (FLINK-9740) Support group windows over intervals of months
[ https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xuqianjin updated FLINK-9740:
Comment: was deleted (was: hi [~twalthr] The attachment has been submitted)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711511#comment-16711511 ]

Tzu-Li (Gordon) Tai commented on FLINK-11087:

I'm currently modifying the {{StatefulJobWBroadcastStateMigrationITCase}} to verify this. Currently that ITCase had no way to capture this bug, because the key / value types of the broadcast state tested there are all identical. If it turns out to be true, then we'll need to lift this to a blocker bug.

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -------------------------------------------------------------
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
> Reporter: Edward Rojas
> Priority: Major
> Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key serializer and a custom JsonSerializer as the value serializer.
> There were no changes in the TypeSerializers used, only an upgrade of version.
>
> With some debugging, I see that at the moment the compatibility of states is validated in the DefaultOperatorStateBackend class, the "*registeredBroadcastStates*" map containing the data about the 'old' state holds a wrong association of the key and value serializers. That is, JsonSerializer appears as the key serializer and StringSerializer appears as the value serializer (when it should be the contrary).
>
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" class is responsible for this swap, here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711511#comment-16711511 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-11087 at 12/6/18 2:35 PM:

I'm currently modifying the {{StatefulJobWBroadcastStateMigrationITCase}} to verify this. Currently that ITCase had no way to capture this potential bug, because the key / value types of broadcast state tested there are all identical. If it turns out to be true, then we'll need to lift this to a blocker bug.

was (Author: tzulitai): I'm currently modifying the {{StatefulJobWBroadcastStateMigrationITCase}} to verify this. Currently that ITCase had now way to capture this bug, because the key / value types of broadcast state tested there are all identical. If it turns out to be true, then we'll need to lift this as a blocker bug.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11049) Unable to execute partial DAG
[ https://issues.apache.org/jira/browse/FLINK-11049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711508#comment-16711508 ]

Till Rohrmann commented on FLINK-11049:

Hi [~zjffdu], the way Flink works is that all branches of the DAG with a sink at their end will be executed when one calls {{ExecutionEnvironment#execute}} or calls a method which triggers eager execution ({{print}}, {{collect}} or {{count}}). In that sense, the behaviour of the code snippet is expected. How do you think Flink should behave in this situation?

> Unable to execute partial DAG
> -----------------------------
>
> Key: FLINK-11049
> URL: https://issues.apache.org/jira/browse/FLINK-11049
> Project: Flink
> Issue Type: Bug
> Components: Job-Submission
> Affects Versions: 1.7.0
> Reporter: Jeff Zhang
> Assignee: Jeff Zhang
> Priority: Major
> Labels: pull-request-available
>
> {code}
> val benv = ExecutionEnvironment.getExecutionEnvironment
> val btEnv = TableEnvironment.getTableEnvironment(benv)
> val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
> data.writeAsText("/Users/jzhang/a.txt", FileSystem.WriteMode.OVERWRITE);
> val table = data.flatMap(line => line.split("\\s")).
>   map(w => (w, 1)).
>   toTable(btEnv, 'word, 'number)
> btEnv.registerTable("wc", table)
> btEnv.sqlQuery("select word, count(1) from wc group by word").
>   toDataSet[Row].print()
> {code}
> In the above example, the last statement triggers two job executions (writeAsText and print), but what the user expects is only the print job. The root cause is that currently Flink is unable to submit a partial DAG.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
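The behaviour Till describes can be modeled with a toy sink registry (purely illustrative, not Flink's API; `EagerExecutionSketch`, `addSink`, and `execute` are invented names): every sink registered since the last execution runs when `execute()` is called, so the `writeAsText` sink and the later `print` sink execute together.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "all pending sink branches run on execute()".
public class EagerExecutionSketch {

    private final List<String> pendingSinks = new ArrayList<>();

    // Registering a sink (e.g. writeAsText) only records it; nothing runs yet.
    void addSink(String name) {
        pendingSinks.add(name);
    }

    // print()/collect()/count() trigger execution internally ("eager" methods),
    // and execution runs every sink registered so far, then clears them.
    List<String> execute() {
        List<String> executed = new ArrayList<>(pendingSinks);
        pendingSinks.clear();
        return executed;
    }
}
```

In this model, the user's `print()` call executes both sinks, matching the two job executions observed in the issue.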
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711495#comment-16711495 ] Tzu-Li (Gordon) Tai commented on FLINK-11087: - I checked the key / value serializer association in the {{DefaultOperatorStateBackend}}; it seems to be correct. The problem, as [~edRojas] has indicated, is when restoring from 1.5 (regardless of whether the restore happens in 1.6 or 1.7), the serializer keys ( {{KEY_SERIALIZER}} and {{VALUE_SERIALIZER}} ) in the state meta info had swapped positions. Because of this line, [https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165], for broadcast state meta info the first read serializer (which is the key serializer) is assigned the {{VALUE_SERIALIZER}} key, while the second read serializer (which is the value serializer) is assigned the {{KEY_SERIALIZER}} key. This looks like a bug that affects both Flink 1.6.x and Flink 1.7.0. [~stefanrichte...@gmail.com] could you confirm this? > Broadcast state migration Incompatibility from 1.5.3 to 1.7.0 > - > > Key: FLINK-11087 > URL: https://issues.apache.org/jira/browse/FLINK-11087 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.7.0 > Environment: Migration from Flink 1.5.3 to Flink 1.7.0 >Reporter: Edward Rojas >Priority: Major > Labels: Migration, State, broadcast > > When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast > state throws the following error: > {noformat} > org.apache.flink.util.StateMigrationException: The new key serializer for > broadcast state must not be incompatible. 
> at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238) > at > org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:745){noformat} > The broadcast is using a MapState with StringSerializer as key serializer and > a custom JsonSerializer as value serializer. > There were no changes in the TypeSerializers used, only an upgrade of version. > > With some debugging I see that at the moment of the validation of the > compatibility of states in the DefaultOperatorStateBackend class, the > "*registeredBroadcastStates*" containing the data about the 'old' state > contains a wrong association of the key and value serializer. That is, > JsonSerializer appears as key serializer and StringSerializer appears as > value serializer (when it should be the opposite). > > After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" > class is responsible for this swap, here: > https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
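The swap Gordon diagnoses can be modelled in a few lines of standalone Java. This is not Flink's actual reader code; the class and method names are illustrative, and the "serializers" are plain strings, but the assignment mirrors the bug: the legacy V2/V3 reader reads the broadcast state's serializers in (key, value) order yet stores the first one under the VALUE_SERIALIZER registry key and the second under KEY_SERIALIZER.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal standalone model of the bug described above. Not Flink code:
// names are illustrative, and serializers are represented as strings.
public class MetaInfoSwapDemo {

    // Buggy assignment, mirroring LegacyStateMetaInfoReaders.java#L165:
    // the first serializer read (the key serializer) is stored under
    // VALUE_SERIALIZER, and the second (the value serializer) under
    // KEY_SERIALIZER.
    static Map<String, String> buggyRead(String keySerializer, String valueSerializer) {
        Map<String, String> registry = new LinkedHashMap<>();
        registry.put("VALUE_SERIALIZER", keySerializer);  // swapped
        registry.put("KEY_SERIALIZER", valueSerializer);  // swapped
        return registry;
    }

    // Fixed assignment: read order matches registry keys.
    static Map<String, String> fixedRead(String keySerializer, String valueSerializer) {
        Map<String, String> registry = new LinkedHashMap<>();
        registry.put("KEY_SERIALIZER", keySerializer);
        registry.put("VALUE_SERIALIZER", valueSerializer);
        return registry;
    }

    public static void main(String[] args) {
        // The reporter's setup: broadcast MapState keyed by String with Json values.
        Map<String, String> restored = buggyRead("StringSerializer", "JsonSerializer");
        // After the buggy restore, the backend believes the old key serializer was
        // JsonSerializer, so the compatibility check against the new
        // StringSerializer key serializer fails with a StateMigrationException.
        System.out.println("buggy restored key serializer: " + restored.get("KEY_SERIALIZER"));
        System.out.println("fixed restored key serializer: "
                + fixedRead("StringSerializer", "JsonSerializer").get("KEY_SERIALIZER"));
    }
}
```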
[jira] [Commented] (FLINK-9920) BucketingSinkFaultToleranceITCase fails on travis
[ https://issues.apache.org/jira/browse/FLINK-9920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711492#comment-16711492 ] Dawid Wysakowicz commented on FLINK-9920: - Another instance: https://api.travis-ci.org/v3/job/464266452/log.txt > BucketingSinkFaultToleranceITCase fails on travis > - > > Key: FLINK-9920 > URL: https://issues.apache.org/jira/browse/FLINK-9920 > Project: Flink > Issue Type: Bug > Components: filesystem-connector >Affects Versions: 1.6.0, 1.7.0 >Reporter: Chesnay Schepler >Priority: Critical > Labels: test-stability > Fix For: 1.8.0 > > > https://travis-ci.org/zentol/flink/jobs/407021898 > {code} > Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 13.082 sec > <<< FAILURE! - in > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase > runCheckpointedProgram(org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase) > Time elapsed: 5.696 sec <<< FAILURE! > java.lang.AssertionError: Read line does not match expected pattern. > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase.postSubmit(BucketingSinkFaultToleranceITCase.java:182) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11086) flink-hadoop-compatibility tests fail for 3.x hadoop versions
[ https://issues.apache.org/jira/browse/FLINK-11086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711431#comment-16711431 ] Chesnay Schepler commented on FLINK-11086: -- We haven't looked into hadoop3 support yet. > flink-hadoop-compatibility tests fail for 3.x hadoop versions > - > > Key: FLINK-11086 > URL: https://issues.apache.org/jira/browse/FLINK-11086 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Sebastian Klemke >Priority: Major > > All builds using maven 3.2.5 on commithash > ed8ff14ed39d08cd319efe75b40b9742a2ae7558. > Attempted builds: > - mvn clean install -Dhadoop.version=3.0.3 > - mvn clean install -Dhadoop.version=3.1.1 > Integration tests with Hadoop input format datasource fail. Example stack > trace, taken from hadoop.version 3.1.1 build: > {code:java} > testJobCollectionExecution(org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase) > Time elapsed: 0.275 sec <<< ERROR! > java.lang.NoClassDefFoundError: > org/apache/flink/hadoop/shaded/com/google/re2j/PatternSyntaxException > at java.net.URLClassLoader.findClass(URLClassLoader.java:382) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at org.apache.hadoop.fs.Globber.doGlob(Globber.java:210) > at org.apache.hadoop.fs.Globber.glob(Globber.java:149) > at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2085) > at > org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:269) > at > org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:239) > at > org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325) > at > org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150) > at > 
org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58) > at > org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:225) > at > org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:219) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:155) > at > org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149) > at > org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149) > at > org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149) > at > org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131) > at > org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:182) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:158) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:115) > at > org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:38) > at > 
org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:52) > at > org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase.internalRun(WordCountMapredITCase.java:121) > at > org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase.testProgram(WordCountMapredITCase.java:71) > {code} > Maybe hadoop 3.x versions could be added to the test matrix as well? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10593) Add support for ALL ROWS PER MATCH
[ https://issues.apache.org/jira/browse/FLINK-10593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xueyu reassigned FLINK-10593: - Assignee: xueyu > Add support for ALL ROWS PER MATCH > -- > > Key: FLINK-10593 > URL: https://issues.apache.org/jira/browse/FLINK-10593 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: xueyu >Priority: Major > > We should properly support it in both {{FINAL}} and {{RUNNING}} modes -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited
[ https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711471#comment-16711471 ] ASF GitHub Bot commented on FLINK-10945: zentol commented on issue #7252: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead… URL: https://github.com/apache/flink/pull/7252#issuecomment-444876798 Please open the PR against the master branch. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Avoid resource deadlocks for finite stream jobs when resources are limited > -- > > Key: FLINK-10945 > URL: https://issues.apache.org/jira/browse/FLINK-10945 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.7.1 >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Labels: pull-request-available > > Currently *resource deadlocks* can happen to finite stream jobs (or batch > jobs) when resources are limited, in the following 2 cases: > # Task Y is a pipelined downstream task of task X. When X takes all > resources (slots), Y cannot acquire slots to start, thus the back pressure > will block X from finishing > # Task Y is an upstream task of task X. When X takes all resources (slots) and > Y cannot start, X cannot finish as some of its inputs are not finished > > We can avoid case 1 by setting all edges to be BLOCKING to avoid pipeline > back pressure. However, case 2 cannot be avoided as X (the downstream task) will > be launched when any of its input results is ready. > In detail, say task X has BLOCKING upstream tasks Y and Z; X can be > launched when Z finishes, even though task Y is not launched yet. This pre-launch > behaviour can be beneficial when there are plenty of resources, thus X can > process data from Z earlier, before Y finishes its data processing. 
However, > resource deadlocks may happen when the resources are limited, e.g. in small > sessions. > > I’d propose introducing a constraint named *InputDependencyConstraint* to > control the scheduling of vertices. It has 2 values: > # *ANY*. The vertex can be scheduled when any of its inputs is consumable. > # *ALL*. The vertex can be scheduled when all of its inputs are consumable. > > The design doc is here: > [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
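The ANY/ALL semantics from the proposal above can be sketched in standalone Java. This is not Flink's scheduler code; the enum values follow the proposal, while the class and the check method are illustrative.

```java
// Sketch of the proposed InputDependencyConstraint semantics (FLINK-10945).
// Illustrative only: the enum values come from the proposal, the rest is a
// stand-in for the scheduler's decision logic.
public class InputConstraintDemo {

    enum InputDependencyConstraint { ANY, ALL }

    // Under ANY, a vertex is schedulable when at least one input result is
    // consumable; under ALL, only once every input result is consumable.
    static boolean canSchedule(InputDependencyConstraint constraint, boolean[] inputsConsumable) {
        boolean any = false;
        boolean all = true;
        for (boolean consumable : inputsConsumable) {
            any |= consumable;
            all &= consumable;
        }
        return constraint == InputDependencyConstraint.ANY ? any : all;
    }

    public static void main(String[] args) {
        // Task X with BLOCKING inputs Y (not finished) and Z (finished):
        boolean[] inputs = {false, true};
        // ANY pre-launches X (beneficial with plenty of resources) ...
        System.out.println(canSchedule(InputDependencyConstraint.ANY, inputs));
        // ... while ALL holds X back, avoiding the deadlock in small sessions.
        System.out.println(canSchedule(InputDependencyConstraint.ALL, inputs));
    }
}
```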
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711469#comment-16711469 ] Edward Rojas commented on FLINK-11087: -- I just tried it out, migrating from 1.5.3 to 1.6.2. I get a different error, but after debugging I see that the root cause is the same: the key serializer and the value serializer are swapped in the OperatorBackendStateMetaInfoReaderV2V3 class. The error is the following: {noformat} org.apache.flink.util.StateMigrationException: State migration is currently not supported. at org.apache.flink.util.StateMigrationException.notSupported(StateMigrationException.java:42) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:263) at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745){noformat} > Broadcast state migration Incompatibility from 1.5.3 to 1.7.0 > - > > Key: FLINK-11087 > URL: https://issues.apache.org/jira/browse/FLINK-11087 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.7.0 > Environment: Migration from Flink 1.5.3 to Flink 1.7.0 >Reporter: Edward Rojas >Priority: Major > Labels: Migration, State, broadcast > > When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast > state throws the following error: > {noformat} > org.apache.flink.util.StateMigrationException: The new key serializer for > broadcast state must not be incompatible. 
> at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238) > at > org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:745){noformat} > The broadcast is using a MapState with StringSerializer as key serializer and > a custom JsonSerializer as value serializer. > There were no changes in the TypeSerializers used, only an upgrade of version. > > With some debugging I see that at the moment of the validation of the > compatibility of states in the DefaultOperatorStateBackend class, the > "*registeredBroadcastStates*" containing the data about the 'old' state > contains a wrong association of the key and value serializer. That is, > JsonSerializer appears as key serializer and StringSerializer appears as > value serializer (when it should be the opposite). > > After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" > class is responsible for this swap, here: > https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711468#comment-16711468 ] ASF GitHub Bot commented on FLINK-10252: zentol commented on issue #6850: [FLINK-10252] Handle oversized metric messges URL: https://github.com/apache/flink/pull/6850#issuecomment-444876607 Will merge this today or tomorrow depending on how much time I need for cleaning up various smaller things. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Handle oversized metric messges > --- > > Key: FLINK-10252 > URL: https://issues.apache.org/jira/browse/FLINK-10252 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1 > > > Since the {{MetricQueryService}} is implemented as an Akka actor, it can only > send messages of a smaller size than the current {{akka.framesize}}. We > should check, similarly to FLINK-10251, whether the payload exceeds the maximum > framesize and fail fast if it does. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
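The fail-fast check proposed in the issue above amounts to comparing the serialized payload size against the configured frame size before handing the message to the actor system. A minimal standalone sketch, assuming illustrative names and limits (not Flink's actual implementation):

```java
import java.nio.charset.StandardCharsets;

// Sketch of the FLINK-10252 idea: reject a metric dump that would exceed the
// transport's maximum frame size with a clear error, instead of letting the
// actor system drop the oversized message silently. Names and the limit used
// in main() are illustrative, not Flink's actual code or defaults.
public class FrameSizeCheckDemo {

    static byte[] checkedPayload(String metricDump, int maxFrameSizeBytes) {
        byte[] payload = metricDump.getBytes(StandardCharsets.UTF_8);
        if (payload.length > maxFrameSizeBytes) {
            // Fail fast with an actionable message rather than a silent drop.
            throw new IllegalStateException(
                    "Metric dump of " + payload.length + " bytes exceeds the maximum frame size of "
                            + maxFrameSizeBytes + " bytes");
        }
        return payload;
    }

    public static void main(String[] args) {
        // A small dump passes the check unchanged.
        System.out.println(checkedPayload("small dump", 1024).length);
        // An oversized dump is rejected up front.
        try {
            checkedPayload(new String(new char[2048]).replace('\0', 'x'), 1024);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```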
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711450#comment-16711450 ] ASF GitHub Bot commented on FLINK-11048: mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r239451409 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ## @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig } } - @Override - public JobExecutionResult execute(String jobName) throws ProgramInvocationException { - StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); - transformations.clear(); - return executeRemotely(streamGraph, jarFiles); + protected List<URL> getJarFiles() throws ProgramInvocationException { + return jarFiles; } /** * Executes the remote job. * -* @param streamGraph -*Stream Graph to execute +* @param streamExecutionEnvironment +*Execution Environment with Stream Graph to execute * @param jarFiles *List of jar file URLs to ship to the cluster * @return The result of the job execution, containing elapsed time and accumulators. */ - protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException { + public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment, Review comment: If we make this `static` it will also break the `@Public` contract. We can introduce a new static method. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Ability to programmatically execute streaming pipeline with savepoint restore > > --- > > Key: FLINK-11048 > URL: https://issues.apache.org/jira/browse/FLINK-11048 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.7.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Minor > Labels: pull-request-available > > RemoteStreamEnvironment.execute doesn't support restore from savepoint, > though the underlying ClusterClient does. Add an explicit "execute remotely" > that can be used by downstream projects. > [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E] > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] maqingxiang opened a new pull request #7254: [hotfix][tableApi] fix tEnv in tableApi docs
maqingxiang opened a new pull request #7254: [hotfix][tableApi] fix tEnv in tableApi docs URL: https://github.com/apache/flink/pull/7254 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711449#comment-16711449 ] ASF GitHub Bot commented on FLINK-11048: mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r239452431 ## File path: flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java ## @@ -68,30 +67,21 @@ public ScalaShellRemoteStreamEnvironment( this.flinkILoop = flinkILoop; } - /** -* Executes the remote job. -* -* @param streamGraph -*Stream Graph to execute -* @param jarFiles -*List of jar file URLs to ship to the cluster -* @return The result of the job execution, containing elapsed time and accumulators. -*/ - @Override - protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException { + protected List<URL> getJarFiles() throws ProgramInvocationException { URL jarUrl; try { jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL(); } catch (MalformedURLException e) { + // TODO: we don't have the actual jobID at this point throw new ProgramInvocationException("Could not write the user code classes to disk.", - streamGraph.getJobGraph().getJobID(), e); + new JobID(), e); Review comment: That will generate a random `JobID`, not sure about this. Let's use the constructor without the JobID. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Ability to programmatically execute streaming pipeline with savepoint restore > > --- > > Key: FLINK-11048 > URL: https://issues.apache.org/jira/browse/FLINK-11048 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.7.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Minor > Labels: pull-request-available > > RemoteStreamEnvironment.execute doesn't support restore from savepoint, > though the underlying ClusterClient does. Add an explicit "execute remotely" > that can be used by downstream projects. > [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E] > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711451#comment-16711451 ] ASF GitHub Bot commented on FLINK-11048: mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r239451677 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ## @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig } } - @Override - public JobExecutionResult execute(String jobName) throws ProgramInvocationException { - StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); - transformations.clear(); - return executeRemotely(streamGraph, jarFiles); + protected List<URL> getJarFiles() throws ProgramInvocationException { + return jarFiles; } /** * Executes the remote job. * -* @param streamGraph -*Stream Graph to execute +* @param streamExecutionEnvironment +*Execution Environment with Stream Graph to execute * @param jarFiles *List of jar file URLs to ship to the cluster * @return The result of the job execution, containing elapsed time and accumulators. */ - protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException { + public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment, + List<URL> jarFiles, Review comment: The "most readable option" depends a lot on preference :) But let's stick with what is commonly used nowadays. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Ability to programmatically execute streaming pipeline with savepoint restore > > --- > > Key: FLINK-11048 > URL: https://issues.apache.org/jira/browse/FLINK-11048 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.7.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Minor > Labels: pull-request-available > > RemoteStreamEnvironment.execute doesn't support restore from savepoint, > though the underlying ClusterClient does. Add an explicit "execute remotely" > that can be used by downstream projects. > [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E] > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711448#comment-16711448 ] ASF GitHub Bot commented on FLINK-11048: mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r239453780 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ## @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig } } - @Override - public JobExecutionResult execute(String jobName) throws ProgramInvocationException { - StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); - transformations.clear(); - return executeRemotely(streamGraph, jarFiles); + protected List<URL> getJarFiles() throws ProgramInvocationException { + return jarFiles; } /** * Executes the remote job. * -* @param streamGraph -*Stream Graph to execute +* @param streamExecutionEnvironment +*Execution Environment with Stream Graph to execute * @param jarFiles *List of jar file URLs to ship to the cluster * @return The result of the job execution, containing elapsed time and accumulators. */ - protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException { + public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment, Review comment: Question: Why don't we pass the required dependencies directly here? i.e. `StreamGraph` and `ExecutionConfig` IMHO there is no need to pass the entire environment. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Ability to programmatically execute streaming pipeline with savepoint restore > > --- > > Key: FLINK-11048 > URL: https://issues.apache.org/jira/browse/FLINK-11048 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.7.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Minor > Labels: pull-request-available > > RemoteStreamEnvironment.execute doesn't support restore from savepoint, > though the underlying ClusterClient does. Add an explicit "execute remotely" > that can be used by downstream projects. > [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E] > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711447#comment-16711447 ]

ASF GitHub Bot commented on FLINK-11048:

mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239451179

File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java

{noformat}
@@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig
 		}
 	}

-	@Override
-	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
-		StreamGraph streamGraph = getStreamGraph();
-		streamGraph.setJobName(jobName);
-		transformations.clear();
-		return executeRemotely(streamGraph, jarFiles);
+	protected List<URL> getJarFiles() throws ProgramInvocationException {
+		return jarFiles;
 	}

 	/**
 	 * Executes the remote job.
 	 *
-	 * @param streamGraph
-	 *            Stream Graph to execute
+	 * @param streamExecutionEnvironment
+	 *            Execution Environment with Stream Graph to execute
 	 * @param jarFiles
 	 *            List of jar file URLs to ship to the cluster
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
+	public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
{noformat}

Review comment: I agree that we can't do this. The method is `protected` but it would still break subclasses relying on the `@Public` contract.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239452431

File path: flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java

{noformat}
@@ -68,30 +67,21 @@ public ScalaShellRemoteStreamEnvironment(
 		this.flinkILoop = flinkILoop;
 	}

-	/**
-	 * Executes the remote job.
-	 *
-	 * @param streamGraph
-	 *            Stream Graph to execute
-	 * @param jarFiles
-	 *            List of jar file URLs to ship to the cluster
-	 * @return The result of the job execution, containing elapsed time and accumulators.
-	 */
-	@Override
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
+	protected List<URL> getJarFiles() throws ProgramInvocationException {
 		URL jarUrl;
 		try {
 			jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
 		} catch (MalformedURLException e) {
+			// TODO: we don't have the actual jobID at this point
 			throw new ProgramInvocationException("Could not write the user code classes to disk.",
-				streamGraph.getJobGraph().getJobID(), e);
+				new JobID(), e);
{noformat}

Review comment: That will generate a random `JobID`, not sure about this. Let's use the constructor without the JobID.

With regards, Apache Git Services
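The concern in the review above can be illustrated outside Flink: the no-argument `JobID` constructor yields a random identifier, so an ID minted inside an exception handler matches no job the user actually submitted. The sketch below models the ID with `java.util.UUID` purely for illustration; the real `JobID` is Flink's own class.

```java
import java.util.UUID;

public class RandomJobIdSketch {
    // Stand-in for `new JobID()`: every call produces a fresh random ID,
    // so an ID created at exception time cannot identify the submitted job.
    static UUID freshId() {
        return UUID.randomUUID();
    }
}
```

Hence the suggestion to use the exception constructor that takes no `JobID` at all, rather than attach a meaningless random one.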
[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r239451677

File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java

{noformat}
@@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig
 		}
 	}

-	@Override
-	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
-		StreamGraph streamGraph = getStreamGraph();
-		streamGraph.setJobName(jobName);
-		transformations.clear();
-		return executeRemotely(streamGraph, jarFiles);
+	protected List<URL> getJarFiles() throws ProgramInvocationException {
+		return jarFiles;
 	}

 	/**
 	 * Executes the remote job.
 	 *
-	 * @param streamGraph
-	 *            Stream Graph to execute
+	 * @param streamExecutionEnvironment
+	 *            Execution Environment with Stream Graph to execute
 	 * @param jarFiles
 	 *            List of jar file URLs to ship to the cluster
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
+	public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
+		List<URL> jarFiles,
{noformat}

Review comment: The "most readable option" depends a lot on preference :) But let's stick with what is commonly used nowadays.

With regards, Apache Git Services
[jira] [Commented] (FLINK-9740) Support group windows over intervals of months
[ https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711442#comment-16711442 ]

xuqianjin commented on FLINK-9740:
--
hi [~twalthr] The attachment has been submitted

> Support group windows over intervals of months
> ---
>
> Key: FLINK-9740
> URL: https://issues.apache.org/jira/browse/FLINK-9740
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Affects Versions: 1.5.0
> Reporter: Timo Walther
> Assignee: Renjie Liu
> Priority: Major
> Labels: pull-request-available
> Attachments: About [FLINK-9740] Support group windows over intervals of months.pdf
>
> Currently, time-based group windows can be defined using intervals of milliseconds such as {{.window(Tumble over 10.minutes on 'rowtime as 'w)}}. For some use cases it might be useful to define windows of months (esp. in event-time) that work even with leap years and other special time cases.
> The following should be supported in Table API & SQL:
> {{.window(Tumble over 1.month on 'rowtime as 'w)}}
> {{.window(Tumble over 1.quarter on 'rowtime as 'w)}}
> {{.window(Tumble over 1.year on 'rowtime as 'w)}}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9740) Support group windows over intervals of months
[ https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711440#comment-16711440 ]

xuqianjin commented on FLINK-9740:
--
@

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (FLINK-9740) Support group windows over intervals of months
[ https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xuqianjin updated FLINK-9740:
-
Comment: was deleted (was: @)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9740) Support group windows over intervals of months
[ https://issues.apache.org/jira/browse/FLINK-9740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xuqianjin updated FLINK-9740:
-
Attachment: About [FLINK-9740] Support group windows over intervals of months.pdf

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
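The reason FLINK-9740 needs dedicated support is that a month is not a fixed number of milliseconds, so the existing millisecond-interval windows cannot express it. Below is a hedged sketch of calendar-aware tumbling one-month window boundaries in plain `java.time` rather than Flink's window API; the class and method names are illustrative only.

```java
import java.time.LocalDate;
import java.time.YearMonth;

public class MonthWindowSketch {
    // Start of the one-month tumbling window containing the given date:
    // the first day of that calendar month.
    static LocalDate windowStart(LocalDate ts) {
        return YearMonth.from(ts).atDay(1);
    }

    // Exclusive window end: the first day of the following month.
    // The window length therefore varies between 28 and 31 days,
    // which is exactly what a fixed-size interval cannot model.
    static LocalDate windowEnd(LocalDate ts) {
        return YearMonth.from(ts).plusMonths(1).atDay(1);
    }
}
```

For February 2016 these boundaries span 29 days, covering the leap-year case the issue description calls out.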
[jira] [Commented] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711437#comment-16711437 ]

Chesnay Schepler commented on FLINK-11087:
--
Have you tried migrating from 1.5 to 1.6 to 1.7?

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -
>
> Key: FLINK-11087
> URL: https://issues.apache.org/jira/browse/FLINK-11087
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.7.0
> Environment: Migration from Flink 1.5.3 to Flink 1.7.0
> Reporter: Edward Rojas
> Priority: Major
> Labels: Migration, State, broadcast
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
> at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745){noformat}
> The broadcast state uses a MapState with StringSerializer as the key serializer and a custom JsonSerializer as the value serializer. There were no changes to the TypeSerializers used, only the version upgrade.
>
> With some debugging I see that at the moment of validating the compatibility of states in the DefaultOperatorStateBackend class, the "*registeredBroadcastStates*" containing the data about the 'old' state holds a wrong association of the key and value serializers. That is, JsonSerializer appears as the key serializer and StringSerializer as the value serializer (when it should be the opposite).
>
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" class is responsible for this swap here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11085) NoClassDefFoundError in presto-s3 filesystem
[ https://issues.apache.org/jira/browse/FLINK-11085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711433#comment-16711433 ]

Chesnay Schepler commented on FLINK-11085:
--
[~StephanEwen] What's your take on this?

> NoClassDefFoundError in presto-s3 filesystem
>
> Key: FLINK-11085
> URL: https://issues.apache.org/jira/browse/FLINK-11085
> Project: Flink
> Issue Type: Bug
> Components: FileSystem
> Affects Versions: 1.7.0
> Reporter: Chesnay Schepler
> Priority: Major
>
> A user has reported an issue on the ML where using the presto-s3 filesystem fails with an exception due to a missing class. The missing class is indeed filtered out in the shade-plugin configuration.
> {code:java}
> java.lang.NoClassDefFoundError: org/apache/flink/fs/s3presto/shaded/com/facebook/presto/hadoop/HadoopFileStatus
> at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:446)
> at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:423)
> at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
> at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:80)
> at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:250)
> at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
> at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
> at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$8(JobMaster.java:680)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
Edward Rojas created FLINK-11087:

Summary: Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
Key: FLINK-11087
URL: https://issues.apache.org/jira/browse/FLINK-11087
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Affects Versions: 1.7.0
Environment: Migration from Flink 1.5.3 to Flink 1.7.0
Reporter: Edward Rojas

When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast state throws the following error:
{noformat}
org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:745){noformat}
The broadcast state uses a MapState with StringSerializer as the key serializer and a custom JsonSerializer as the value serializer. There were no changes to the TypeSerializers used, only the version upgrade.

With some debugging I see that at the moment of validating the compatibility of states in the DefaultOperatorStateBackend class, the "*registeredBroadcastStates*" containing the data about the 'old' state holds a wrong association of the key and value serializers. That is, JsonSerializer appears as the key serializer and StringSerializer as the value serializer (when it should be the opposite).

After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" class is responsible for this swap here: https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
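The effect reported above can be modeled without Flink. The sketch below is hypothetical (it is not the `LegacyStateMetaInfoReaders` code): serializer names are written in the order [key, value], and a reader that indexes the pair in reverse reproduces exactly the observed symptom, with the value serializer surfacing as the key serializer.

```java
import java.util.List;

public class MetaInfoSwapSketch {
    // Buggy reader: indexes the written [key, value] pair in reverse order.
    static String[] readSwapped(List<String> written) {
        String key = written.get(1);   // picks up the value serializer
        String value = written.get(0); // picks up the key serializer
        return new String[] { key, value };
    }

    // Correct reader: preserves the written order.
    static String[] readCorrect(List<String> written) {
        return new String[] { written.get(0), written.get(1) };
    }
}
```

With ["StringSerializer", "JsonSerializer"] as the written metadata, the buggy reader reports JsonSerializer as the key serializer, matching the debugging observation in the report.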
[jira] [Created] (FLINK-11086) flink-hadoop-compatibility tests fail for 3.x hadoop versions
Sebastian Klemke created FLINK-11086:

Summary: flink-hadoop-compatibility tests fail for 3.x hadoop versions
Key: FLINK-11086
URL: https://issues.apache.org/jira/browse/FLINK-11086
Project: Flink
Issue Type: Bug
Components: YARN
Reporter: Sebastian Klemke

All builds used maven 3.2.5 on commithash ed8ff14ed39d08cd319efe75b40b9742a2ae7558. Attempted builds:
- mvn clean install -Dhadoop.version=3.0.3
- mvn clean install -Dhadoop.version=3.1.1

Integration tests with a Hadoop input format datasource fail. Example stack trace, taken from the hadoop.version 3.1.1 build:
{code:java}
testJobCollectionExecution(org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase) Time elapsed: 0.275 sec <<< ERROR!
java.lang.NoClassDefFoundError: org/apache/flink/hadoop/shaded/com/google/re2j/PatternSyntaxException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.hadoop.fs.Globber.doGlob(Globber.java:210)
at org.apache.hadoop.fs.Globber.glob(Globber.java:149)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2085)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:269)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:239)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)
at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
at org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:225)
at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:219)
at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:155)
at org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
at org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
at org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
at org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131)
at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:182)
at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:158)
at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131)
at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:115)
at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:38)
at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:52)
at org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase.internalRun(WordCountMapredITCase.java:121)
at org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase.testProgram(WordCountMapredITCase.java:71)
{code}
Maybe hadoop 3.x versions could be added to the test matrix as well?

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11085) NoClassDefFoundError in presto-s3 filesystem
[ https://issues.apache.org/jira/browse/FLINK-11085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chesnay Schepler updated FLINK-11085:
-
Summary: NoClassDefFoundError in presto-s3 filesystem (was: flink-s3-fs-presto)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11085) flink-s3-fs-presto
Chesnay Schepler created FLINK-11085:

Summary: flink-s3-fs-presto
Key: FLINK-11085
URL: https://issues.apache.org/jira/browse/FLINK-11085
Project: Flink
Issue Type: Bug
Components: FileSystem
Affects Versions: 1.7.0
Reporter: Chesnay Schepler

A user has reported an issue on the ML where using the presto-s3 filesystem fails with an exception due to a missing class. The missing class is indeed filtered out in the shade-plugin configuration.
{code:java}
java.lang.NoClassDefFoundError: org/apache/flink/fs/s3presto/shaded/com/facebook/presto/hadoop/HadoopFileStatus
at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:446)
at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:423)
at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:80)
at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:250)
at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$8(JobMaster.java:680)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
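For context on how a class ends up "filtered out in the shade-plugin configuration": a maven-shade-plugin filter of roughly the following shape would drop the presto-hadoop classes from the bundled jar, and `PrestoS3FileSystem` would then fail at runtime as in the trace above. The artifact coordinates and pattern here are assumptions for illustration, not the actual flink-s3-fs-presto pom.

```xml
<!-- Hypothetical shade-plugin excerpt: excluding these classes from the
     shaded jar means PrestoS3FileSystem can no longer load the
     (relocated) com.facebook.presto.hadoop.HadoopFileStatus class. -->
<filters>
  <filter>
    <artifact>com.facebook.presto.hadoop:hadoop-apache2</artifact>
    <excludes>
      <exclude>com/facebook/presto/hadoop/**</exclude>
    </excludes>
  </filter>
</filters>
```

The fix direction would be to stop excluding (or to correctly relocate) the classes that the shaded `PrestoS3FileSystem` still references.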
[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711387#comment-16711387 ]

Chesnay Schepler commented on FLINK-10832:
--
The project you attached works for me locally in the IDE, and also when submitting the built project to a 1.6.2, 1.7.0 (taken from https://dist.apache.org/repos/dist/release/flink) and 1.8-SNAPSHOT (locally built) cluster. I did not make _any_ modifications to the project.

What we can see in the logs you provided is that the JobMaster is not shutting down. The following line is present in my logs, but missing in the attached logs:
{code:java}
13:25:56.709 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Simple Test(e8a889dc0394b0c664dcce46f36ed0cc).{code}
Beyond that, the main difference between the logs is this:
{code:java}
DEBUG o.a.f.runtime.jobmaster.JobMaster - Close ResourceManager connection 6ac59614b23655af1e13f08e16b96ea4.
org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null{code}
{code:java}
DEBUG o.a.f.runtime.jobmaster.JobMaster - Close ResourceManager connection 966422750006990ed1f304766e210161.
org.apache.flink.util.FlinkException: JobManager is shutting down.{code}
[~till.rohrmann] Do you have any idea?
> StreamExecutionEnvironment.execute() does not return when all sources end > - > > Key: FLINK-10832 > URL: https://issues.apache.org/jira/browse/FLINK-10832 > Project: Flink > Issue Type: Bug > Components: Core > Affects Versions: 1.5.5, 1.6.2 > Reporter: Arnaud Linz > Priority: Critical > Attachments: flink-10832.zip, log.txt
>
> In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends:
> {code:java}
> public void testFlink() throws Exception {
>     // get the execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // get input data
>     final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>         @Override
>         public void run(final SourceContext<String> ctx) throws Exception {
>             for (int count = 0; count < 5; count++) {
>                 ctx.collect(String.valueOf(count));
>             }
>         }
>
>         @Override
>         public void cancel() {
>         }
>     });
>
>     text.print().setParallelism(1);
>     env.execute("Simple Test");
>     // Never ends!
> }
> {code}
>
> It's critical for us as we heavily rely on this "source exhaustion stop" mechanism to achieve a proper stop of streaming applications from their own code, so it prevents us from using the latest Flink versions.
>
> The log extract shows that the local cluster tried to shut down, but could not do it for no apparent reason:
>
> {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
> {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
> {{0}}
> {{1}}
> {{2}}
> {{3}}
> {{4}}
> {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}}
> {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). (org.apache.flink.runtime.taskmanager.Task:818)}}
> {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] (org.apache.flink.runtime.taskmanager.Task:845)}}
> {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
> {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
> {{[2018-11-07 11:11:13,907] INFO Job Simple Test >
[jira] [Commented] (FLINK-7208) Refactor build-in agg(MaxWithRetractAccumulator and MinWithRetractAccumulator) using the DataView
[ https://issues.apache.org/jira/browse/FLINK-7208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711394#comment-16711394 ] ASF GitHub Bot commented on FLINK-7208: --- dianfu commented on a change in pull request #7201: [FLINK-7208] [table] Optimize Min/MaxWithRetractAggFunction with DataView URL: https://github.com/apache/flink/pull/7201#discussion_r239438568 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala ## @@ -203,13 +203,13 @@ class AggregateITCase extends StreamingWithStateTestBase { .groupBy('b) .select('a.count as 'cnt, 'b) .groupBy('cnt) - .select('cnt, 'b.count as 'freq) + .select('cnt, 'b.count as 'freq, 'b.min as 'min, 'b.max as 'max) val results = t.toRetractStream[Row](queryConfig) results.addSink(new RetractingSink) env.execute() -val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1") +val expected = List("1,1,1,1", "2,1,2,2", "3,1,3,3", "4,1,4,4", "5,1,5,5", "6,1,6,6") Review comment: @walterddr @twalthr Thanks for the suggestion and I have created a PR for FLINK-11074: https://github.com/apache/flink/pull/7253. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor build-in agg(MaxWithRetractAccumulator and > MinWithRetractAccumulator) using the DataView > - > > Key: FLINK-7208 > URL: https://issues.apache.org/jira/browse/FLINK-7208 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: kaibo.zhou >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > Refactor build-in agg(MaxWithRetractAccumulator and > MinWithRetractAccumulator) using the DataView. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
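The refactor above moves the accumulator's internal map into a DataView so it lives in the state backend rather than on the heap. The retractable-min idea itself can be sketched independently of Flink; this is a hedged illustration with invented class and method names (Flink's actual MinWithRetractAccumulator keeps the value-to-count map in a MapView):

```java
import java.util.TreeMap;

// Hedged sketch of a min-with-retract accumulator: a value -> count
// multiset, so that a previously accumulated value can be retracted and
// the minimum recomputed cheaply from the sorted keys.
class MinWithRetract {
    private final TreeMap<Long, Integer> counts = new TreeMap<>();

    void accumulate(long v) {
        counts.merge(v, 1, Integer::sum);
    }

    void retract(long v) {
        // drop the entry entirely once its count reaches zero
        counts.computeIfPresent(v, (k, c) -> c == 1 ? null : c - 1);
    }

    Long min() {
        // null signals "no rows accumulated" (SQL MIN over empty input)
        return counts.isEmpty() ? null : counts.firstKey();
    }
}
```

Retracting the current minimum simply removes its entry, and `firstKey()` yields the next-smallest value; this is exactly why a plain "remember the min" accumulator is insufficient once retractions are in play.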
[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend
[ https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711388#comment-16711388 ] ASF GitHub Bot commented on FLINK-11074: dianfu opened a new pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction URL: https://github.com/apache/flink/pull/7253 ## What is the purpose of the change *This pull request enables the harness tests to test with RocksdbStateBackend and adds harness test for CollectAggFunction* ## Brief change log - *Adds AggFunctionHarnessTest* - *Fix an issue in CollectAggFunction* ## Verifying this change - *Added tests in AggFunctionHarnessTest* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve the harness test to make it possible test with state backend > > > Key: FLINK-11074 > URL: https://issues.apache.org/jira/browse/FLINK-11074 > Project: Flink > Issue Type: Test > Components: Table API SQL >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > Currently, the harness test can only test without state backend. 
If you use a > DataView in the accumulator of the aggregate function, the DataView is a java > object and held in heap, not replaced with StateMapView/StateListView which > values are actually held in the state backend. We should improve the harness > test to make it possible to test with state backend. Otherwise, issues such > as FLINK-10674 could have never been found. With this harness test available, > we could test the built-in aggregate functions which use the DataView more > fine grained. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11074) Improve the harness test to make it possible test with state backend
[ https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11074: --- Labels: pull-request-available (was: ) > Improve the harness test to make it possible test with state backend > > > Key: FLINK-11074 > URL: https://issues.apache.org/jira/browse/FLINK-11074 > Project: Flink > Issue Type: Test > Components: Table API SQL >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > Currently, the harness test can only test without state backend. If you use a > DataView in the accumulator of the aggregate function, the DataView is a java > object and held in heap, not replaced with StateMapView/StateListView which > values are actually held in the state backend. We should improve the harness > test to make it possible to test with state backend. Otherwise, issues such > as FLINK-10674 could have never been found. With this harness test available, > we could test the built-in aggregate functions which use the DataView more > fine grained. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited
[ https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711363#comment-16711363 ] ASF GitHub Bot commented on FLINK-10945: zhuzhurk opened a new pull request #7252: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead… URL: https://github.com/apache/flink/pull/7252 …locks for finite stream jobs when resources are limited

## What is the purpose of the change
This PR adds a job config InputDependencyConstraint, which helps to avoid resource deadlocks in scheduling when resources are limited, as described in [FLINK-10945](https://issues.apache.org/jira/browse/FLINK-10945).

## Brief change log
- *Add InputDependencyConstraint to ExecutionConfig*
- *Adjust the isConsumable interface in IntermediateResultPartition to fit the actual definition of when data is consumable*
- Change the current execution lazy scheduling logic (in Execution.scheduleOrUpdateConsumers(edges)) to schedule tasks only if the InputDependencyConstraint is satisfied (an interface ExecutionVertex.checkInputDependencyConstraints is added to serve this purpose).

## Verifying this change
This change added tests and can be verified as follows:
- *Added IntermediateResultPartitionTest to validate IntermediateResultPartition changes*
- *Added ExecutionVertexInputConstraintTest to validate the constraint check logic in ExecutionVertex*

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented?
(JavaDocs) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Avoid resource deadlocks for finite stream jobs when resources are limited > -- > > Key: FLINK-10945 > URL: https://issues.apache.org/jira/browse/FLINK-10945 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.7.1 >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Labels: pull-request-available > > Currently *resource deadlocks* can happen to finite stream jobs(or batch > jobs) when resources are limited. In 2 cases as below: > # Task Y is a pipelined downstream task of task X. When X takes all > resources(slots), Y cannot acquire slots to start, thus the back pressure > will block X to finish > # Task Y is a upstream task of task X. When X takes all resources(slots) and > Y cannot start, X cannot finish as some of its inputs are not finished > > We can avoid case 1 by setting all edges to be BLOCKING to avoid pipeline > back pressure. However, case 2 cannot be avoided as X(downstream task) will > be launched when any of its input result is ready. > To be detailed, say task X has BLOCKING upstream task Y and Z, X can be > launched when Z finishes, though task Y is not launched yet. This pre-launch > behaviour can be beneficial when there are plenty of resources, thus X can > process data from Z earlier before Y finishes its data processing. However, > resource deadlocks may happen when the resources are limited, e.g. in small > sessions. > > I’d propose introducing a constraint named as *InputDependencyConstraint* to > control the scheduling of vertices. It has 2 values: > # *ANY*. The vertex can be scheduled when any of its inputs is consumable. > # *ALL*. The vertex can be scheduled when all of its inputs are consumable. > > The design doc is here. 
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
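The proposed ANY/ALL semantics can be modelled in a few lines. This is a hedged sketch, not Flink's scheduler API: the enum name follows the proposal in the ticket, but `isSchedulable` and its signature are invented for illustration:

```java
import java.util.Arrays;
import java.util.List;

// Illustrative model of the proposed constraint (names other than the
// enum are assumptions, not the actual Flink API).
class InputConstraintSketch {
    enum InputDependencyConstraint { ANY, ALL }

    // A vertex may be scheduled when ANY input is consumable, or only
    // when ALL inputs are consumable, depending on the constraint.
    static boolean isSchedulable(InputDependencyConstraint constraint,
                                 List<Boolean> inputsConsumable) {
        if (inputsConsumable.isEmpty()) {
            return true; // source vertices have no input dependencies
        }
        boolean any = inputsConsumable.contains(Boolean.TRUE);
        boolean all = !inputsConsumable.contains(Boolean.FALSE);
        return constraint == InputDependencyConstraint.ANY ? any : all;
    }

    public static void main(String[] args) {
        // X has two BLOCKING inputs Y and Z; only Z has finished.
        List<Boolean> inputs = Arrays.asList(false, true);
        System.out.println(isSchedulable(InputDependencyConstraint.ANY, inputs)); // true
        System.out.println(isSchedulable(InputDependencyConstraint.ALL, inputs)); // false
    }
}
```

For the deadlock example in the ticket (X with BLOCKING inputs Y and Z, where only Z has finished), ANY would launch X immediately, while ALL would hold X back until Y also finishes, so X cannot occupy the slots Y still needs.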
[jira] [Updated] (FLINK-11026) Rework creation of sql-client connector/format jars
[ https://issues.apache.org/jira/browse/FLINK-11026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11026: --- Labels: pull-request-available (was: ) > Rework creation of sql-client connector/format jars > --- > > Key: FLINK-11026 > URL: https://issues.apache.org/jira/browse/FLINK-11026 > Project: Flink > Issue Type: Improvement > Components: Build System, SQL Client >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > For the SQL client we currently have a separate {{sql-jars}} profile in > various connectors/formats that create an additional fat jar with a separate > classifier. > One of the core maven mantras is "One artifact per module.", and we see the > importance of this mantra as our current packaging strategy makes it > impossible to provide different NOTICE files for the created jars (regular > and sql-jar). > Currently we would be forced to provide the same file for both jars, which > will cause problems for any downstream users that wants to handle NOTICE > files properly. We would cause the same issue we had with netty, which > categorically claims to be bundling dependencies although it doesn't, forcing > us to manually cut out the valid parts. > My proposal is to move custom packaging strategies into their own module that > depend on the original module. > I will use {{flink-connector-elasticsearch6}} as an example, which packages > both a regular jar without any included dependencies, and a sql jar bundling > everything. 
> * create a separate > {{flink-sql-connector-elasticsearch6/}}{{flink-connector-elasticsearch6-uber}}{{}} > module > * this module depends on {{flink-connector-elasticsearch6}}, and bundles all > dependencies > * move the current shading logic for the sql jar out of the {{sql-jars}} > profile{{}} > * add a {{sql-jars}} profile to {{flink-connectors}} for skipping the > creation of these jars -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11026) Rework creation of sql-client connector/format jars
[ https://issues.apache.org/jira/browse/FLINK-11026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711361#comment-16711361 ] ASF GitHub Bot commented on FLINK-11026: zentol opened a new pull request #7251: [FLINK-11026][ES6] Rework creation of fat sql-client jars URL: https://github.com/apache/flink/pull/7251 Based on #7247.

## What is the purpose of the change
This PR is a PoC for reworking the packaging of jars specific to the sql-client (which basically are just fat-jars). Only the `flink-connector-elasticsearch6` module is covered here; if accepted, the same principle should be applied to the kafka connectors (0.10, 0.11, 2) and all formats. Instead of defining a separate shade-plugin execution with a custom artifactSuffix, this PR adds a dedicated `flink-sql-connector-elasticsearch6` module which only contains the packaging logic. This is similar to the approach we've already been using for `flink-shaded-hadoop2-uber`. The main motivation for this is licensing: for accurate notice files it is necessary to be able to supply each artifact with distinct NOTICE files. This cannot be done within a single module in a reasonable way. We would have to un-package each created jar, add the appropriate license files, and re-pack them again. We'd end up with tightly-coupled plugin definitions (since the names have to match!) and an overall more complicated (and slower!) build.

## Brief change log
* add a new `flink-sql-connector-elasticsearch6` module containing the sql-client-specific shade-plugin configuration and apply the following modifications:
  * set `executionId` to `shade-flink`
  * disable `shadedArtifactAttached` so only a single jar is deployed
  * remove the sql-jar suffix as it is no longer necessary
* remove the sqlJars profile from `flink-connector-elasticsearch6`
* add a `sqlJars` profile to `flink-connectors` to support skipping the creation of sql jars

## Verifying this change
Covered by the sql-client E2E test.
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes) ## Documentation I have not checked the documentation yet for references that would have to be changed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Rework creation of sql-client connector/format jars > --- > > Key: FLINK-11026 > URL: https://issues.apache.org/jira/browse/FLINK-11026 > Project: Flink > Issue Type: Improvement > Components: Build System, SQL Client >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > For the SQL client we currently have a separate {{sql-jars}} profile in > various connectors/formats that create an additional fat jar with a separate > classifier. > One of the core maven mantras is "One artifact per module.", and we see the > importance of this mantra as our current packaging strategy makes it > impossible to provide different NOTICE files for the created jars (regular > and sql-jar). > Currently we would be forced to provide the same file for both jars, which > will cause problems for any downstream users that wants to handle NOTICE > files properly. We would cause the same issue we had with netty, which > categorically claims to be bundling dependencies although it doesn't, forcing > us to manually cut out the valid parts. > My proposal is to move custom packaging strategies into their own module that > depend on the original module. > I will use {{flink-connector-elasticsearch6}} as an example, which packages > both a regular jar without any included dependencies, and a sql jar bundling > everything. 
> * create a separate > {{flink-sql-connector-elasticsearch6/}}{{flink-connector-elasticsearch6-uber}}{{}} > module > * this module depends on {{flink-connector-elasticsearch6}}, and bundles all > dependencies > * move the current shading logic for the sql jar out of the {{sql-jars}} > profile{{}} > * add a {{sql-jars}} profile to {{flink-connectors}} for skipping the > creation of these jars -- This message was sent by Atlassian JIRA (v7.6.3#76005)
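The packaging-only module proposed above might look roughly like the following pom fragment. This is a sketch only: the artifact names and the `shade-flink`/`shadedArtifactAttached` settings come from the ticket and PR description, while the remaining boilerplate is assumed:

```xml
<!-- Hypothetical sketch of the proposed packaging-only module. -->
<project>
  <artifactId>flink-sql-connector-elasticsearch6</artifactId>

  <dependencies>
    <!-- depends on the regular connector module and bundles all of its
         dependencies into a single fat jar -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch6</artifactId>
      <version>${project.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <!-- executionId set to shade-flink, as in the PR -->
            <id>shade-flink</id>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <!-- deploy a single jar rather than an attached,
                   classifier-suffixed artifact -->
              <shadedArtifactAttached>false</shadedArtifactAttached>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
```

Keeping one artifact per module is what lets each jar carry its own NOTICE file, which is the licensing motivation stated in the ticket.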
[jira] [Comment Edited] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)
[ https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710963#comment-16710963 ] TisonKun edited comment on FLINK-10333 at 12/6/18 11:49 AM: Hi [~StephanEwen] and [~till.rohrmann] After an offline discussion with [~xiaogang.shi], we see that ZK has a transactional mechanism, so we can ensure that only the leader writes to ZK. Given this knowledge and the inconsistencies [~till.rohrmann] noticed, before going into a reimplementation I did a survey of the usage of the ZK-based stores in Flink. Ideally there is exactly one role that writes a specific znode. There are four types of znodes that Flink writes. Besides {{SubmittedJobGraphStore}} written by {{Dispatcher}}, {{CompletedCheckpointStore}} written by {{JobMaster}} and {{MesosWorkerStore}} written by {{MesosResourceManager}}, there is the {{RunningJobsRegistry}} that also has a ZK-based implementation. All of the first three write to ZK under a heavy "store lock", but as [~xiaogang.shi] pointed out, this still lacks atomicity. With a solution based on ZK transactions — for example, the current {{Dispatcher}} leader doing {{setData}} with a {{check}} on the {{election-node-path}} — we can ensure atomicity while getting rid of the lock. For the last one, {{RunningJobsRegistry}}, the situation is a bit more complex. {{JobManagerRunner}} is responsible for {{setJobRunning}} and {{setJobFinished}}, and {{Dispatcher}} is responsible for {{clearJob}}. This goes against the ideal of one role per znode. I also notice that the gap between the semantics of {{DONE}} and those of clearing is ambiguous. We might try to prevent the same job from being processed again by an approach other than checking an ephemeral {{DONE}} status. What if we replace {{setJobFinished}} with clearing the {{RunningJobsRegistry}}? was (Author: tison): Hi [~StephanEwen] and [~till.rohrmann] With an offline discuss with [~xiaogang.shi] we see ZK has a transactional mechanism so that we can ensure only the leader writes ZK. 
Given this knowledge and the inconsistencies [~till.rohrmann] noticed, before go into reimplementation, I did a survey of the usage of ZK based stores in flink. Ideally there is exact one role who writes a specific znode. There are four types of znodes that flink writes. Besides {{SubmittedJobGraphStore}} written by {{Dispatcher}}, {{CompletedCheckpointStore}} written by {{JobMaster}} and {{MesosWorkerStore}} written by {{MesosResourceManager}}, there is the {{RunningJobsRegistry}} that also has a ZK based implementation. All of the first three write ZK with a heavy “store lock”, but as [~xiaogang.shi] pointing out, it still lacks of atomicity. And with the solution based on ZK transaction — for example, a current {{Dispatcher}} leader {{setData}} with {{check}} for {{election-node-path}} — we can ensure the atomicity while getting rid of the lock. For the last one, {{RunningJobsRegistry}}, situation becomes a bit more complex. {{JobManagerRunner}} is responsible for {{setJobRunning}} and {{setJobFinished}} and {{Dispatcher}} is responsible for {{clearJob}}. This is against the ideal that one role for one znode. Also I notice that the gap between the semantic of {{DONE}} and that of clear is ambiguous. {{JobSchedulingStatus}} becomes {{DONE}} only if an {{ArchivedExecutionGraph}} generated so that we can prevent the same job be processed by an approach other than check an ephemeral {{DONE}} status. What if we replace {{setJobFinished}} with clearing {{RunningJobsRegistry}}? 
> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, > CompletedCheckpoints) > - > > Key: FLINK-10333 > URL: https://issues.apache.org/jira/browse/FLINK-10333 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.8.0 > > > While going over the ZooKeeper based stores > ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, > {{ZooKeeperCompletedCheckpointStore}}) and the underlying > {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were > introduced with past incremental changes. > * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} > or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization > problems will either lead to removing the Znode or not > * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of > exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case > of a failure) > * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be > better to move {{RetrievableStateStorageHelper}} out of it for a better > separation
[jira] [Updated] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited
[ https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10945: --- Labels: pull-request-available (was: ) > Avoid resource deadlocks for finite stream jobs when resources are limited > -- > > Key: FLINK-10945 > URL: https://issues.apache.org/jira/browse/FLINK-10945 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.7.1 >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Labels: pull-request-available > > Currently *resource deadlocks* can happen to finite stream jobs(or batch > jobs) when resources are limited. In 2 cases as below: > # Task Y is a pipelined downstream task of task X. When X takes all > resources(slots), Y cannot acquire slots to start, thus the back pressure > will block X to finish > # Task Y is a upstream task of task X. When X takes all resources(slots) and > Y cannot start, X cannot finish as some of its inputs are not finished > > We can avoid case 1 by setting all edges to be BLOCKING to avoid pipeline > back pressure. However, case 2 cannot be avoided as X(downstream task) will > be launched when any of its input result is ready. > To be detailed, say task X has BLOCKING upstream task Y and Z, X can be > launched when Z finishes, though task Y is not launched yet. This pre-launch > behaviour can be beneficial when there are plenty of resources, thus X can > process data from Z earlier before Y finishes its data processing. However, > resource deadlocks may happen when the resources are limited, e.g. in small > sessions. > > I’d propose introducing a constraint named as *InputDependencyConstraint* to > control the scheduling of vertices. It has 2 values: > # *ANY*. The vertex can be scheduled when any of its inputs is consumable. > # *ALL*. The vertex can be scheduled when all of its inputs are consumable. > > The design doc is here. 
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
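The ANY/ALL semantics proposed in the issue above boil down to a simple predicate over the vertex's input states. Below is a self-contained sketch; the {{InputDependencyConstraint}} enum mirrors the one added by the pull request later in this thread, while {{SchedulingCheck.canSchedule}} is a hypothetical helper for illustration (the real check lives inside Flink's scheduler):

```java
import java.util.List;

/** Mirrors the enum added by the PR: when may a vertex be scheduled? */
enum InputDependencyConstraint { ANY, ALL }

class SchedulingCheck {
    /**
     * Hypothetical helper: decides whether a vertex may be scheduled
     * given which of its inputs are currently consumable.
     */
    static boolean canSchedule(InputDependencyConstraint constraint,
                               List<Boolean> inputsConsumable) {
        if (inputsConsumable.isEmpty()) {
            return true; // source vertices have no input constraint
        }
        return constraint == InputDependencyConstraint.ANY
                ? inputsConsumable.contains(true)   // at least one input ready
                : !inputsConsumable.contains(false); // all inputs ready
    }
}
```

Under ALL, the downstream task X from case 2 of the issue is not launched while upstream Y is still unscheduled, so it cannot hold slots that Y needs. Per the diff in this thread, users select the constraint via {{ExecutionConfig#setInputDependencyConstraint}}, with ANY as the default.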
[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited
[ https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711338#comment-16711338 ] ASF GitHub Bot commented on FLINK-10945: zhuzhurk opened a new pull request #7250: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead… URL: https://github.com/apache/flink/pull/7250 …locks for finite stream jobs when resources are limited ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Avoid resource deadlocks for finite stream jobs when resources are limited > -- > > Key: FLINK-10945 > URL: https://issues.apache.org/jira/browse/FLINK-10945 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.7.1 >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Labels: pull-request-available > > Currently *resource deadlocks* can happen to finite stream jobs(or batch > jobs) when resources are limited. In 2 cases as below: > # Task Y is a pipelined downstream task of task X. When X takes all > resources(slots), Y cannot acquire slots to start, thus the back pressure > will block X to finish > # Task Y is a upstream task of task X. When X takes all resources(slots) and > Y cannot start, X cannot finish as some of its inputs are not finished > > We can avoid case 1 by setting all edges to be BLOCKING to avoid pipeline > back pressure. However, case 2 cannot be avoided as X(downstream task) will > be launched when any of its input result is ready. > To be detailed, say task X has BLOCKING upstream task Y and Z, X can be > launched when Z finishes, though task Y is not launched yet. This pre-launch > behaviour can be beneficial when there are plenty of resources, thus X can > process data from Z earlier before Y finishes its data processing. However, > resource deadlocks may happen when the resources are limited, e.g. in small > sessions. > > I’d propose introducing a constraint named as *InputDependencyConstraint* to > control the scheduling of vertices. It has 2 values: > # *ANY*. The vertex can be scheduled when any of its inputs is consumable. > # *ALL*. The vertex can be scheduled when all of its inputs are consumable. > > The design doc is here. >
[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited
[ https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711340#comment-16711340 ] ASF GitHub Bot commented on FLINK-10945: zhuzhurk closed pull request #7250: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead… URL: https://github.com/apache/flink/pull/7250 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 3b3132b3d11..6def1dfa488 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -159,6 +159,9 @@ /** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */ private boolean failTaskOnCheckpointError = true; + /** The input dependency constraint to schedule tasks. */ + private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY; + // --- User code values private GlobalJobParameters globalJobParameters; @@ -518,6 +521,30 @@ public ExecutionMode getExecutionMode() { return executionMode; } + /** +* Sets the input dependency constraint for vertex scheduling. It indicates when a task +* should be scheduled considering its inputs status. +* +* The default constraint is {@link InputDependencyConstraint#ANY}. +* +* @param inputDependencyConstraint The input dependency constraint. +*/ + public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) { + this.inputDependencyConstraint = inputDependencyConstraint; + } + + /** +* Gets the input dependency constraint for vertex scheduling. 
It indicates when a task +* should be scheduled considering its inputs status. +* +* The default constraint is {@link InputDependencyConstraint#ANY}. +* +* @return The input dependency constraint of this job. +*/ + public InputDependencyConstraint getInputDependencyConstraint() { + return inputDependencyConstraint; + } + /** * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO. * In some cases this might be preferable. For example, when using interfaces diff --git a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java new file mode 100644 index 000..c7a1e6a3519 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common; + +/** + * This constraint indicates when a task should be scheduled considering its inputs status. + */ +public enum InputDependencyConstraint { + + /** +* Schedule the task if any input is consumable. +*/ + ANY, + + /** +* Schedule the task if all the inputs are consumable. 
+*/ + ALL +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java index 7c2b30db32a..15bdb36f1d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java @@ -101,7 +101,8 @@ public String
[jira] [Commented] (FLINK-10964) sql-client throws exception when paging through finished batch query
[ https://issues.apache.org/jira/browse/FLINK-10964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711334#comment-16711334 ] vinoyang commented on FLINK-10964: -- [~twalthr] Thank you for providing more information. I would personally rather not introduce a timer, since there are already a lot of configuration items; we can delete the job's result directly when cancelQuery is called. > sql-client throws exception when paging through finished batch query > - > > Key: FLINK-10964 > URL: https://issues.apache.org/jira/browse/FLINK-10964 > Project: Flink > Issue Type: Bug > Components: SQL Client >Reporter: Seth Wiesman >Assignee: vinoyang >Priority: Major > > When paging through a batch query in state 'Finished' the sql client throws > the following exception: > {code:java} > org.apache.flink.table.client.gateway.SqlExecutionException: Could not find a > result with result identifier '0c7dce30d287fdd13b934fbefe5a38d1'.{code}
[jira] [Commented] (FLINK-11011) Elasticsearch 6 sink end-to-end test unstable
[ https://issues.apache.org/jira/browse/FLINK-11011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711289#comment-16711289 ] ASF GitHub Bot commented on FLINK-11011: tillrohrmann closed pull request #7216: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running URL: https://github.com/apache/flink/pull/7216 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 40a675aca31..0b245e74ff4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -683,8 +683,12 @@ public void acknowledgeCheckpoint( } }); } else { - log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator", - jobGraph.getJobID()); + String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator"; + if (executionGraph.getState() == JobStatus.RUNNING) { + log.error(errorMessage, jobGraph.getJobID()); + } else { + log.debug(errorMessage, jobGraph.getJobID()); + } } } @@ -702,8 +706,12 @@ public void declineCheckpoint(DeclineCheckpoint decline) { } }); } else { - log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator", - jobGraph.getJobID()); + String errorMessage = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator"; + if (executionGraph.getState() == JobStatus.RUNNING) { + log.error(errorMessage, jobGraph.getJobID()); + } else { + log.debug(errorMessage, jobGraph.getJobID()); + } } } This is an automated message 
from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Elasticsearch 6 sink end-to-end test unstable > - > > Key: FLINK-11011 > URL: https://issues.apache.org/jira/browse/FLINK-11011 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Affects Versions: 1.8.0, 1.7.1 >Reporter: Timo Walther >Assignee: Andrey Zagrebin >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1 > > > The log contains errors: > {code} > 2018-11-26 12:55:02,363 ERROR org.apache.flink.runtime.jobmaster.JobMaster - > Received DeclineCheckpoint message for job 1a7516a04fb0cc85bdb3aa21548bd9bb > with no CheckpointCoordinator > {code} > > See also: https://api.travis-ci.org/v3/job/459693461/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10482) java.lang.IllegalArgumentException: Negative number of in progress checkpoints
[ https://issues.apache.org/jira/browse/FLINK-10482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-10482: - Assignee: Andrey Zagrebin > java.lang.IllegalArgumentException: Negative number of in progress checkpoints > -- > > Key: FLINK-10482 > URL: https://issues.apache.org/jira/browse/FLINK-10482 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.6.1 >Reporter: Julio Biason >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.3, 1.8.0, 1.7.1 > > > Recently I found the following log on my JobManager log: > {noformat} > 2018-10-02 17:44:50,090 [flink-akka.actor.default-dispatcher-4117] ERROR > org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Implementation > error: Unhandled exception. > java.lang.IllegalArgumentException: Negative number of in progress > checkpoints > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139) > at > org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.(CheckpointStatsCounts.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177) > at > org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340) > at > org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923) > at sun.reflect.GeneratedMethodAccessor101.invoke(Unknown Source) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) > > > at > 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) > > > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) > > > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > > > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > > > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > > > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > > > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {noformat} > Related: The job details don't appear,
[jira] [Updated] (FLINK-10482) java.lang.IllegalArgumentException: Negative number of in progress checkpoints
[ https://issues.apache.org/jira/browse/FLINK-10482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10482: -- Fix Version/s: 1.7.1 1.6.3 > java.lang.IllegalArgumentException: Negative number of in progress checkpoints > -- > > Key: FLINK-10482 > URL: https://issues.apache.org/jira/browse/FLINK-10482 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.6.1 >Reporter: Julio Biason >Priority: Major > Labels: pull-request-available > Fix For: 1.6.3, 1.8.0, 1.7.1 > > > Recently I found the following log on my JobManager log: > {noformat} > 2018-10-02 17:44:50,090 [flink-akka.actor.default-dispatcher-4117] ERROR > org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Implementation > error: Unhandled exception. > java.lang.IllegalArgumentException: Negative number of in progress > checkpoints > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139) > at > org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.(CheckpointStatsCounts.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177) > at > org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340) > at > org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923) > at sun.reflect.GeneratedMethodAccessor101.invoke(Unknown Source) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) > > > at > 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) > > > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) > > > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > > > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > > > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > > > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > > > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {noformat} > Related: The job details don't appear, the screen shows only the
[jira] [Resolved] (FLINK-11011) Elasticsearch 6 sink end-to-end test unstable
[ https://issues.apache.org/jira/browse/FLINK-11011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-11011. --- Resolution: Fixed Fixed via 1.8.0: https://github.com/apache/flink/commit/64f6d0b06f974649c21f10cd4bf97a6a9d742aa6 1.7.1: https://github.com/apache/flink/commit/1c1de7415002fc889510e8c84f807efbce149e5a 1.6.3: https://github.com/apache/flink/commit/d9c1bf7a2cda85a0540f6f6c69607d0fd8c2598d 1.5.6: https://github.com/apache/flink/commit/47b2351159b09396d8bf288f25bc3ebfab99f50f > Elasticsearch 6 sink end-to-end test unstable > - > > Key: FLINK-11011 > URL: https://issues.apache.org/jira/browse/FLINK-11011 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Affects Versions: 1.8.0, 1.7.1 >Reporter: Timo Walther >Assignee: Andrey Zagrebin >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1 > > > The log contains errors: > {code} > 2018-11-26 12:55:02,363 ERROR org.apache.flink.runtime.jobmaster.JobMaster - > Received DeclineCheckpoint message for job 1a7516a04fb0cc85bdb3aa21548bd9bb > with no CheckpointCoordinator > {code} > > See also: https://api.travis-ci.org/v3/job/459693461/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11011) Elasticsearch 6 sink end-to-end test unstable
[ https://issues.apache.org/jira/browse/FLINK-11011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711287#comment-16711287 ] ASF GitHub Bot commented on FLINK-11011: tillrohrmann closed pull request #7223: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running URL: https://github.com/apache/flink/pull/7223
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c8a8e882913..a89079dea25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -681,8 +681,12 @@ public void acknowledgeCheckpoint(
 				}
 			});
 		} else {
-			log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",
-				jobGraph.getJobID());
+			String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
+			if (executionGraph.getState() == JobStatus.RUNNING) {
+				log.error(errorMessage, jobGraph.getJobID());
+			} else {
+				log.debug(errorMessage, jobGraph.getJobID());
+			}
 		}
 	}

@@ -700,8 +704,12 @@ public void declineCheckpoint(DeclineCheckpoint decline) {
 				}
 			});
 		} else {
-			log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator",
-				jobGraph.getJobID());
+			String errorMessage = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator";
+			if (executionGraph.getState() == JobStatus.RUNNING) {
+				log.error(errorMessage, jobGraph.getJobID());
+			} else {
+				log.debug(errorMessage, jobGraph.getJobID());
+			}
 		}
 	}

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Elasticsearch 6 sink end-to-end test unstable
> ---------------------------------------------
>
> Key: FLINK-11011
> URL: https://issues.apache.org/jira/browse/FLINK-11011
> Project: Flink
> Issue Type: Bug
> Components: E2E Tests
> Affects Versions: 1.8.0, 1.7.1
> Reporter: Timo Walther
> Assignee: Andrey Zagrebin
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
> The log contains errors:
> {code}
> 2018-11-26 12:55:02,363 ERROR org.apache.flink.runtime.jobmaster.JobMaster -
> Received DeclineCheckpoint message for job 1a7516a04fb0cc85bdb3aa21548bd9bb
> with no CheckpointCoordinator
> {code}
>
> See also: https://api.travis-ci.org/v3/job/459693461/log.txt

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
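The change these PRs make — picking the log level from the job's current state, since a missing CheckpointCoordinator is only an error while the job is still running — can be sketched in isolation. The `JobStatus` enum and `levelFor` helper below are illustrative stand-ins, not Flink's actual classes:

```java
// Minimal sketch of the FLINK-11011 fix: a "no CheckpointCoordinator" message
// is only an error while the job is RUNNING; during shutdown it is an expected
// race and should be logged at DEBUG instead.
public class CheckpointMessageLogging {

    enum JobStatus { CREATED, RUNNING, FINISHED, CANCELED, FAILED }

    /** Picks the level the "no CheckpointCoordinator" message is emitted at. */
    static String levelFor(JobStatus state) {
        return state == JobStatus.RUNNING ? "ERROR" : "DEBUG";
    }

    public static void main(String[] args) {
        System.out.println(levelFor(JobStatus.RUNNING));  // prints ERROR
        System.out.println(levelFor(JobStatus.FINISHED)); // prints DEBUG
    }
}
```

In the actual diff the same idea is expressed by branching between `log.error(...)` and `log.debug(...)` on `executionGraph.getState()`.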
[jira] [Commented] (FLINK-11011) Elasticsearch 6 sink end-to-end test unstable
[ https://issues.apache.org/jira/browse/FLINK-11011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711288#comment-16711288 ] ASF GitHub Bot commented on FLINK-11011: tillrohrmann closed pull request #7224: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running URL: https://github.com/apache/flink/pull/7224
[GitHub] tillrohrmann closed pull request #7216: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running
tillrohrmann closed pull request #7216: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running URL: https://github.com/apache/flink/pull/7216
[GitHub] tillrohrmann closed pull request #7224: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running
tillrohrmann closed pull request #7224: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running URL: https://github.com/apache/flink/pull/7224
[GitHub] tillrohrmann closed pull request #7223: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running
tillrohrmann closed pull request #7223: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running URL: https://github.com/apache/flink/pull/7223
[jira] [Commented] (FLINK-11011) Elasticsearch 6 sink end-to-end test unstable
[ https://issues.apache.org/jira/browse/FLINK-11011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711285#comment-16711285 ] ASF GitHub Bot commented on FLINK-11011: tillrohrmann closed pull request #7222: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running URL: https://github.com/apache/flink/pull/7222
[GitHub] tillrohrmann closed pull request #7222: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running
tillrohrmann closed pull request #7222: [FLINK-11011][E2E][JM] Log error messages about null CheckpointCoordinator only if job is running URL: https://github.com/apache/flink/pull/7222
[jira] [Closed] (FLINK-10997) Avro-confluent-registry does not bundle any dependency
[ https://issues.apache.org/jira/browse/FLINK-10997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10997.
Resolution: Fixed
Fix Version/s: 1.6.3, 1.8.0, 1.7.1
master: 339ec175f558d23aaabffa3e856809518f238f2e
1.7: 5b945f73ea47364badb12ca0b9c45604b8028e73
1.6: 4b9b367ef1bb1ddff4bf015de2a15c561b134135

> Avro-confluent-registry does not bundle any dependency
> ------------------------------------------------------
>
> Key: FLINK-10997
> URL: https://issues.apache.org/jira/browse/FLINK-10997
> Project: Flink
> Issue Type: Bug
> Components: Formats, Table API & SQL
> Affects Versions: 1.6.2, 1.7.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.3, 1.8.0, 1.7.1
>
> The {{flink-avro-confluent-registry}} module does not bundle any dependencies, yet
> defines a relocation for the transitive jackson dependency pulled in by
> {{kafka-schema-registry-client}}.
> It is likely that the registry-client should be included in the jar.
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711264#comment-16711264 ] ASF GitHub Bot commented on FLINK-11048: zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r239406192

## File path: flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteStreamEnvironment.java ##

@@ -68,30 +67,21 @@ public ScalaShellRemoteStreamEnvironment(
 		this.flinkILoop = flinkILoop;
 	}

-	/**
-	 * Executes the remote job.
-	 *
-	 * @param streamGraph
-	 *            Stream Graph to execute
-	 * @param jarFiles
-	 *            List of jar file URLs to ship to the cluster
-	 * @return The result of the job execution, containing elapsed time and accumulators.
-	 */
-	@Override
-	protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
+	protected List<URL> getJarFiles() throws ProgramInvocationException {

Review comment: missing `@Override`, also applies to other classes.

> Ability to programmatically execute streaming pipeline with savepoint restore
> -----------------------------------------------------------------------------
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.7.0
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Minor
> Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint,
> though the underlying ClusterClient does. Add an explicit "execute remotely"
> that can be used by downstream projects.
> https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E
[jira] [Commented] (FLINK-10997) Avro-confluent-registry does not bundle any dependency
[ https://issues.apache.org/jira/browse/FLINK-10997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711269#comment-16711269 ] ASF GitHub Bot commented on FLINK-10997: zentol closed pull request #7170: [FLINK-10997][formats] Bundle kafka-scheme-registry-client URL: https://github.com/apache/flink/pull/7170
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml b/flink-formats/flink-avro-confluent-registry/pom.xml
index 72a74ac11ba..4c99f71e544 100644
--- a/flink-formats/flink-avro-confluent-registry/pom.xml
+++ b/flink-formats/flink-avro-confluent-registry/pom.xml
@@ -78,10 +78,31 @@ under the License.
 				shade
+				false
+
+				io.confluent:*
+				com.fasterxml.jackson.core:*
+				org.apache.zookeeper:zookeeper
+				com.101tec:zkclient
+
-				com.fasterxml.jackson.core
-				org.apache.flink.formats.avro.registry.confluent.shaded.com.fasterxml.jackson.core
+				com.fasterxml.jackson
+				org.apache.flink.formats.avro.registry.confluent.shaded.com.fasterxml.jackson
+
+				org.apache.zookeeper
+				org.apache.flink.formats.avro.registry.confluent.shaded.org.apache.zookeeper
+
+				org.apache.jute
+				org.apache.flink.formats.avro.registry.confluent.shaded.org.apache.jute
+
+				org.I0Itec.zkclient
+				org.apache.flink.formats.avro.registry.confluent.shaded.org.101tec

diff --git a/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/NOTICE b/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/NOTICE
new file mode 100644
index 000..24f2ff84657
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,15 @@
+flink-avro-confluent-registry
+Copyright 2014-2018 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- com.101tec:zkclient:0.10
+- com.fasterxml.jackson.core:jackson-databind:2.8.4
+- com.fasterxml.jackson.core:jackson-annotations:2.8.0
+- com.fasterxml.jackson.core:jackson-core:2.8.4
+- io.confluent:common-utils:3.3.1
+- io.confluent:kafka-schema-registry-client:3.3.1
+- org.apache.zookeeper:zookeeper:3.4.10
[GitHub] zentol closed pull request #7170: [FLINK-10997][formats] Bundle kafka-scheme-registry-client
zentol closed pull request #7170: [FLINK-10997][formats] Bundle kafka-scheme-registry-client URL: https://github.com/apache/flink/pull/7170
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711265#comment-16711265 ] ASF GitHub Bot commented on FLINK-11048: zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r239406325

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ##

@@ -262,4 +281,5 @@ public int getPort() {
 	public Configuration getClientConfiguration() {
 		return clientConfiguration;
 	}
+

Review comment: revert
[GitHub] zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r239406325
[jira] [Commented] (FLINK-10973) Add Map operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711263#comment-16711263 ] ASF GitHub Bot commented on FLINK-10973: twalthr commented on issue #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#issuecomment-444829802 Thanks for the reminder @dianfu. I will try to add my feedback until tomorrow.

> Add Map operator to Table API
> -----------------------------
>
> Key: FLINK-10973
> URL: https://issues.apache.org/jira/browse/FLINK-10973
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: Dian Fu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
> Add Map operator to Table API as described in [Google
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]
> The usage:
> {code:java}
> val res = tab
>    .map(fun: ScalarFunction)   // output has columns 'a, 'b, 'c
>    .select('a, 'c)
> {code}
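The semantics the FLINK-10973 usage snippet describes — a scalar function applied per row, whose output fields become the columns of the result table — can be sketched with plain Java collections. This is a conceptual stand-in, not the actual Table API implementation; the `map` helper and row representation below are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Conceptual sketch of the Table API map operator: apply a row-wise function
// and let its output fields form the result table's columns. Java maps stand
// in for rows; none of this is Flink's actual Table API.
public class TableMapSketch {

    /** Row-wise application, mirroring tab.map(fun). */
    static List<Map<String, Object>> map(List<Map<String, Object>> table,
                                         Function<Map<String, Object>, Map<String, Object>> fun) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map<String, Object> row : table) {
            result.add(fun.apply(row));   // each output map = one result row
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> tab = List.of(Map.of("x", 2), Map.of("x", 5));
        // The "scalar function": derives output columns a, b, c from input column x.
        List<Map<String, Object>> res = map(tab,
            r -> Map.of("a", (Integer) r.get("x") + 1, "b", 0, "c", "tag"));
        System.out.println(res.get(0).get("a")); // prints 3
    }
}
```

A subsequent `select('a, 'c)` would then simply project two of those output columns, which is why the jira snippet chains the two calls.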
[GitHub] zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r239406192
[GitHub] twalthr commented on issue #7167: [FLINK-10973] [table] Add support for map to table API
twalthr commented on issue #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#issuecomment-444829802
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711262#comment-16711262 ] ASF GitHub Bot commented on FLINK-11048: zentol commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r239405800 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ## @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig } } - @Override - public JobExecutionResult execute(String jobName) throws ProgramInvocationException { - StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); - transformations.clear(); - return executeRemotely(streamGraph, jarFiles); + protected List getJarFiles() throws ProgramInvocationException { + return jarFiles; } /** * Executes the remote job. * -* @param streamGraph -*Stream Graph to execute +* @param streamExecutionEnvironment +*Execution Environment with Stream Graph to execute * @param jarFiles *List of jar file URLs to ship to the cluster * @return The result of the job execution, containing elapsed time and accumulators. */ - protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List jarFiles) throws ProgramInvocationException { + public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment, Review comment: This goes back a bit to the discussion on the ML. Remote execution shouldn't be an intrinsic part of the environment, as regardless of which environment you're using it is still possible to submit it remotely provided you have the host/port arguments. 
With how this PR sets things up you don't have to change your `getExecutionEnvironment` call every time you want to change the submission method; instead you only replace the `execute()` call which imo makes a lot more sense. > Ability to programmatically execute streaming pipeline with savepoint restore > > --- > > Key: FLINK-11048 > URL: https://issues.apache.org/jira/browse/FLINK-11048 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.7.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Minor > Labels: pull-request-available > > RemoteStreamEnvironment.execute doesn't support restore from savepoint, > though the underlying ClusterClient does. Add an explicit "execute remotely" > that can be used by downstream projects. > [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E] > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
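The design point above (remote submission as a static helper that accepts any environment, so only the `execute()` call site changes) can be sketched in plain Java. All names below, such as `RemoteSubmissionSketch`, `Environment`, and `buildJobGraph`, are illustrative stand-ins and not Flink's actual API:

```java
// Sketch of the proposed shape: remote submission is a static helper rather
// than an intrinsic capability of one environment subclass. Hypothetical
// names; this is not Flink's real StreamExecutionEnvironment API.
import java.util.List;

public class RemoteSubmissionSketch {

    // Stand-in for StreamExecutionEnvironment: anything that can produce a
    // job graph is submittable.
    interface Environment {
        String buildJobGraph();
    }

    // Given host/port and jar files, any environment's graph can be shipped
    // to a cluster: callers swap only this call, never the environment setup.
    public static String executeRemotely(Environment env, String host, int port, List<String> jarFiles) {
        String graph = env.buildJobGraph();
        return "submitted '" + graph + "' to " + host + ":" + port + " with " + jarFiles.size() + " jar(s)";
    }

    public static void main(String[] args) {
        Environment env = () -> "WordCount";
        // Same environment object; remote submission is chosen at the call site.
        System.out.println(executeRemotely(env, "jobmanager-host", 8081, List.of("job.jar")));
    }
}
```

This mirrors the argument in the review comment: because submission needs only the graph plus host/port, the same environment works for local and remote execution.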
[GitHub] dawidwys closed pull request #7212: [hotfix][docs] Fix invalid link in schema_evolution doc
dawidwys closed pull request #7212: [hotfix][docs] Fix invalid link in schema_evolution doc URL: https://github.com/apache/flink/pull/7212 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/docs/dev/stream/state/schema_evolution.md b/docs/dev/stream/state/schema_evolution.md
index 2fb10a74ff1..e4c2d4aa441 100644
--- a/docs/dev/stream/state/schema_evolution.md
+++ b/docs/dev/stream/state/schema_evolution.md
@@ -74,7 +74,7 @@ serialization schema than the previous serializer; if so, the previous serialize
 and written back to bytes again with the new serializer.
 
 Further details about the migration process is out of the scope of this documentation; please refer to
-[here]({{ site.baseurl }}/dev/stream/state/custom_serialization).
+[here]({{ site.baseurl }}/dev/stream/state/custom_serialization.html).
 
 ## Supported data types for schema evolution
[jira] [Updated] (FLINK-11036) Streaming classloader end-to-end test does not work on source releases
[ https://issues.apache.org/jira/browse/FLINK-11036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-11036: -- Labels: test-stability (was: ) > Streaming classloader end-to-end test does not work on source releases > -- > > Key: FLINK-11036 > URL: https://issues.apache.org/jira/browse/FLINK-11036 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Timo Walther >Priority: Critical > Labels: test-stability > Fix For: 1.8.0 > > > A Flink distribution that has been built from a source release has no git > commit id associated. The web UI shows {{unknown}} as the version commit. > Therefore, the {{test_streaming_classloader.sh}} cannot be executed on a > source release. Either we change the test setup or we skip the test if the > commit id is not present. > {code} > == > Running 'class loading end-to-end test' > == > TEST_DATA_DIR: > /Users/twalthr/flink/tests/flink1.7/flink-1.7.0/flink-end-to-end-tests/test-scripts/temp-test-directory-30N > Flink dist directory: > /Users/twalthr/flink/tests/flink1.7/flink-1.7.0/flink-dist/target/flink-1.7.0-bin/flink-1.7.0 > Testing parent-first class loading > Starting cluster. > Starting standalonesession daemon on host TTMACBOOK. > Starting taskexecutor daemon on host TTMACBOOK. > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Waiting for dispatcher REST endpoint to come up... > Dispatcher REST endpoint is up. > Starting execution of program > Program execution finished > Job with JobID d69f02237884c6119699302497586dbf has finished. > Job Runtime: 334 ms > Stopping taskexecutor daemon (pid: 1598) on host TTMACBOOK. > Stopping standalonesession daemon (pid: 1180) on host TTMACBOOK. > Output from Flink program does not match expected output.
> EXPECTED: NoSuchMethodError:[0-9a-f]{6,}:[0-9a-f]{6,}hello-there-42 > ACTUAL: NoSuchMethodError:hello-there-42:hello-there-42 > [FAIL] Test script contains errors. > Checking for errors... > No errors in log files. > Checking for exceptions... > No exceptions in log files. > Checking for non-empty .out files... > No non-empty .out files. > [FAIL] 'class loading end-to-end test' failed after 0 minutes and 9 seconds! > Test exited with exit code 1 > {code}
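The failure is visible in the regex itself: the expected pattern requires two hex commit ids, which a source-release build (version commit {{unknown}}) can never emit. A minimal reproduction of the match logic follows; the commit id value below is hypothetical:

```shell
expected='NoSuchMethodError:[0-9a-f]{6,}:[0-9a-f]{6,}hello-there-42'

# Output from a git-built distribution embeds the commit id (hypothetical value):
echo 'NoSuchMethodError:d69f022ab1:d69f022ab1hello-there-42' | grep -qE "$expected" \
  && echo "git build: matches"

# A source release carries no commit id, so the hex groups can never match:
echo 'NoSuchMethodError:hello-there-42:hello-there-42' | grep -qE "$expected" \
  || echo "source release: does not match"
```

This is why the ticket proposes either changing the test setup or skipping the test when no commit id is present.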
[jira] [Commented] (FLINK-10964) sql-client throws exception when paging through finished batch query
[ https://issues.apache.org/jira/browse/FLINK-10964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711232#comment-16711232 ] Timo Walther commented on FLINK-10964: -- I can give more information here. The issue is rather a conceptual problem. Currently, we only maintain results until a job has finished. If the job is bounded (for batch or streaming with bounded sources), the result store is cleaned up even though the CLI still wants to page through the result. A solution would be:
- keep results in {{ResultStore}} even if the query has been stopped
- delete the result when {{Executor#cancelQuery}} is called
- or delete it after a timeout specified using the "execution" properties (meaning we need a timer service in the result store)
> sql-client throws exception when paging through finished batch query > - > > Key: FLINK-10964 > URL: https://issues.apache.org/jira/browse/FLINK-10964 > Project: Flink > Issue Type: Bug > Components: SQL Client >Reporter: Seth Wiesman >Assignee: vinoyang >Priority: Major > > When paging through a batch query in state 'Finished' the sql client throws > the following exception: > {code:java} > org.apache.flink.table.client.gateway.SqlExecutionException: Could not find a > result with result identifier '0c7dce30d287fdd13b934fbefe5a38d1'.{code}
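The timeout variant of the proposal could look roughly like the following plain-Java sketch. The class and method names here are hypothetical and not Flink's actual {{ResultStore}} API; the point is only to show retention after job completion with eviction on explicit cancel or after a configurable timeout:

```java
// Hypothetical result store: results survive job completion and are evicted
// either by an explicit cancel or by a scheduled retention timeout.
// Not Flink's real ResultStore; names are illustrative only.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RetainedResultStore {
    private final Map<String, Object> results = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final long retentionMillis;

    public RetainedResultStore(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Called when the job finishes: keep the result, but schedule eviction so
    // finished results do not accumulate forever.
    public void storeResult(String resultId, Object result) {
        results.put(resultId, result);
        timer.schedule(() -> results.remove(resultId), retentionMillis, TimeUnit.MILLISECONDS);
    }

    // Paging from the CLI keeps working as long as the result was not evicted.
    public Object getResult(String resultId) {
        Object r = results.get(resultId);
        if (r == null) {
            throw new IllegalStateException(
                "Could not find a result with result identifier '" + resultId + "'.");
        }
        return r;
    }

    // An explicit cancel (the Executor#cancelQuery path) evicts immediately.
    public void cancelQuery(String resultId) {
        results.remove(resultId);
    }

    public void shutdown() {
        timer.shutdownNow();
    }
}
```

With this shape, the exception reported in the ticket would only occur after the configured retention period or an explicit cancel, rather than immediately when a bounded job finishes.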