[jira] [Commented] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.
[ https://issues.apache.org/jira/browse/FLINK-10884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687577#comment-16687577 ] chunpinghe commented on FLINK-10884: What's your solution? By default, YARN checks the physical memory used by each container; you can disable this check by setting yarn.nodemanager.pmem-check-enabled to false. In your example, if the container uses too much off-heap memory (direct memory, or JNI malloc) and its total memory exceeds 3 GB, the container will be killed regardless. So if your container is repeatedly killed by the nodemanager, you should check whether the total memory you provided for it is insufficient, or whether your code has a memory leak (mainly a native memory leak). > Flink on yarn TM container will be killed by nodemanager because of the > exceeded physical memory. > > > Key: FLINK-10884 > URL: https://issues.apache.org/jira/browse/FLINK-10884 > Project: Flink > Issue Type: Bug > Components: Cluster Management, Core >Affects Versions: 1.6.2 > Environment: version : 1.6.2 > module : flink on yarn > centos jdk1.8 > hadoop 2.7 >Reporter: wgcn >Priority: Major > Labels: yarn > > TM container will be killed by nodemanager because of the exceeded physical > memory. I found that the launch context launching the TM container sets > "container memory = heap memory + offHeapSizeMB" in the class > org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters, > lines 160 to 166. I set a safety margin for the whole memory the container uses.
> For example, if the container limit is 3 GB, the sum "heap memory + offHeapSizeMB" is set to 2.4 GB to prevent the container from being killed. Do we have a ready-made solution, or can I submit my solution? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
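The margin the reporter describes can be sketched as a small helper (a hypothetical illustration with an assumed 20% cutoff ratio; names and numbers are not Flink's actual ContaineredTaskManagerParameters code):

```java
// Hypothetical sketch of the proposed safety margin: reserve a fraction of the
// YARN container's memory so that heap + off-heap stays below the pmem limit,
// leaving headroom for native allocations (direct buffers, JNI malloc).
// The class name and the 0.2 ratio are illustrative assumptions.
class ContainerMemorySketch {

    /** Memory (MB) to hand to the JVM (heap + off-heap) for a given container size. */
    static long jvmMemoryMB(long containerMemoryMB, double cutoffRatio) {
        if (cutoffRatio < 0.0 || cutoffRatio >= 1.0) {
            throw new IllegalArgumentException("cutoff ratio must be in [0, 1)");
        }
        return (long) (containerMemoryMB * (1.0 - cutoffRatio));
    }

    public static void main(String[] args) {
        // A 3 GB container with a 20% margin leaves roughly 2.4 GB for
        // heap + off-heap, matching the example in the comment above.
        System.out.println(jvmMemoryMB(3072, 0.2)); // prints 2457
    }
}
```

Note that disabling yarn.nodemanager.pmem-check-enabled, as suggested in the comment, avoids the kill but not the overcommit; a cutoff like the one sketched keeps the container within its limit instead.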
[jira] [Commented] (FLINK-10891) Upgrade Kafka client version to 2.0.1
[ https://issues.apache.org/jira/browse/FLINK-10891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687555#comment-16687555 ] ASF GitHub Bot commented on FLINK-10891: yanghua commented on issue #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1 URL: https://github.com/apache/flink/pull/7101#issuecomment-438935809 @twalthr This PR is also blocked by FLINK-10624, which contains the Kafka server version (2.0.0). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Upgrade Kafka client version to 2.0.1 > - > > Key: FLINK-10891 > URL: https://issues.apache.org/jira/browse/FLINK-10891 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Since the modern Kafka connector only keeps track of the latest version of > the Kafka client, with the release of Kafka 2.0.1 we should upgrade the > version of the kafka client maven dependency. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua commented on issue #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1
yanghua commented on issue #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1 URL: https://github.com/apache/flink/pull/7101#issuecomment-438935809 @twalthr This PR is also blocked by FLINK-10624, which contains the Kafka server version (2.0.0). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10872) Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11
[ https://issues.apache.org/jira/browse/FLINK-10872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687554#comment-16687554 ] ASF GitHub Bot commented on FLINK-10872: yanghua commented on issue #7100: [FLINK-10872] Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11 URL: https://github.com/apache/flink/pull/7100#issuecomment-43893 @twalthr I cherry-picked your refactoring for FLINK-10624 and squashed it so that I can work on this issue in parallel, but I split it out into a single commit. In essence, though, it is still blocked by FLINK-10624. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11 > > > Key: FLINK-10872 > URL: https://issues.apache.org/jira/browse/FLINK-10872 > Project: Flink > Issue Type: Test > Components: E2E Tests >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10872) Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11
[ https://issues.apache.org/jira/browse/FLINK-10872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10872: --- Labels: pull-request-available (was: ) > Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11 > > > Key: FLINK-10872 > URL: https://issues.apache.org/jira/browse/FLINK-10872 > Project: Flink > Issue Type: Test > Components: E2E Tests >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua commented on issue #7100: [FLINK-10872] Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11
yanghua commented on issue #7100: [FLINK-10872] Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11 URL: https://github.com/apache/flink/pull/7100#issuecomment-43893 @twalthr I cherry-picked your refactoring for FLINK-10624 and squashed it so that I can work on this issue in parallel, but I split it out into a single commit. In essence, though, it is still blocked by FLINK-10624. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10891) Upgrade Kafka client version to 2.0.1
[ https://issues.apache.org/jira/browse/FLINK-10891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10891: --- Labels: pull-request-available (was: ) > Upgrade Kafka client version to 2.0.1 > - > > Key: FLINK-10891 > URL: https://issues.apache.org/jira/browse/FLINK-10891 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Since the modern Kafka connector only keeps track of the latest version of > the Kafka client, with the release of Kafka 2.0.1 we should upgrade the > version of the kafka client maven dependency. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua opened a new pull request #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1
yanghua opened a new pull request #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1 URL: https://github.com/apache/flink/pull/7101 ## What is the purpose of the change *This pull request upgrades the Kafka client version to 2.0.1* ## Brief change log - *Upgrade Kafka client version to 2.0.1* ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10891) Upgrade Kafka client version to 2.0.1
[ https://issues.apache.org/jira/browse/FLINK-10891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687538#comment-16687538 ] ASF GitHub Bot commented on FLINK-10891: yanghua opened a new pull request #7101: [FLINK-10891] Upgrade Kafka client version to 2.0.1 URL: https://github.com/apache/flink/pull/7101 ## What is the purpose of the change *This pull request upgrades the Kafka client version to 2.0.1* ## Brief change log - *Upgrade Kafka client version to 2.0.1* ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Upgrade Kafka client version to 2.0.1 > - > > Key: FLINK-10891 > URL: https://issues.apache.org/jira/browse/FLINK-10891 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Since the modern Kafka connector only keeps track of the latest version of > the Kafka client, with the release of Kafka 2.0.1 we should upgrade the > version of the kafka client maven dependency. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10891) Upgrade Kafka client version to 2.0.1
vinoyang created FLINK-10891: Summary: Upgrade Kafka client version to 2.0.1 Key: FLINK-10891 URL: https://issues.apache.org/jira/browse/FLINK-10891 Project: Flink Issue Type: Sub-task Components: Kafka Connector Reporter: vinoyang Assignee: vinoyang Since the modern Kafka connector only keeps track of the latest version of the Kafka client, with the release of Kafka 2.0.1 we should upgrade the version of the kafka client maven dependency. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10890) CLONE - Add DataStream HBase Sink
[ https://issues.apache.org/jira/browse/FLINK-10890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jocean@gamil.com closed FLINK-10890. Resolution: Won't Do > CLONE - Add DataStream HBase Sink > - > > Key: FLINK-10890 > URL: https://issues.apache.org/jira/browse/FLINK-10890 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: jocean@gamil.com >Assignee: Shimin Yang >Priority: Major > Labels: pull-request-available > > Design documentation: > [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687524#comment-16687524 ] Dian Fu commented on FLINK-8159: Thanks a lot for the reply and advice. I will submit a PR later today. > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extend > {{AbstractRichFunction}} and behave properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we also have to call open on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
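The wrapper behaviour the issue asks for can be sketched roughly as follows; the Function/RichFunction interfaces below are simplified stand-ins defined inside the snippet, not Flink's real CEP classes:

```java
// Simplified sketch: a wrapper forwards RichFunction lifecycle calls to the
// wrapped user function only when that function is actually rich. The
// interface and class names mimic Flink's but are stand-ins, not the real API.
interface Function { }

interface RichFunction extends Function {
    void open();
    void close();
}

class SelectWrapperSketch implements RichFunction {
    private final Function wrapped;
    boolean openedWrapped = false; // visible for testing

    SelectWrapperSketch(Function wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void open() {
        // Forward the lifecycle call only if the user function is rich;
        // plain functions are left untouched.
        if (wrapped instanceof RichFunction) {
            ((RichFunction) wrapped).open();
            openedWrapped = true;
        }
    }

    @Override
    public void close() {
        if (wrapped instanceof RichFunction) {
            ((RichFunction) wrapped).close();
        }
    }
}
```

As the issue notes, the wrapper's serialized form must stay stable, since conditions were previously serialized; the sketch above deliberately adds no new serialized state beyond the wrapped function itself.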
[jira] [Created] (FLINK-10890) CLONE - Add DataStream HBase Sink
jocean@gamil.com created FLINK-10890: Summary: CLONE - Add DataStream HBase Sink Key: FLINK-10890 URL: https://issues.apache.org/jira/browse/FLINK-10890 Project: Flink Issue Type: Sub-task Components: Streaming Connectors Reporter: jocean@gamil.com Assignee: Shimin Yang Design documentation: [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9718) Add environment variable in start-scala-shell.sh & flink to enable remote debug
[ https://issues.apache.org/jira/browse/FLINK-9718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687507#comment-16687507 ] ASF GitHub Bot commented on FLINK-9718: --- zjffdu commented on issue #6245: [FLINK-9718][scala-shell]. Add environment variable in start-scala-shell.sh and flink to enable remote debug URL: https://github.com/apache/flink/pull/6245#issuecomment-438922942 ping @yanghua This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add environment variable in start-scala-shell.sh & flink to enable remote debug > -- > > Key: FLINK-9718 > URL: https://issues.apache.org/jira/browse/FLINK-9718 > Project: Flink > Issue Type: Improvement >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zjffdu commented on issue #6245: [FLINK-9718][scala-shell]. Add environment variable in start-scala-shell.sh and flink to enable remote debug
zjffdu commented on issue #6245: [FLINK-9718][scala-shell]. Add environment variable in start-scala-shell.sh and flink to enable remote debug URL: https://github.com/apache/flink/pull/6245#issuecomment-438922942 ping @yanghua This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687500#comment-16687500 ] ASF GitHub Bot commented on FLINK-10887: tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster URL: https://github.com/apache/flink/pull/7099#discussion_r233714795

File path: flink-runtime/src/main/java/org/apache/flink/runtime/watermark/SourceWatermark.java, @@ -0,0 +1,68 @@

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.watermark;

import java.io.Serializable;

/**
 * This represents the watermark for a single source partition.
 */
public class SourceWatermark implements Serializable {
```

Review comment: I wonder if it should be qualified as `SourceWatermark` vs. just `Watermark`? Perhaps there are use cases for exchanging watermarks across subtasks that don't necessarily belong to a source; one example could be operators that perform asynchronous operations. Relatedly, do we want to allow an identifier for the watermark, so that multiple independent groupings could be formed within an application?
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add source watermark tracking to the JobMaster > -- > > Key: FLINK-10887 > URL: https://issues.apache.org/jira/browse/FLINK-10887 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Labels: pull-request-available > Original Estimate: 24h > Remaining Estimate: 24h > > We need to add a new RPC to the JobMaster such that the current watermark for > every source sub-task can be reported and the current global minimum/maximum > watermark can be retrieved so that each source can adjust their partition > read rates in an attempt to keep sources roughly aligned in event time. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster
tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster URL: https://github.com/apache/flink/pull/7099#discussion_r233715375

File path: flink-runtime/src/main/java/org/apache/flink/runtime/watermark/SourceWatermark.java, @@ -0,0 +1,68 @@

```java
/* ... standard Apache license header omitted ... */

package org.apache.flink.runtime.watermark;

import java.io.Serializable;

/**
 * This represents the watermark for a single source partition.
 */
public class SourceWatermark implements Serializable {
```

Review comment: This may be a bit far-fetched, but can it be generalized further to something like a named counter/metric? Currently there isn't anything watermark-specific here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687502#comment-16687502 ] ASF GitHub Bot commented on FLINK-10887: tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster URL: https://github.com/apache/flink/pull/7099#discussion_r233715375

File path: flink-runtime/src/main/java/org/apache/flink/runtime/watermark/SourceWatermark.java, @@ -0,0 +1,68 @@

```java
/* ... standard Apache license header omitted ... */

package org.apache.flink.runtime.watermark;

import java.io.Serializable;

/**
 * This represents the watermark for a single source partition.
 */
public class SourceWatermark implements Serializable {
```

Review comment: This may be a bit far-fetched, but can it be generalized further to something like a named counter/metric? Currently there isn't anything watermark-specific here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add source watermark tracking to the JobMaster > -- > > Key: FLINK-10887 > URL: https://issues.apache.org/jira/browse/FLINK-10887 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Labels: pull-request-available > Original Estimate: 24h > Remaining Estimate: 24h > > We need to add a new RPC to the JobMaster such that the current watermark for > every source sub-task can be reported and the current global minimum/maximum > watermark can be retrieved so that each source can adjust their partition > read rates in an attempt to keep sources roughly aligned in event time. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster
tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster URL: https://github.com/apache/flink/pull/7099#discussion_r233714795

File path: flink-runtime/src/main/java/org/apache/flink/runtime/watermark/SourceWatermark.java, @@ -0,0 +1,68 @@

```java
/* ... standard Apache license header omitted ... */

package org.apache.flink.runtime.watermark;

import java.io.Serializable;

/**
 * This represents the watermark for a single source partition.
 */
public class SourceWatermark implements Serializable {
```

Review comment: I wonder if it should be qualified as `SourceWatermark` vs. just `Watermark`? Perhaps there are use cases for exchanging watermarks across subtasks that don't necessarily belong to a source; one example could be operators that perform asynchronous operations. Relatedly, do we want to allow an identifier for the watermark, so that multiple independent groupings could be formed within an application? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster
tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster URL: https://github.com/apache/flink/pull/7099#discussion_r233714283

File path: flink-runtime/src/main/java/org/apache/flink/runtime/watermark/SourceWatermark.java, @@ -0,0 +1,68 @@

```java
/* ... standard Apache license header omitted ... */

package org.apache.flink.runtime.watermark;

import java.io.Serializable;

/**
 * This represents the watermark for a single source partition.
 */
public class SourceWatermark implements Serializable {

    private static final long serialVersionUID = 1L;
    private long timestamp;
```

Review comment: What does the timestamp represent? Is it when the watermark last changed, or when it was last communicated by the subtask (even if it did not change, for example because the subtask is just reading a lot of data under the same watermark)? We will need a way to detect that a source subtask is idle so we can avoid waiting for it (similar to how we had to identify idleness within a subtask). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687496#comment-16687496 ] ASF GitHub Bot commented on FLINK-10887: tweise commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster URL: https://github.com/apache/flink/pull/7099#discussion_r233714283

File path: flink-runtime/src/main/java/org/apache/flink/runtime/watermark/SourceWatermark.java, @@ -0,0 +1,68 @@

```java
/* ... standard Apache license header omitted ... */

package org.apache.flink.runtime.watermark;

import java.io.Serializable;

/**
 * This represents the watermark for a single source partition.
 */
public class SourceWatermark implements Serializable {

    private static final long serialVersionUID = 1L;
    private long timestamp;
```

Review comment: What does the timestamp represent? Is it when the watermark last changed, or when it was last communicated by the subtask (even if it did not change, for example because the subtask is just reading a lot of data under the same watermark)?
We will need a way to detect that a source subtask is idle so we can avoid waiting for it (similar to how we had to identify idleness within a subtask). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add source watermark tracking to the JobMaster > -- > > Key: FLINK-10887 > URL: https://issues.apache.org/jira/browse/FLINK-10887 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Labels: pull-request-available > Original Estimate: 24h > Remaining Estimate: 24h > > We need to add a new RPC to the JobMaster such that the current watermark for > every source sub-task can be reported and the current global minimum/maximum > watermark can be retrieved, so that each source can adjust its partition > read rates in an attempt to keep sources roughly aligned in event time. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
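A minimal sketch of the kind of per-subtask watermark aggregation this RPC would enable (a hypothetical class, not the PR's actual JobMaster implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical aggregator: each source subtask reports its current watermark,
// and sources query the global minimum to throttle partitions that have run
// ahead in event time. Mirrors the idea in FLINK-10887, not the PR's code.
class WatermarkAggregatorSketch {
    private final Map<Integer, Long> watermarkPerSubtask = new HashMap<>();

    synchronized void report(int subtaskIndex, long watermark) {
        watermarkPerSubtask.put(subtaskIndex, watermark);
    }

    /** Global minimum over all reporting subtasks; Long.MIN_VALUE if none reported yet. */
    synchronized long globalMin() {
        return watermarkPerSubtask.values().stream()
                .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    }

    /** Global maximum over all reporting subtasks; Long.MIN_VALUE if none reported yet. */
    synchronized long globalMax() {
        return watermarkPerSubtask.values().stream()
                .mapToLong(Long::longValue).max().orElse(Long.MIN_VALUE);
    }
}
```

Note the idleness problem raised in the review applies directly to this sketch: a subtask that stops reporting would pin globalMin() forever, so a real implementation would need a staleness timeout or an explicit idle marker.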
[jira] [Commented] (FLINK-10774) connection leak when partition discovery is disabled and open throws exception
[ https://issues.apache.org/jira/browse/FLINK-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687465#comment-16687465 ] ASF GitHub Bot commented on FLINK-10774: stevenzwu commented on issue #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an… URL: https://github.com/apache/flink/pull/7020#issuecomment-438914628 Got evidence that it is a thread-safety problem:

```
2018-11-15 04:24:17,795 ERROR org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - failed to close partitionDiscoverer
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1627)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1526)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1506)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.closeConnections(Kafka09PartitionDiscoverer.java:97)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.close(AbstractPartitionDiscoverer.java:101)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:673)
    at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:108)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:369)
    at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1481)
    at java.lang.Thread.run(Thread.java:748)
```

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > connection leak when partition discovery is disabled and open throws exception > -- > > Key: FLINK-10774 > URL: https://issues.apache.org/jira/browse/FLINK-10774 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.2, 1.5.5, 1.6.2 >Reporter: Steven Zhen Wu >Assignee: Steven Zhen Wu >Priority: Major > Labels: pull-request-available > > Here is the scenario to reproduce the issue > * partition discovery is disabled > * open method throws an exception (e.g. when broker SSL authorization denies > request) > In this scenario, the run method won't be executed. As a result, > _partitionDiscoverer.close()_ won't be called. That causes the connection > leak, because the KafkaConsumer is initialized but not closed. That has caused an > outage that brought down our Kafka cluster, when a high-parallelism job got > into a restart/failure loop. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
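The failure mode described above -- `open()` throws after the discoverer is created, so `run()` never executes and `close()` is never reached -- can be modeled in a small sketch. This is a hypothetical illustration of the leak and of one possible guard (closing the discoverer in `open()`'s error path); it is not the actual change made in PR #7020:

```python
class PartitionDiscoverer:
    """Stand-in for the Kafka partition discoverer holding a connection."""
    def __init__(self):
        self.closed = False

    def open(self):
        pass  # in the real code this creates the underlying KafkaConsumer

    def close(self):
        self.closed = True


class Source:
    """Toy source: without the except-block below, a failure in open()
    would leak the discoverer's connection, because run() never starts."""
    def __init__(self, fail_open=False):
        self.partition_discoverer = None
        self.fail_open = fail_open

    def open(self):
        self.partition_discoverer = PartitionDiscoverer()
        self.partition_discoverer.open()
        try:
            self._authorize()  # may raise, e.g. broker SSL authorization denied
        except Exception:
            # close eagerly; run() (and thus the normal close path) never executes
            self.partition_discoverer.close()
            raise

    def _authorize(self):
        if self.fail_open:
            raise RuntimeError("authorization denied")
```

With the guard, a restart/failure loop no longer accumulates open consumer connections.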
[jira] [Assigned] (FLINK-10889) Semantic inconsistency between DataSet#print and DataStream#print
[ https://issues.apache.org/jira/browse/FLINK-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10889: Assignee: vinoyang > Semantic inconsistency between DataSet#print and DataStream#print > - > > Key: FLINK-10889 > URL: https://issues.apache.org/jira/browse/FLINK-10889 > Project: Flink > Issue Type: Improvement > Components: DataSet API, DataStream API >Reporter: Jeff Zhang >Assignee: vinoyang >Priority: Major > > DataSet#print will print the result on the client side, while DataStream#print > will print the result on the TM. This inconsistency will confuse users. IMHO, we > should make the behavior consistent between DataSet and DataStream; I prefer > to print the result on the client side. Regarding DataStream#print, we can use > DataStreamUtils#collect to print it on the client side. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10889) Semantic inconsistency between DataSet#print and DataStream#print
Jeff Zhang created FLINK-10889: -- Summary: Semantic inconsistency between DataSet#print and DataStream#print Key: FLINK-10889 URL: https://issues.apache.org/jira/browse/FLINK-10889 Project: Flink Issue Type: Improvement Components: DataSet API, DataStream API Reporter: Jeff Zhang DataSet#print will print the result on the client side, while DataStream#print will print the result on the TM. This inconsistency will confuse users. IMHO, we should make the behavior consistent between DataSet and DataStream; I prefer to print the result on the client side. Regarding DataStream#print, we can use DataStreamUtils#collect to print it on the client side. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua opened a new pull request #7100: [Flink 10872] Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11
yanghua opened a new pull request #7100: [Flink 10872] Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11 URL: https://github.com/apache/flink/pull/7100 ## What is the purpose of the change *This pull request extends SQL client end-to-end to test KafkaTableSink for kafka connector 0.11* ## Brief change log - *Extend SQL client end-to-end to test KafkaTableSink for kafka connector 0.11* ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-10873) Remove tableEnv in DataSetConversions#toTable and DataStreamConversions#toTable
[ https://issues.apache.org/jira/browse/FLINK-10873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng reassigned FLINK-10873: --- Assignee: Hequn Cheng > Remove tableEnv in DataSetConversions#toTable and > DataStreamConversions#toTable > --- > > Key: FLINK-10873 > URL: https://issues.apache.org/jira/browse/FLINK-10873 > Project: Flink > Issue Type: Improvement >Reporter: Jeff Zhang >Assignee: Hequn Cheng >Priority: Major > > What I would like to achieve is to change the following code > {code} > val table = data.flatMap(line=>line.split("\\s")) > .map(w => (w, 1)) > .toTable(tEnv, 'word, 'count) > {code} > to this > {code} > val table = data.flatMap(line=>line.split("\\s")) > .map(w => (w, 1)) > .toTable('word, 'count) > {code} > The only change is that tableEnv is removed in method toTable. I think the > second piece of code is more readable. We can create TableEnvironment based > on the ExecutionEnvironment of DataSet/DataStream rather than asking user to > pass it explicitly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
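The proposal above -- dropping the explicit `tableEnv` argument -- implies deriving the TableEnvironment from the data set's own execution environment. A toy sketch of that lookup (hypothetical names, not Flink's real classes; the key point is caching one TableEnvironment per execution environment so repeated `toTable` calls share it):

```python
class TableEnvironment:
    def __init__(self, exec_env):
        self.exec_env = exec_env


class Table:
    def __init__(self, t_env, fields):
        self.t_env = t_env
        self.fields = fields


class DataSet:
    """Toy model: to_table() derives the TableEnvironment from the data
    set's execution environment instead of taking it as a parameter."""
    _table_envs = {}  # exec_env -> shared TableEnvironment

    def __init__(self, exec_env):
        self.exec_env = exec_env

    def to_table(self, *fields):
        t_env = DataSet._table_envs.setdefault(
            self.exec_env, TableEnvironment(self.exec_env))
        return Table(t_env, fields)
```

The design trade-off is that the user loses control over which TableEnvironment is used; the cache makes the implicit choice deterministic per execution environment.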
[jira] [Commented] (FLINK-10886) Event time synchronization across sources
[ https://issues.apache.org/jira/browse/FLINK-10886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687333#comment-16687333 ] Jamie Grier commented on FLINK-10886: - ML discussion: [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html] > Event time synchronization across sources > - > > Key: FLINK-10886 > URL: https://issues.apache.org/jira/browse/FLINK-10886 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Original Estimate: 336h > Remaining Estimate: 336h > > When reading from a source with many parallel partitions, especially when > reading lots of historical data (or recovering from downtime and there is a > backlog to read), it's quite common for an event-time skew to develop > across those partitions. > > When doing event-time windowing -- or in fact any event-time driven > processing -- the event time skew across partitions results directly in > increased buffering in Flink and of course the corresponding state/checkpoint > size growth. > > As the event-time skew and state size grow larger this can have a major > effect on application performance and in some cases result in a "death > spiral" where the application performance gets worse and worse as the state > size grows and grows. > > So, one solution to this problem, outside of core changes in Flink itself, > seems to be to try to coordinate sources across partitions so that they make > progress through event time at roughly the same rate. In fact if there is > large skew the idea would be to slow or even stop reading from some > partitions with newer data while first reading the partitions with older > data. Anyway, to do this we need to share state somehow amongst sub-tasks. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687311#comment-16687311 ] ASF GitHub Bot commented on FLINK-10887: jgrier opened a new pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster URL: https://github.com/apache/flink/pull/7099 ## What is the purpose of the change This commit adds a JobMaster RPC endpoint that is used to share information across source subtasks regarding event time progress. This will be used to implement event time source synchronization across sources. ## Brief change log - New RPC endpoint on JobMaster to track event time progress across source sub-tasks - Updated JobMaster Tests ## Verifying this change This change added tests and can be verified as follows: - Added testCase to ensure watermark stats are computed correctly to JobMasterTest ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): No - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: No - The serializers: No - The runtime per-record code paths (performance sensitive): No - Anything that affects deployment or recovery: No - The S3 file system connector: No ## Documentation - Does this pull request introduce a new feature? No - If yes, how is the feature documented? not applicable This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add source watermark tracking to the JobMaster > -- > > Key: FLINK-10887 > URL: https://issues.apache.org/jira/browse/FLINK-10887 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Labels: pull-request-available > Original Estimate: 24h > Remaining Estimate: 24h > > We need to add a new RPC to the JobMaster such that the current watermark for > every source sub-task can be reported and the current global minimum/maximum > watermark can be retrieved so that each source can adjust their partition > read rates in an attempt to keep sources roughly aligned in event time. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10887) Add source watermark tracking to the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10887: --- Labels: pull-request-available (was: ) > Add source watermark tracking to the JobMaster > -- > > Key: FLINK-10887 > URL: https://issues.apache.org/jira/browse/FLINK-10887 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Labels: pull-request-available > Original Estimate: 24h > Remaining Estimate: 24h > > We need to add a new RPC to the JobMaster such that the current watermark for > every source sub-task can be reported and the current global minimum/maximum > watermark can be retrieved so that each source can adjust their partition > read rates in an attempt to keep sources roughly aligned in event time. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10774) connection leak when partition discovery is disabled and open throws exception
[ https://issues.apache.org/jira/browse/FLINK-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687278#comment-16687278 ] ASF GitHub Bot commented on FLINK-10774: stevenzwu edited a comment on issue #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an… URL: https://github.com/apache/flink/pull/7020#issuecomment-438544127 We saw an exception caused by redundant calls to the close method; that will need to be fixed.
```
2018-11-13 18:26:54,928 ERROR org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - failed to close partitionDiscoverer
java.lang.IllegalStateException: This consumer has already been closed.
	at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1613)
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1624)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1526)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1506)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.closeConnections(Kafka09PartitionDiscoverer.java:97)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.close(AbstractPartitionDiscoverer.java:101)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:673)
	at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:108)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:100)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:369)
	at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1481)
	at java.lang.Thread.run(Thread.java:748)
```
Can the `run` and `cancel` methods be executed in different threads? If so, we need to add `synchronized` to this method in `Kafka09PartitionDiscoverer`:
```
@Override
protected void closeConnections() throws Exception {
	if (this.kafkaConsumer != null) {
		this.kafkaConsumer.close();

		// de-reference the consumer to avoid closing multiple times
		this.kafkaConsumer = null;
	}
}
```
I looked at the `KafkaConsumer` code again. Its close method calls `acquire`. I thought it acquires a lock, which is not the case:
```
private void acquire() {
	this.ensureNotClosed();
	long threadId = Thread.currentThread().getId();
	if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
		throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
	} else {
		this.refcount.incrementAndGet();
	}
}
```
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > connection leak when partition discovery is disabled and open throws exception > -- > > Key: FLINK-10774 > URL: https://issues.apache.org/jira/browse/FLINK-10774 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.2, 1.5.5, 1.6.2 >Reporter: Steven Zhen Wu >Assignee: Steven Zhen Wu >Priority: Major > Labels: pull-request-available > > Here is the scenario to reproduce the issue > * partition discovery is disabled > * open method throws an exception (e.g. when broker SSL authorization denies > request) > In this scenario, the run method won't be executed. As a result, > _partitionDiscoverer.close()_ won't be called. That causes the connection > leak, because the KafkaConsumer is initialized but not closed. That has caused an > outage that brought down our Kafka cluster, when a high-parallelism job got > into a restart/failure loop. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
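The guarded `closeConnections()` quoted above can be made robust against the run/cancel race the commenter asks about by pairing the null-out with a lock. A minimal stand-in model (the consumer is faked; this is an illustration of the idempotent-close idea, not the actual PR #7020 change) showing that concurrent and repeated calls close only once:

```python
import threading


class GuardedDiscoverer:
    """Toy model of an idempotent, thread-safe closeConnections():
    the lock serializes closes racing from run() and cancel() threads,
    and nulling the reference makes any later close a no-op."""

    def __init__(self):
        self._lock = threading.Lock()
        self._consumer = object()  # stand-in for the KafkaConsumer
        self.close_calls = 0       # counts how often the real close would run

    def close_connections(self):
        with self._lock:
            if self._consumer is not None:
                self.close_calls += 1  # real code: self._consumer.close()
                self._consumer = None  # de-reference to avoid closing twice
```

Without the lock, two threads could both pass the null check before either nulls the field, reproducing the "already been closed" / concurrent-access failures in the stack traces above.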
[jira] [Created] (FLINK-10888) Expose new global watermark RPC to sources
Jamie Grier created FLINK-10888: --- Summary: Expose new global watermark RPC to sources Key: FLINK-10888 URL: https://issues.apache.org/jira/browse/FLINK-10888 Project: Flink Issue Type: Sub-task Components: Streaming Connectors Reporter: Jamie Grier Assignee: Jamie Grier Expose new JobMaster RPC for watermark tracking to Source implementations so it can be used to align reads across sources. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10887) Add source watermark tracking to the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jamie Grier updated FLINK-10887: Summary: Add source watermark tracking to the JobMaster (was: Add source watermarking tracking to the JobMaster) > Add source watermark tracking to the JobMaster > -- > > Key: FLINK-10887 > URL: https://issues.apache.org/jira/browse/FLINK-10887 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Original Estimate: 24h > Remaining Estimate: 24h > > We need to add a new RPC to the JobMaster such that the current watermark for > every source sub-task can be reported and the current global minimum/maximum > watermark can be retrieved so that each source can adjust their partition > read rates in an attempt to keep sources roughly aligned in event time. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10887) Add source watermarking tracking to the JobMaster
Jamie Grier created FLINK-10887: --- Summary: Add source watermarking tracking to the JobMaster Key: FLINK-10887 URL: https://issues.apache.org/jira/browse/FLINK-10887 Project: Flink Issue Type: Sub-task Components: JobManager Reporter: Jamie Grier Assignee: Jamie Grier We need to add a new RPC to the JobMaster such that the current watermark for every source sub-task can be reported and the current global minimum/maximum watermark can be retrieved so that each source can adjust their partition read rates in an attempt to keep sources roughly aligned in event time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10886) Event time synchronization across sources
Jamie Grier created FLINK-10886: --- Summary: Event time synchronization across sources Key: FLINK-10886 URL: https://issues.apache.org/jira/browse/FLINK-10886 Project: Flink Issue Type: Improvement Components: Streaming Connectors Reporter: Jamie Grier Assignee: Jamie Grier When reading from a source with many parallel partitions, especially when reading lots of historical data (or recovering from downtime and there is a backlog to read), it's quite common for an event-time skew to develop across those partitions. When doing event-time windowing -- or in fact any event-time driven processing -- the event time skew across partitions results directly in increased buffering in Flink and of course the corresponding state/checkpoint size growth. As the event-time skew and state size grow larger this can have a major effect on application performance and in some cases result in a "death spiral" where the application performance gets worse and worse as the state size grows and grows. So, one solution to this problem, outside of core changes in Flink itself, seems to be to try to coordinate sources across partitions so that they make progress through event time at roughly the same rate. In fact if there is large skew the idea would be to slow or even stop reading from some partitions with newer data while first reading the partitions with older data. Anyway, to do this we need to share state somehow amongst sub-tasks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
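The coordination idea in this issue -- slow or stop reading from partitions whose event time runs ahead -- reduces to a simple per-subtask rule once a global minimum watermark is shared. A minimal sketch with a hypothetical skew threshold (in a real system the policy and threshold would be configurable, and pausing would be a soft back-off rather than a hard stop):

```python
def should_pause(local_watermark, global_min_watermark, max_skew_ms=30_000):
    """Toy alignment rule: pause a source subtask when its event time runs
    more than max_skew_ms ahead of the slowest subtask, so state buffered
    for event-time windows stays bounded."""
    if global_min_watermark is None:
        return False  # no global view yet: keep reading
    return local_watermark - global_min_watermark > max_skew_ms
```

Each subtask would evaluate this against the shared global minimum on every fetch loop, naturally letting the partitions with older data catch up first.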
[jira] [Commented] (FLINK-10869) Update S3 testing settings
[ https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686940#comment-16686940 ] ASF GitHub Bot commented on FLINK-10869: StephanEwen commented on issue #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables. URL: https://github.com/apache/flink/pull/7098#issuecomment-438763465 Thanks for the reviews. The migration of credentials is not yet complete. End-to-end tests are missing and the Yarn ITCase as well. The old credentials still work; they will be phased out once everything has been migrated. +1 to adding a test utility to capture access to the env variables and the path construction. Will add a commit here that refactors that. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update S3 testing settings > -- > > Key: FLINK-10869 > URL: https://issues.apache.org/jira/browse/FLINK-10869 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > > Currently S3 tests go against a bucket hosted by 'data Artisans'. > As part of reworking the AWS permission setup, we need to adapt the > credentials and buckets for these tests. > Future tests should refer to the following environment variables for S3 tests: > * `IT_CASE_S3_BUCKET` > * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA` > * `IT_CASE_S3_SECRET_KEY` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10869) Update S3 testing settings
[ https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686891#comment-16686891 ] ASF GitHub Bot commented on FLINK-10869: zentol commented on issue #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables. URL: https://github.com/apache/flink/pull/7098#issuecomment-438748425 btw, for verifying this change we can push it to a separate branch in the apache repo. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update S3 testing settings > -- > > Key: FLINK-10869 > URL: https://issues.apache.org/jira/browse/FLINK-10869 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > > Currently S3 tests go against a bucket hosted by 'data Artisans'. > As part of reworking the AWS permission setup, we need to adapt the > credentials and buckets for these tests. > Future tests should refer to the following environment variables for S3 tests: > * `IT_CASE_S3_BUCKET` > * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA` > * `IT_CASE_S3_SECRET_KEY` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10869) Update S3 testing settings
[ https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686887#comment-16686887 ] ASF GitHub Bot commented on FLINK-10869: zentol commented on issue #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables. URL: https://github.com/apache/flink/pull/7098#issuecomment-438746732 @dawidwys * all S3 tests already check whether credentials are provided, and if not are skipped via assumptions * this does not change anything in terms of behavior; these tests were always only run in the Apache Flink Travis This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update S3 testing settings > -- > > Key: FLINK-10869 > URL: https://issues.apache.org/jira/browse/FLINK-10869 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > > Currently S3 tests go against a bucket hosted by 'data Artisans'. > As part of reworking the AWS permission setup, we need to adapt the > credentials and buckets for these tests. > Future tests should refer to the following environment variables for S3 tests: > * `IT_CASE_S3_BUCKET` > * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA` > * `IT_CASE_S3_SECRET_KEY` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10531) State TTL RocksDb backend end-to-end test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-10531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686867#comment-16686867 ] ASF GitHub Bot commented on FLINK-10531: kl0u commented on issue #7036: [FLINK-10531][e2e] Fix unstable TTL end-to-end test. URL: https://github.com/apache/flink/pull/7036#issuecomment-438735965 @azagrebin Please have another look and let me know what you think! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > State TTL RocksDb backend end-to-end test failed on Travis > -- > > Key: FLINK-10531 > URL: https://issues.apache.org/jira/browse/FLINK-10531 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.6.1 >Reporter: Till Rohrmann >Assignee: Kostas Kloudas >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.7.0 > > > The {{State TTL RocksDb backend end-to-end test}} end-to-end test failed on > Travis. > https://travis-ci.org/apache/flink/jobs/438226190 > https://api.travis-ci.org/v3/job/438226190/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10851) sqlUpdate support complex insert grammar
[ https://issues.apache.org/jira/browse/FLINK-10851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686774#comment-16686774 ] Dawid Wysakowicz commented on FLINK-10851: -- [~frank wang] have you resolved your issue? If so, can we close this ticket and the corresponding PR?
> sqlUpdate support complex insert grammar
>
> Key: FLINK-10851
> URL: https://issues.apache.org/jira/browse/FLINK-10851
> Project: Flink
> Issue Type: Bug
> Reporter: frank wang
> Priority: Major
> Labels: pull-request-available
>
> My code is
> {{tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");}}
> but Flink gives me an error saying "No table was registered under the name kafka".
> I modified the code, and it is OK now.
> TableEnvironment.scala
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
>   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
>   // parse the sql query
>   val parsed = planner.parse(stmt)
>   parsed match {
>     case insert: SqlInsert =>
>       // validate the SQL query
>       val query = insert.getSource
>       val validatedQuery = planner.validate(query)
>       // get query result as Table
>       val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
>       // get name of sink table
>       val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
>       // insert query result into sink table
>       insertInto(queryResult, targetTableName, config)
>     case _ =>
>       throw new TableException(
>         "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
>   }
> }
> {code}
> It should be modified to this:
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
>   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
>   // parse the sql query
>   val parsed = planner.parse(stmt)
>   parsed match {
>     case insert: SqlInsert =>
>       // validate the SQL query
>       val query = insert.getSource
>       val validatedQuery = planner.validate(query)
>       // get query result as Table
>       val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
>       // get name of sink table
>       //val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
>       val targetTableName = insert.getTargetTable.toString
>       // insert query result into sink table
>       insertInto(queryResult, targetTableName, config)
>     case _ =>
>       throw new TableException(
>         "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
>   }
> }
> {code}
>
> I hope this can be accepted, thx
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
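The root cause the reporter describes — taking only the first segment of a fully qualified identifier — can be illustrated with plain string handling (illustrative only; Calcite's `SqlIdentifier` stores the segments as a list, which this sketch mimics):

```java
import java.util.Arrays;
import java.util.List;

public class QualifiedNameDemo {
    // Mimics SqlIdentifier.names for a dotted name like "kafka.sdkafka.product_4".
    static List<String> segments(String qualified) {
        return Arrays.asList(qualified.split("\\."));
    }

    public static void main(String[] args) {
        List<String> names = segments("kafka.sdkafka.product_4");
        // The reported bug: names.get(0) yields only the first segment, "kafka",
        // which is then looked up as a table name and fails.
        System.out.println(names.get(0));
        // The proposed fix uses the full dotted name instead.
        System.out.println(String.join(".", names));
    }
}
```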
[jira] [Commented] (FLINK-10869) Update S3 testing settings
[ https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686764#comment-16686764 ] ASF GitHub Bot commented on FLINK-10869: dawidwys commented on issue #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables. URL: https://github.com/apache/flink/pull/7098#issuecomment-438713201 Does it mean that those tests will always fail on user's travis(unless they setup AWS credentials)? If so I think we should at least skip those tests if credentials are not provided? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update S3 testing settings > -- > > Key: FLINK-10869 > URL: https://issues.apache.org/jira/browse/FLINK-10869 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > > Currently S3 tests go against a bucket hosted by 'data Artisans'. > As part of reworking the AWS permission setup, we need to adapt the > credentials and buckets for these tests. > Future tests should refer to the following environment variables for S3 tests: > * `IT_CASE_S3_BUCKET` > * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA` > * `IT_CASE_S3_SECRET_KEY` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686705#comment-16686705 ] Dawid Wysakowicz commented on FLINK-8159: - Ad. 1: I agree with the approach of adding an additional abstract class. But my original question was about using a new {{PatternSelectFunction}}; I don't think we should support dynamically changing it — that should be possible only for {{IterativeCondition}}. Ad. 2: Agreed. Ad. 3: I know we could, but I don't think we should expose everything. Actually, I would limit its capabilities to a minimum, e.g. prohibit state creation, accumulators etc. This is of great importance, especially for {{IterativeCondition}}. As we have a somewhat similar view on this issue, I think we could start with the implementation if you are still interested ;). Also, last but not least, sorry for the late response. > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extend > {{AbstractRichFunction}} and behave properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar
[ https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686655#comment-16686655 ] Flavio Pompermaier commented on FLINK-10864: If you look at the JarListInfo, the class returned by the /jars REST API, you can see that it was meant to support multiple main classes in a single jar (it contains a List). If I had to write this API I would also add a description of the job parameters/options. I understand that removing complexity from the code is always good, but this looks like a step back to me. Why not support two different flavors of *Main-Class*? One that does not implement any interface (and thus does not provide any info, as it is now) and one that implements a "brand new" interface FlinkJob which has at least 2 methods: getDescription() and List getJobParameters()? In this way you can get rid of the legacy Program and ProgramDescription interfaces and read only the Main-Class attribute from the Manifest. > Support multiple Main classes per jar > - > > Key: FLINK-10864 > URL: https://issues.apache.org/jira/browse/FLINK-10864 > Project: Flink > Issue Type: Improvement > Components: Job-Submission >Affects Versions: 1.6.2 >Reporter: Flavio Pompermaier >Priority: Major > > Right now all the REST API and job submission system assumes that a jar > contains only a single main class. In my experience this is rarely the case > in real scenario: a jar contains multiple jobs (with similar dependencies) > that performs different tasks. > In our use case, for example, the shaded jar is around 200 MB and 10 jobs > within it... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10885) Avro Confluent Schema Registry E2E test failed on Travis
Chesnay Schepler created FLINK-10885: Summary: Avro Confluent Schema Registry E2E test failed on Travis Key: FLINK-10885 URL: https://issues.apache.org/jira/browse/FLINK-10885 Project: Flink Issue Type: Bug Components: Batch Connectors and Input/Output Formats, E2E Tests, Table API SQL Affects Versions: 1.7.0 Reporter: Chesnay Schepler https://travis-ci.org/zentol/flink/jobs/454943551
{code}
Waiting for schema registry...
[2018-11-14 12:20:59,394] ERROR Server died unexpectedly: (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
org.apache.kafka.common.config.ConfigException: No supported Kafka endpoints are configured. Either kafkastore.bootstrap.servers must have at least one endpoint matching kafkastore.security.protocol or broker endpoints loaded from ZooKeeper must have at least one endpoint matching kafkastore.security.protocol.
	at io.confluent.kafka.schemaregistry.storage.KafkaStore.endpointsToBootstrapServers(KafkaStore.java:313)
	at io.confluent.kafka.schemaregistry.storage.KafkaStore.<init>(KafkaStore.java:130)
	at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.<init>(KafkaSchemaRegistry.java:144)
	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:53)
	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:37)
	at io.confluent.rest.Application.createServer(Application.java:149)
	at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
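The `ConfigException` above boils down to no bootstrap endpoint matching `kafkastore.security.protocol`. The matching rule the registry enforces can be sketched as follows (an illustrative sketch only; the real check lives in Confluent's `KafkaStore`):

```java
public class EndpointProtocolCheck {
    // Returns true if at least one endpoint (e.g. "PLAINTEXT://localhost:9092")
    // uses the configured security protocol -- the condition the registry requires
    // of kafkastore.bootstrap.servers before it can start.
    static boolean hasMatchingEndpoint(String bootstrapServers, String securityProtocol) {
        for (String endpoint : bootstrapServers.split(",")) {
            if (endpoint.trim().startsWith(securityProtocol + "://")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Matching protocol: the registry would start.
        System.out.println(hasMatchingEndpoint("PLAINTEXT://localhost:9092", "PLAINTEXT"));
        // No matching endpoint: the registry dies with the ConfigException above.
        System.out.println(hasMatchingEndpoint("SSL://localhost:9093", "PLAINTEXT"));
    }
}
```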
[jira] [Updated] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.
[ https://issues.apache.org/jira/browse/FLINK-10884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wgcn updated FLINK-10884: - Labels: yarn (was: ) > Flink on yarn TM container will be killed by nodemanager because of the > exceeded physical memory. > > > Key: FLINK-10884 > URL: https://issues.apache.org/jira/browse/FLINK-10884 > Project: Flink > Issue Type: Bug > Components: Cluster Management, Core >Affects Versions: 1.6.2 > Environment: version : 1.6.2 > module : flink on yarn > centos jdk1.8 > hadoop 2.7 >Reporter: wgcn >Priority: Major > Labels: yarn > > The TM container will be killed by the nodemanager because of exceeded physical > memory. I found that the launch context launching the TM container sets > "container memory = heap memory + offHeapSizeMB" in the class > org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters, > lines 160 to 166. I set a safety margin for the whole memory the container > uses. For example, if the container limit is 3 GB of memory, the sum > "heap memory + offHeapSizeMB" is equal to 2.4 GB, to prevent the container > from being killed. Do we have a ready-made solution, or should I commit my > solution? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.
wgcn created FLINK-10884: Summary: Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory. Key: FLINK-10884 URL: https://issues.apache.org/jira/browse/FLINK-10884 Project: Flink Issue Type: Bug Components: Cluster Management, Core Affects Versions: 1.6.2 Environment: version : 1.6.2 module : flink on yarn centos jdk1.8 hadoop 2.7 Reporter: wgcn The TM container will be killed by the nodemanager because of exceeded physical memory. I found that the launch context launching the TM container sets "container memory = heap memory + offHeapSizeMB" in the class org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters, lines 160 to 166. I set a safety margin for the whole memory the container uses. For example, if the container limit is 3 GB of memory, the sum "heap memory + offHeapSizeMB" is equal to 2.4 GB, to prevent the container from being killed. Do we have a ready-made solution, or should I commit my solution? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
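The safety margin the reporter proposes resembles Flink's containerized heap cutoff: reserve a fraction of the container for non-heap memory so the YARN limit is never exceeded. A minimal sketch of such a computation (the ratio and minimum values here are assumed for illustration; Flink's actual logic in `ContaineredTaskManagerParameters` is driven by its own configuration options):

```java
public class ContainerMemorySketch {
    // Reserve max(cutoffMinMB, total * cutoffRatio) for off-heap/native overhead,
    // so that heap + overhead never exceeds the container limit enforced by YARN.
    static long heapSizeMB(long containerMB, double cutoffRatio, long cutoffMinMB) {
        long cutoff = Math.max(cutoffMinMB, (long) (containerMB * cutoffRatio));
        return containerMB - cutoff;
    }

    public static void main(String[] args) {
        // A 3072 MB container with a 0.2 ratio and 600 MB minimum leaves 2458 MB of heap,
        // roughly the "2.4 GB out of 3 GB" margin the reporter suggests.
        System.out.println(heapSizeMB(3072, 0.2, 600));
    }
}
```

Note that even with such a margin, unbounded native allocations (direct memory, JNI malloc) can still push the process over the limit, which is the failure mode this ticket describes.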
[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar
[ https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686532#comment-16686532 ] Chesnay Schepler commented on FLINK-10864: -- Personally I'd throw that one out in the long run. The {{program-class}} thing looks like some ancient legacy stuff we kept for backwards compatibility, but as far as I can tell it is entirely subsumed by {{Main-Class}}. > Support multiple Main classes per jar > - > > Key: FLINK-10864 > URL: https://issues.apache.org/jira/browse/FLINK-10864 > Project: Flink > Issue Type: Improvement > Components: Job-Submission >Affects Versions: 1.6.2 >Reporter: Flavio Pompermaier >Priority: Major > > Right now all the REST API and job submission system assumes that a jar > contains only a single main class. In my experience this is rarely the case > in real scenario: a jar contains multiple jobs (with similar dependencies) > that performs different tasks. > In our use case, for example, the shaded jar is around 200 MB and 10 jobs > within it... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10882) Misleading job/task state for scheduled jobs
[ https://issues.apache.org/jira/browse/FLINK-10882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-10882: - Description: Submitting a job when not enough resources are available currently causes the job to stay in a {{CREATE/SCHEDULED}} state. There are 2 issues with how this is presented in the UI. The {{Running Jobs}} page incorrectly states that the job is running. (see list_view attachment) EDIT: Actually, from a runtime perspective the job is in fact in a RUNNING state. The state display for individual tasks either # States the task is in a CREATED state, when it is actually SCHEDULED # States the task is in a CREATED state, but the count for all state boxes is zero. (see task_view attachment) was: Submitting a job when not enough resources are available currently causes the job to stay in a {{CREATE/SCHEDULED}} state. There are 2 issues with how this is presented in the UI. The {{Running Jobs}} page incorrectly states that the job is running. (see list_view attachment) The state display for individual tasks either # States the task is in a CREATED state, when it is actually SCHEDULED # States the task is in a CREATED state, but the count for all state boxes is zero. (see task_view attachment) > Misleading job/task state for scheduled jobs > > > Key: FLINK-10882 > URL: https://issues.apache.org/jira/browse/FLINK-10882 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Priority: Major > Attachments: list_view.png, task_view.png > > > Submitting a job when not enough resources are available currently > causes the job to stay in a {{CREATE/SCHEDULED}} state. > There are 2 issues with how this is presented in the UI. > The {{Running Jobs}} page incorrectly states that the job is running. > (see list_view attachment) > EDIT: Actually, from a runtime perspective the job is in fact in a RUNNING > state.
> The state display for individual tasks either > # States the task is in a CREATED state, when it is actually SCHEDULED > # States the task is in a CREATED state, but the count for all state boxes is > zero. > (see task_view attachment) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar
[ https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686497#comment-16686497 ] Chesnay Schepler commented on FLINK-10864: -- When you suggested a separate _manifest_ entry, I assumed you meant a custom manifest file that is packaged with the jar. > Support multiple Main classes per jar > - > > Key: FLINK-10864 > URL: https://issues.apache.org/jira/browse/FLINK-10864 > Project: Flink > Issue Type: Improvement > Components: Job-Submission >Affects Versions: 1.6.2 >Reporter: Flavio Pompermaier >Priority: Major > > Right now all the REST API and job submission system assumes that a jar > contains only a single main class. In my experience this is rarely the case > in real scenario: a jar contains multiple jobs (with similar dependencies) > that performs different tasks. > In our use case, for example, the shaded jar is around 200 MB and 10 jobs > within it... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10880) Failover strategies should not be applied to Batch Execution
[ https://issues.apache.org/jira/browse/FLINK-10880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686519#comment-16686519 ] TisonKun commented on FLINK-10880: -- Hi [~StephanEwen], I'd like to reach a consensus on where to document this. My suggestion is the option description at {{JobManagerOptions#EXECUTION_FAILOVER_STRATEGY}} and the Javadoc at {{RestartPipelinedRegionStrategy}}. What do you think? Are these enough? > Failover strategies should not be applied to Batch Execution > > > Key: FLINK-10880 > URL: https://issues.apache.org/jira/browse/FLINK-10880 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.6.2 >Reporter: Stephan Ewen >Assignee: TisonKun >Priority: Blocker > Fix For: 1.6.3, 1.7.0 > > > When configuring a failover strategy other than "full", DataSet/Batch > execution is currently not correct. > This is expected, the failover region strategy is an experimental WIP feature > for streaming that has not been extended to the DataSet API. > We need to document this and prevent execution of DataSet features with other > failover strategies than "full". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10880) Failover strategies should not be applied to Batch Execution
[ https://issues.apache.org/jira/browse/FLINK-10880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun reassigned FLINK-10880: Assignee: TisonKun > Failover strategies should not be applied to Batch Execution > > > Key: FLINK-10880 > URL: https://issues.apache.org/jira/browse/FLINK-10880 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.6.2 >Reporter: Stephan Ewen >Assignee: TisonKun >Priority: Blocker > Fix For: 1.6.3, 1.7.0 > > > When configuring a failover strategy other than "full", DataSet/Batch > execution is currently not correct. > This is expected, the failover region strategy is an experimental WIP feature > for streaming that has not been extended to the DataSet API. > We need to document this and prevent execution of DataSet features with other > failover strategies than "full". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10883) Submitting a job without enough slots times out due to an unspecified timeout
[ https://issues.apache.org/jira/browse/FLINK-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-10883: - Description: When submitting a job without enough slots being available the job will stay in a SCHEDULED/CREATED state. After some time (a few minutes) the job execution will fail with the following timeout exception: {code} 2018-11-14 13:38:26,614 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Pending slot request [SlotRequestId{d9c0c94b6b81eae406f3d6cb6150fee4}] timed out. 2018-11-14 13:38:26,615 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) (1/$java.util.concurrent.TimeoutException at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} That the job submission may time out is not documented, neither is which timeout is responsible in the first place nor how/whether this can be disabled. was: When submitting a job without enough slots being available the job will stay in a SCHEDULED/CREATED state. 
After some time (a few minutes) the job execution will fail with the following timeout exception: {code} 2018-11-14 13:38:26,615 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) (1/$java.util.concurrent.TimeoutException at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} That the job submission may time out is not documented, neither is which timeout is responsible in the first place nor how/whether this can be disabled. > Submitting a jobs without enough slots times out due to a unspecified timeout > - > > Key: FLINK-10883 > URL: https://issues.apache.org/jira/browse/FLINK-10883 > Project: Flink > Issue Type: Improvement > Components: Job-Submission >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Priority: Major > > When submitting a job without enough slots being available the job will stay > in a SCHEDULED/CREATED state. After some time (a few minutes) the job > execution will fail with the following timeout exception: > {code} > 2018-11-14 13:38:26,614 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Pending slot > request [SlotRequestId{d9c0c94b6b81eae406f3d6cb6150fee4}] timed out. 
> 2018-11-14 13:38:26,615 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN > DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at > main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) > (1/$java.util.concurrent.TimeoutException > at > org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) >
[jira] [Updated] (FLINK-10883) Submitting a job without enough slots times out due to an unspecified timeout
[ https://issues.apache.org/jira/browse/FLINK-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-10883: - Description: When submitting a job without enough slots being available the job will stay in a SCHEDULED/CREATED state. After some time (a few minutes) the job execution will fail with the following timeout exception: {code} 2018-11-14 13:38:26,615 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) (1/$java.util.concurrent.TimeoutException at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} That the job submission may time out is not documented, neither is which timeout is responsible in the first place nor how/whether this can be disabled. was: When submitting a job without enough slots being available the job will stay in a SCHEDULED/CREATED state. 
After some time (a few minutes) the job execution will fail with the following timeout exception: {code} 2018-11-14 13:38:26,615 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) (1/$java.util.concurrent.TimeoutException at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} That the job submission will time out is not documented, neither is which timeout is responsible in the first place or how/whether this can be disabled. > Submitting a jobs without enough slots times out due to a unspecified timeout > - > > Key: FLINK-10883 > URL: https://issues.apache.org/jira/browse/FLINK-10883 > Project: Flink > Issue Type: Improvement > Components: Job-Submission >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Priority: Major > > When submitting a job without enough slots being available the job will stay > in a SCHEDULED/CREATED state. 
After some time (a few minutes) the job > execution will fail with the following timeout exception: > {code} > 2018-11-14 13:38:26,615 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN > DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at > main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) > (1/$java.util.concurrent.TimeoutException > at > org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) >
[jira] [Created] (FLINK-10882) Misleading job/task state for scheduled jobs
Chesnay Schepler created FLINK-10882: Summary: Misleading job/task state for scheduled jobs Key: FLINK-10882 URL: https://issues.apache.org/jira/browse/FLINK-10882 Project: Flink Issue Type: Bug Components: Webfrontend Affects Versions: 1.7.0 Reporter: Chesnay Schepler Attachments: list_view.png, task_view.png Submitting a job when not enough resources are available currently causes the job to stay in a {{CREATE/SCHEDULED}} state. There are 2 issues with how this is presented in the UI. The {{Running Jobs}} page incorrectly states that the job is running. (see list_view attachment) The state display for individual tasks either # States the task is in a CREATED state, when it is actually SCHEDULED # States the task is in a CREATED state, but the count for all state boxes is zero. (see task_view attachment) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10883) Submitting a job without enough slots times out due to an unspecified timeout
Chesnay Schepler created FLINK-10883: Summary: Submitting a job without enough slots times out due to an unspecified timeout Key: FLINK-10883 URL: https://issues.apache.org/jira/browse/FLINK-10883 Project: Flink Issue Type: Improvement Components: Job-Submission Affects Versions: 1.7.0 Reporter: Chesnay Schepler When submitting a job without enough slots available, the job will stay in a SCHEDULED/CREATED state. After some time (a few minutes) the job execution will fail with the following timeout exception: {code} 2018-11-14 13:38:26,615 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:76)) -> Combine (SUM(1), at main(WordCount.java:79) (1/$java.util.concurrent.TimeoutException at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} That the job submission will time out is not documented; neither is which timeout is responsible in the first place, nor how/whether it can be disabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar
[ https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686505#comment-16686505 ] Flavio Pompermaier commented on FLINK-10864: No, I suggest adding an attribute to the Manifest. Currently the PackagedProgram class only checks for the existence of program-class or Main-Class. I was arguing that, since Flink already handles program-class (which is not standard), it could quite easily also handle a list of comma-separated classes in another custom attribute. It is a reasonable trade-off, I think. > Support multiple Main classes per jar > - > > Key: FLINK-10864 > URL: https://issues.apache.org/jira/browse/FLINK-10864 > Project: Flink > Issue Type: Improvement > Components: Job-Submission >Affects Versions: 1.6.2 >Reporter: Flavio Pompermaier >Priority: Major > > Right now the REST API and job submission system assume that a jar > contains only a single main class. In my experience this is rarely the case > in real scenarios: a jar contains multiple jobs (with similar dependencies) > that perform different tasks. > In our use case, for example, the shaded jar is around 200 MB and has 10 jobs > within it... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
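The proposal above — a custom manifest attribute carrying a comma-separated list of entry classes, alongside the existing program-class and Main-Class checks — could be sketched as follows. The attribute name `Program-Classes` is purely hypothetical; Flink's `PackagedProgram` does not actually read it:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

public class MultiMainClasses {
    // Hypothetical attribute name; Flink's PackagedProgram currently only
    // checks "program-class" and "Main-Class".
    static final String ATTR = "Program-Classes";

    // Parse a comma-separated class list as it might appear in a manifest value.
    static List<String> parseClassList(String value) {
        if (value == null || value.trim().isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(value.trim().split("\\s*,\\s*"));
    }

    // Read the hypothetical attribute from a jar's manifest.
    static List<String> mainClassesOf(String jarPath) throws Exception {
        try (JarFile jar = new JarFile(jarPath)) {
            Manifest mf = jar.getManifest();
            return parseClassList(
                    mf == null ? null : mf.getMainAttributes().getValue(ATTR));
        }
    }
}
```

A jar built this way would declare e.g. `Program-Classes: com.acme.JobA, com.acme.JobB` in its `MANIFEST.MF`, and the submission endpoint could then offer the user a choice among the listed classes.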
[jira] [Updated] (FLINK-10869) Update S3 testing settings
[ https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10869: --- Labels: pull-request-available (was: ) > Update S3 testing settings > -- > > Key: FLINK-10869 > URL: https://issues.apache.org/jira/browse/FLINK-10869 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > > Currently S3 tests go against a bucket hosted by 'data Artisans'. > As part of reworking the AWS permission setup, we need to adapt the > credentials and buckets for these tests. > Future tests should refer to the following environment variables for S3 tests: > * `IT_CASE_S3_BUCKET` > * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA` > * `IT_CASE_S3_SECRET_KEY` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
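Tests that refer to these environment variables typically need a guard so they are skipped, not failed, when credentials are absent (e.g. on contributor machines). A minimal sketch of such a guard — the class and method names are illustrative, not Flink's actual test utilities:

```java
import java.util.Map;

public class S3TestGuard {
    // The three environment variables named in the issue.
    static final String[] REQUIRED = {
        "IT_CASE_S3_BUCKET", "IT_CASE_S3_ACCESS_KEY", "IT_CASE_S3_SECRET_KEY"
    };

    // True only if every required variable is present and non-empty.
    static boolean credentialsPresent(Map<String, String> env) {
        for (String key : REQUIRED) {
            String v = env.get(key);
            if (v == null || v.isEmpty()) {
                return false;
            }
        }
        return true;
    }
}
```

In a JUnit test this could be wired up with `Assume.assumeTrue(S3TestGuard.credentialsPresent(System.getenv()))` so the S3 IT cases are skipped cleanly where no credentials are configured.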
[GitHub] StephanEwen opened a new pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.
StephanEwen opened a new pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables. URL: https://github.com/apache/flink/pull/7098 ## What is the purpose of the change This updates the S3 credentials used for Integration Tests that interact with S3. The S3 bucket and account were provided by data Artisans and our permission setup was changed to a cleaner and more isolated model. ## Verifying this change Not possible on the pull request. Only the master branch builds have access to the environment variables with S3 credentials, to prevent leaking the credentials. After merging, we have to check that the builds still execute the S3 integration tests and do not skip them. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **yes** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10869) Update S3 testing settings
[ https://issues.apache.org/jira/browse/FLINK-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686490#comment-16686490 ] ASF GitHub Bot commented on FLINK-10869: StephanEwen opened a new pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables. URL: https://github.com/apache/flink/pull/7098 ## What is the purpose of the change This updates the S3 credentials used for Integration Tests that interact with S3. The S3 bucket and account were provided by data Artisans and our permission setup was changed to a cleaner and more isolated model. ## Verifying this change Not possible on the pull request. Only the master branch builds have access to the environment variables with S3 credentials, to prevent leaking the credentials. After merging, we have to check that the builds still execute the S3 integration tests and do not skip them. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **yes** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update S3 testing settings > -- > > Key: FLINK-10869 > URL: https://issues.apache.org/jira/browse/FLINK-10869 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > > Currently S3 tests go against a bucket hosted by 'data Artisans'. > As part of reworking the AWS permission setup, we need to adapt the > credentials and buckets for these tests. > Future tests should refer to the following environment variables for S3 tests: > * `IT_CASE_S3_BUCKET` > * `IT_CASE_S3_ACCESS_KEY=AKIAIQKDG4KW5QA6TFGA` > * `IT_CASE_S3_SECRET_KEY` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10874) Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure
[ https://issues.apache.org/jira/browse/FLINK-10874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10874: --- Labels: pull-request-available (was: ) > Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure > --- > > Key: FLINK-10874 > URL: https://issues.apache.org/jira/browse/FLINK-10874 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.8.0 >Reporter: Piotr Nowojski >Priority: Critical > Labels: pull-request-available > > https://api.travis-ci.org/v3/job/454449444/log.txt > {noformat} > Test > testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase) > is running. > > 16:35:07,894 WARN > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Property > [transaction.timeout.ms] not specified. Setting it to 360 ms > 16:35:07,903 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Starting > FlinkKafkaInternalProducer (1/1) to produce into default topic > testMigrateFromAtLeastOnceToExactlyOnce > 16:35:08,785 ERROR > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase - > > Test > testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase) > failed with: > java.lang.Exception: Could not complete snapshot 0 for operator MockTask > (1/1). 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:419) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshotWithLocalState(AbstractStreamOperatorTestHarness.java:505) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:497) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRecoverWithChangeSemantics(FlinkKafkaProducerITCase.java:591) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce(FlinkKafkaProducerITCase.java:569) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) > at >
[jira] [Commented] (FLINK-10874) Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure
[ https://issues.apache.org/jira/browse/FLINK-10874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686473#comment-16686473 ] ASF GitHub Bot commented on FLINK-10874: pnowojski opened a new pull request #7097: [FLINK-10874][kafka-docs] Document likely cause of UnknownTopicOrPartitionException URL: https://github.com/apache/flink/pull/7097 This is a change in documentation only. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure > --- > > Key: FLINK-10874 > URL: https://issues.apache.org/jira/browse/FLINK-10874 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.8.0 >Reporter: Piotr Nowojski >Priority: Critical > Labels: pull-request-available > > https://api.travis-ci.org/v3/job/454449444/log.txt > {noformat} > Test > testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase) > is running. > > 16:35:07,894 WARN > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Property > [transaction.timeout.ms] not specified. Setting it to 360 ms > 16:35:07,903 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Starting > FlinkKafkaInternalProducer (1/1) to produce into default topic > testMigrateFromAtLeastOnceToExactlyOnce > 16:35:08,785 ERROR > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase - > > Test > testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase) > failed with: > java.lang.Exception: Could not complete snapshot 0 for operator MockTask > (1/1). 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:419) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshotWithLocalState(AbstractStreamOperatorTestHarness.java:505) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:497) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRecoverWithChangeSemantics(FlinkKafkaProducerITCase.java:591) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce(FlinkKafkaProducerITCase.java:569) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at
[GitHub] pnowojski opened a new pull request #7097: [FLINK-10874][kafka-docs] Document likely cause of UnknownTopicOrPartitionException
pnowojski opened a new pull request #7097: [FLINK-10874][kafka-docs] Document likely cause of UnknownTopicOrPartitionException URL: https://github.com/apache/flink/pull/7097 This is a change in documentation only.
[jira] [Created] (FLINK-10881) SavepointITCase deadlocks on travis
Chesnay Schepler created FLINK-10881: Summary: SavepointITCase deadlocks on travis Key: FLINK-10881 URL: https://issues.apache.org/jira/browse/FLINK-10881 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing, Tests Affects Versions: 1.7.0 Reporter: Chesnay Schepler https://travis-ci.org/apache/flink/jobs/454898424 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar
[ https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686459#comment-16686459 ] Flavio Pompermaier commented on FLINK-10864: What about the program-class attribute? That one is also not standard, but it is used by PackagedProgram. > Support multiple Main classes per jar > - > > Key: FLINK-10864 > URL: https://issues.apache.org/jira/browse/FLINK-10864 > Project: Flink > Issue Type: Improvement > Components: Job-Submission >Affects Versions: 1.6.2 >Reporter: Flavio Pompermaier >Priority: Major > > Right now the REST API and job submission system assume that a jar > contains only a single main class. In my experience this is rarely the case > in real scenarios: a jar contains multiple jobs (with similar dependencies) > that perform different tasks. > In our use case, for example, the shaded jar is around 200 MB and has 10 jobs > within it... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twalthr commented on issue #7045: [hotfix] Update nightly master cron jobs
twalthr commented on issue #7045: [hotfix] Update nightly master cron jobs URL: https://github.com/apache/flink/pull/7045#issuecomment-438646738 The changes have been applied to `cron-master-e2e` and `cron-1.7-e2e`.
[GitHub] twalthr closed pull request #7045: [hotfix] Update nightly master cron jobs
twalthr closed pull request #7045: [hotfix] Update nightly master cron jobs URL: https://github.com/apache/flink/pull/7045 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/.travis.yml b/.travis.yml index 3a5c6c9f858..cbd8dc0496c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -69,12 +69,12 @@ matrix: - REMOTE="apache" - BRANCH="master" - PROFILE="-Dhadoop.version=2.8.3" - - SCRIPT="split_docker_e2e.sh" + - SCRIPT="split_container.sh" - env: - REMOTE="apache" - BRANCH="master" - PROFILE="-Dhadoop.version=2.8.3" - - SCRIPT="split_kubernetes_e2e.sh" + - SCRIPT="split_heavy.sh" - env: - REMOTE="apache" - BRANCH="master" @@ -99,12 +99,12 @@ matrix: - REMOTE="apache" - BRANCH="master" - PROFILE="-Dhadoop.version=2.8.3 -Dscala-2.12" - - SCRIPT="split_docker_e2e.sh" + - SCRIPT="split_container.sh" - env: - REMOTE="apache" - BRANCH="master" - PROFILE="-Dhadoop.version=2.8.3 -Dscala-2.12" - - SCRIPT="split_kubernetes_e2e.sh" + - SCRIPT="split_heavy.sh" - env: - REMOTE="apache" - BRANCH="master" @@ -129,12 +129,12 @@ matrix: - REMOTE="apache" - BRANCH="master" - PROFILE="-DwithoutHadoop" - - SCRIPT="split_docker_e2e.sh" + - SCRIPT="split_container.sh" - env: - REMOTE="apache" - BRANCH="master" - PROFILE="-DwithoutHadoop" - - SCRIPT="split_kubernetes_e2e.sh" + - SCRIPT="split_heavy.sh" git: depth: 100 diff --git a/nightly.sh b/nightly.sh index 19bc7b53287..c2d9ff6e1fc 100755 --- a/nightly.sh +++ b/nightly.sh @@ -44,7 +44,7 @@ LOG4J_PROPERTIES=${FLINK_DIR}/tools/log4j-travis.properties MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn" MVN_COMMON_OPTIONS="-nsu -B -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast" 
-MVN_COMPILE_OPTIONS="-DskipTests" +MVN_COMPILE_OPTIONS="-T1C -DskipTests" git clone --single-branch -b ${BRANCH} https://github.com/${REMOTE}/flink diff --git a/splits/split_checkpoints.sh b/splits/split_checkpoints.sh index 930b5a4bd98..5dfe7e34402 100755 --- a/splits/split_checkpoints.sh +++ b/splits/split_checkpoints.sh @@ -49,9 +49,12 @@ run_test "Resuming Savepoint (file, async, scale up) end-to-end test" "$END_TO_E run_test "Resuming Savepoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file false" run_test "Resuming Savepoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file true" run_test "Resuming Savepoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file false" -run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks" -run_test "Resuming Savepoint (rocks, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks" -run_test "Resuming Savepoint (rocks, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks" +run_test "Resuming Savepoint (rocks, no parallelism change, heap timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks false heap" +run_test "Resuming Savepoint (rocks, scale up, heap timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks false heap" +run_test "Resuming Savepoint (rocks, scale down, heap timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks false heap" +run_test "Resuming Savepoint (rocks, no parallelism change, rocks timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks false rocks" +run_test "Resuming Savepoint (rocks, scale up, rocks timers) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks false rocks" +run_test "Resuming Savepoint (rocks, scale down, rocks timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks false rocks" run_test "Resuming Externalized Checkpoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true true" run_test "Resuming Externalized Checkpoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false true" diff --git a/splits/split_docker_e2e.sh b/splits/split_container.sh similarity index 95% rename from
[jira] [Reopened] (FLINK-10856) Harden resume from externalized checkpoint E2E test
[ https://issues.apache.org/jira/browse/FLINK-10856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reopened FLINK-10856: -- The latest fix introduced a new instability where we attempt to resume from an incomplete checkpoint (i.e. one that has no {{_metadata_}} file). We have to double-check that the checkpoint was actually completed. https://travis-ci.org/zentol/flink/jobs/454943554 {code} 2018-11-14 12:11:45,071 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Failed to submit job 9e7876f96c583177d183ce18b52bbd18. java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager at org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176) at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308) at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) ... 
7 more Caused by: java.io.FileNotFoundException: Cannot find meta data file '_metadata' in directory 'file:/home/travis/build/zentol/flink/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-14313753102/externalized-chckpt-e2e-backend-dir/dfcac75e697394900e5088f962130d57/chk-10'. Please try to load the checkpoint/savepoint directly from the metadata file instead of the directory. at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:256) at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:109) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1100) at org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1234) at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1158) at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:296) at org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157) ... 10 more {code} > Harden resume from externalized checkpoint E2E test > --- > > Key: FLINK-10856 > URL: https://issues.apache.org/jira/browse/FLINK-10856 > Project: Flink > Issue Type: Bug > Components: E2E Tests, State Backends, Checkpointing >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > The resume from externalized checkpoints E2E test can fail due to > FLINK-10855. We should harden the test script to not expect a single > checkpoint directory being present but to take the checkpoint with the > highest checkpoint counter. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
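The hardening described above — resume from the checkpoint with the highest counter, but only if it was actually completed (i.e. its directory contains a `_metadata` file) — can be sketched like this. The directory layout `chk-<n>/_metadata` matches the stack trace; the helper itself is illustrative, not the actual test-script fix:

```java
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

public class LatestCompletedCheckpoint {
    // Return the chk-<n> directory with the highest n that contains a
    // _metadata file, skipping checkpoints that were never completed.
    static Optional<File> find(File checkpointRoot) {
        File[] candidates = checkpointRoot.listFiles(
                f -> f.isDirectory() && f.getName().startsWith("chk-"));
        if (candidates == null) {
            return Optional.empty();
        }
        return Arrays.stream(candidates)
                .filter(d -> new File(d, "_metadata").exists())
                .max(Comparator.comparingInt(
                        d -> Integer.parseInt(d.getName().substring("chk-".length()))));
    }
}
```

With a layout like `chk-9/` (complete) and `chk-10/` (no `_metadata`, i.e. in progress when the job was killed), this picks `chk-9` instead of failing on the incomplete `chk-10`.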
[jira] [Commented] (FLINK-10419) ClassNotFoundException while deserializing user exceptions from checkpointing
[ https://issues.apache.org/jira/browse/FLINK-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686446#comment-16686446 ] ASF GitHub Bot commented on FLINK-10419: dawidwys opened a new pull request #7096: [FLINK-10419] Call JobMasterGateway through RpcCheckpointResponder in… URL: https://github.com/apache/flink/pull/7096 Improved test to call `JobMasterGateway` through `RpcCheckpointResponder`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ClassNotFoundException while deserializing user exceptions from checkpointing > - > > Key: FLINK-10419 > URL: https://issues.apache.org/jira/browse/FLINK-10419 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1, 1.7.0 >Reporter: Nico Kruber >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > It seems that somewhere in the operator's failure handling, we hand a > user-code exception to the checkpoint coordinator via Java serialization but > it will then fail during the de-serialization because the class is not > available. 
This will result in the following error shadowing the real one: > {code} > java.lang.ClassNotFoundException: > org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher.loadClass(Launcher.java:338) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at > org.apache.flink.util.InstantiationUtil.resolveClass(InstantiationUtil.java:76) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1859) > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2033) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) > at > java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:557) > at java.lang.Throwable.readObject(Throwable.java:914) > at sun.reflect.GeneratedMethodAccessor158.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) > at > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.readObject(RemoteRpcInvocation.java:222) > at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477) > at > org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58) > at > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.deserializeMethodInvocation(RemoteRpcInvocation.java:118) > at >
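The usual remedy for this class of problem (Flink has a similar utility, {{SerializedThrowable}}; the class and method names below are a simplified, hypothetical sketch, not Flink's actual API) is to eagerly capture the user exception's class name, message, and stack trace into a JDK-only proxy before it crosses the RPC boundary, so the receiving side never needs the user jar on its classpath to deserialize the failure cause:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical proxy: holds only JDK types, so Java deserialization on the
// JobManager side cannot hit ClassNotFoundException for user classes.
class ProxyThrowable extends Exception {
    private final String originalClassName;

    ProxyThrowable(Throwable userException) {
        super(userException.getClass().getName() + ": " + userException.getMessage());
        this.originalClassName = userException.getClass().getName();
        setStackTrace(userException.getStackTrace()); // keep the original frames
    }

    String getOriginalClassName() { return originalClassName; }
}

public class SerializedThrowableSketch {
    // Round-trip through Java serialization, as the RPC layer would.
    static Object roundTrip(Serializable o) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a user-code failure (e.g. FlinkKafka011Exception).
        Exception userFailure = new IllegalStateException("broker unreachable");
        ProxyThrowable wrapped = new ProxyThrowable(userFailure);
        ProxyThrowable received = (ProxyThrowable) roundTrip(wrapped);
        // Only JDK classes were needed to deserialize the cause.
        assert received.getMessage().contains("broker unreachable");
        assert received.getOriginalClassName().equals("java.lang.IllegalStateException");
        System.out.println("ok");
    }
}
```

If the real exception class is available on the receiving side, an implementation can additionally carry the serialized bytes and lazily re-instantiate the original; the proxy is only the fallback.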
[jira] [Closed] (FLINK-10852) Decremented number of unfinished producers below 0. This is most likely a bug in the execution state/intermediate result partition management
[ https://issues.apache.org/jira/browse/FLINK-10852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-10852. Resolution: Duplicate Fix Version/s: (was: 1.8.0) Not a bug in itself, rather a symptom of [FLINK-10880] > Decremented number of unfinished producers below 0. This is most likely a bug > in the execution state/intermediate result partition management > - > > Key: FLINK-10852 > URL: https://issues.apache.org/jira/browse/FLINK-10852 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.7.0 >Reporter: ouyangzhe >Priority: Major > > > {panel:title=Jobs using DataSet iteration operator, if set > jobmanager.execution.failover-strategy: region, will hang on FAILING state > when failover and has the following exception.} > java.lang.IllegalStateException: Decremented number of unfinished producers > below 0. This is most likely a bug in the execution state/intermediate result > partition management. at > org.apache.flink.runtime.executiongraph.IntermediateResultPartition.markFinished(IntermediateResultPartition.java:103) > at > org.apache.flink.runtime.executiongraph.ExecutionVertex.finishAllBlockingPartitions(ExecutionVertex.java:707) > at > org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:939) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1568) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542) > at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source) at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) > at > 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at > akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at > akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at > akka.actor.ActorCell.invoke(ActorCell.scala:495) at > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at > akka.dispatch.Mailbox.run(Mailbox.scala:224) at > akka.dispatch.Mailbox.exec(Mailbox.scala:234) at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {panel} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10852) Decremented number of unfinished producers below 0. This is most likely a bug in the execution state/intermediate result partition management
[ https://issues.apache.org/jira/browse/FLINK-10852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686417#comment-16686417 ] Stephan Ewen commented on FLINK-10852: -- {{jobmanager.execution.failover-strategy}} is very much a "work in progress" internal feature at the moment. It does not actually work correctly with the DataSet API in general; iterations are only one part of the problem. So this is expected not to work: it is an unfinished feature. I created [FLINK-10880] to handle this. > Decremented number of unfinished producers below 0. This is most likely a bug > in the execution state/intermediate result partition management > - > > Key: FLINK-10852 > URL: https://issues.apache.org/jira/browse/FLINK-10852 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.7.0 >Reporter: ouyangzhe >Priority: Major > Fix For: 1.8.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10880) Failover strategies should not be applied to Batch Execution
Stephan Ewen created FLINK-10880: Summary: Failover strategies should not be applied to Batch Execution Key: FLINK-10880 URL: https://issues.apache.org/jira/browse/FLINK-10880 Project: Flink Issue Type: Bug Components: Distributed Coordination Affects Versions: 1.6.2 Reporter: Stephan Ewen Fix For: 1.6.3, 1.7.0 When configuring a failover strategy other than "full", DataSet/Batch execution is currently not correct. This is expected: the failover region strategy is an experimental, work-in-progress feature for streaming that has not been extended to the DataSet API. We need to document this and prevent execution of DataSet programs with failover strategies other than "full". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
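Until that guard exists, batch/DataSet jobs should simply stay on the default strategy. As a config fragment (the option name is the one discussed above; "full" is the default value):

```yaml
# flink-conf.yaml -- keep full-restart failover for DataSet/batch jobs;
# "region" is an experimental, streaming-only strategy at this point.
jobmanager.execution.failover-strategy: full
```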
[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar
[ https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686413#comment-16686413 ] Chesnay Schepler commented on FLINK-10864: -- I don't like the idea of introducing a separate manifest mechanism that is entirely decoupled from the standard manifest file. I believe this is way out of the scope of Flink (or really, anything outside the JDK) and will cause many headaches down the line. Uploading multiple jars is a reasonable alternative IMO. One idea I had at times was to actually allow multiple jars to be submitted at once; one containing the job and N for additional dependencies (kind of like a job-scoped /lib directory). This would make it easier to re-use dependencies across jobs. But then you're dealing with the issue of safely conveying that "to run this jar you also have to submit X". > Support multiple Main classes per jar > - > > Key: FLINK-10864 > URL: https://issues.apache.org/jira/browse/FLINK-10864 > Project: Flink > Issue Type: Improvement > Components: Job-Submission >Affects Versions: 1.6.2 >Reporter: Flavio Pompermaier >Priority: Major > > Right now all the REST API and job submission system assumes that a jar > contains only a single main class. In my experience this is rarely the case > in real scenario: a jar contains multiple jobs (with similar dependencies) > that performs different tasks. > In our use case, for example, the shaded jar is around 200 MB and 10 jobs > within it... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar
[ https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686407#comment-16686407 ] Stephan Ewen commented on FLINK-10864: -- I am skeptical about that, to be honest. This seems very ad hoc: these are not common manifest entries, and the distinction between the main class and the other main classes is arbitrary. This feels like pushing functionality into Flink that belongs in operations/deployment tooling. > Support multiple Main classes per jar > - > > Key: FLINK-10864 > URL: https://issues.apache.org/jira/browse/FLINK-10864 > Project: Flink > Issue Type: Improvement > Components: Job-Submission >Affects Versions: 1.6.2 >Reporter: Flavio Pompermaier >Priority: Major > > Right now all the REST API and job submission system assumes that a jar > contains only a single main class. In my experience this is rarely the case > in real scenario: a jar contains multiple jobs (with similar dependencies) > that performs different tasks. > In our use case, for example, the shaded jar is around 200 MB and 10 jobs > within it... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10879) Align Flink clients on env.execute()
Flavio Pompermaier created FLINK-10879: -- Summary: Align Flink clients on env.execute() Key: FLINK-10879 URL: https://issues.apache.org/jira/browse/FLINK-10879 Project: Flink Issue Type: Improvement Components: Client Affects Versions: 1.6.2 Reporter: Flavio Pompermaier Right now the REST APIs do not support running any code after env.execute(), while the Flink API, the CLI client, and code executed within the IDE do. All clients should behave in the same way (either env.execute() returns a result and code execution continues afterwards, or it does not). See the discussion on the DEV ML for more details: http://mail-archives.apache.org/mod_mbox/flink-dev/201811.mbox/%3CCAELUF_DhjzL9FECvx040_GE3d85Ykb-HcGVCh0O4y9h-cThq7A%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10862) REST API does not show program descriptions of "simple" ProgramDescription
[ https://issues.apache.org/jira/browse/FLINK-10862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686391#comment-16686391 ] Flavio Pompermaier commented on FLINK-10862: What do you mean by "proper artifact management system"? Is there any suggestion about it? In that case the Job Manager should be able to download (or use) the jar from such an external system (which, if I'm not wrong, is not possible at the moment). I think that with a simple enhancement to the REST API Flink would be just fine (at least for ordinary workloads), without the need to introduce another external component into the architecture. I would prefer an improvement in this direction over a deprecation. > REST API does not show program descriptions of "simple" ProgramDescription > -- > > Key: FLINK-10862 > URL: https://issues.apache.org/jira/browse/FLINK-10862 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.6.2 >Reporter: Flavio Pompermaier >Priority: Major > Labels: rest_api > > When uploading a jar containing a main class implementing ProgramDescription > interface, the REST API doesn't list its description. It works only if the > class implements Program (that I find pretty useless...why should I return > the plan?) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
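For context, the interface in question is a one-method interface. A sketch of a job class exposing a description (the interface body is reproduced locally here as an assumed mirror of Flink's {{org.apache.flink.api.common.ProgramDescription}}, so the snippet compiles without Flink on the classpath):

```java
// Assumed local mirror of Flink's ProgramDescription interface: a single
// method that the web frontend / REST API can surface for a packaged job.
interface ProgramDescription {
    String getDescription();
}

public class DescribedJob implements ProgramDescription {
    @Override
    public String getDescription() {
        return "Counts words from a text source and prints the totals.";
    }

    public static void main(String[] args) {
        // The actual job logic would go here; the point of the sketch is that
        // a description can be offered without implementing Program/getPlan().
        System.out.println(new DescribedJob().getDescription());
    }
}
```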
[jira] [Commented] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs
[ https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686380#comment-16686380 ] Stephan Ewen commented on FLINK-10867: -- [~mcoimbra] This is an interesting idea. As you describe, the work of a cache operator is deeply interwoven with cluster setup, job and operator scheduling, etc. If we add this to the core APIs, people will expect it to work intuitively in all situations. That would be a more significant effort, in both scheduling and the DataSet API. I would personally focus the effort on proper batch/streaming unification under the streaming APIs and operators, and see if we can get concepts of stream caching in there. This is, however, a longer-term effort. If you believe that this extension is significant for your use case and should be available today, what do you think about offering this as a "library/extension" in your own GitHub project? > Add a DataSet-based CacheOperator to reuse results between jobs > --- > > Key: FLINK-10867 > URL: https://issues.apache.org/jira/browse/FLINK-10867 > Project: Flink > Issue Type: New Feature > Components: Cluster Management, DataSet API, Local Runtime >Affects Versions: 1.8.0 >Reporter: Miguel E. Coimbra >Assignee: vinoyang >Priority: Major > Fix For: 1.8.0 > > > *Motivation.* > There are job scenarios where Flink batch processing users may be interested > in processing a large amount of data, outputting results to disk and then > reusing the results for another type of computation in Flink again. > This feature suggestion emerged from my work as a PhD researcher working on > graph stream processing. > [https://arxiv.org/abs/1810.02781] > More specifically, in our use case this would be very useful to maintain an > evolving graph while allowing for specific logic on challenges such as _when_ > and _how_ to integrate updates in the graph and also how to represent it.
> Furthermore, it would also be an enabler for rich use-cases that have synergy > with this existing Jira issue pertaining to graph partitioning: > FLINK-1536 - Graph partitioning operators for Gelly > *Problem.* > While it would be negligible to write the results to disk and then read them > back in a new job to be sent to the JobManager if they are small, this > becomes prohibitive if there are several gigabytes of data to write/read and > using a distributed storage (e.g. HDFS) is not an option. > Even if there is a distributed storage available, as the number of sequential > jobs increases, even the benefits of the secondary storage being distributed > will diminish. > *Existing alternatives.* > I also considered, as a possibility, to compose the sequence of jobs in a > single big job to submit to the JobManager, thus allowing reuse of results > due to the natural forwarding of results to subsequent operators in dataflow > programming. > However, this becomes difficult due to two reasons: > * The logic to connect the sequence of jobs may depend on factors external > to Flink and not known at the start of the job composition. > This also excludes limited iterative behavior like what is provided in > {{BulkIteration}}/{{DeltaIteration}}; > ** Composing a job with "too many" operators and inter-dependencies may lead > to the Optimizer engaging an exponential optimization search space. > This is particularly true for operators with multiple valid execution > strategies, leading to a combinatorics problem. > This leads to the Flink compiler _taking forever_ to even create a plan. > I believe this is the current situation based on a reply I received from > [~fhueske] last year.
> His reply was on the 7th of December 2017: > Link: > [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E] > Mailing list thread title: "Re: How to perform efficient DataSet reuse > between iterations" > > *Idea.* > Perhaps the better way to describe this *CacheOperator* feature is the > concept of "_job chaining_", where a new type of DataSink would receive data > that will: > - Be available to a subsequent job which somehow makes a reference to the > DataSink of the previous job; > - Have remained available (from the previous job execution) in the exact > same TaskManagers in the cluster. > Likely, the optimal memory distribution will be pretty similar between > chained jobs - if the data was read from disk again between jobs, it would > likely be distributed with the same (automatic or not) strategies, hence the > same distribution would likely be of use to sequential jobs. > *Design.* > Potential conflicts with the current Flink cluster execution model: > - The FlinkMiniCluster used with
[jira] [Commented] (FLINK-10864) Support multiple Main classes per jar
[ https://issues.apache.org/jira/browse/FLINK-10864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686368#comment-16686368 ] Flavio Pompermaier commented on FLINK-10864: I'm not suggesting scanning the entire jar, just using a different manifest entry (like *Other-Main-Classes* or *Main-Classes*). E.g.: *Main-Class:* org.apache.flink.WordCount *Other-Main-Classes:* org.apache.flink.WordCount2,org.apache.flink.WordCount3,org.apache.flink.WordCount4 or *Main-Classes:* org.apache.flink.WordCount,org.apache.flink.WordCount2,org.apache.flink.WordCount3,org.apache.flink.WordCount4 Then, the org.apache.flink.client.program.PackagedProgram class should just look for this extra attribute and handle it. > Support multiple Main classes per jar > - > > Key: FLINK-10864 > URL: https://issues.apache.org/jira/browse/FLINK-10864 > Project: Flink > Issue Type: Improvement > Components: Job-Submission >Affects Versions: 1.6.2 >Reporter: Flavio Pompermaier >Priority: Major > > Right now all the REST API and job submission system assumes that a jar > contains only a single main class. In my experience this is rarely the case > in real scenario: a jar contains multiple jobs (with similar dependencies) > that performs different tasks. > In our use case, for example, the shaded jar is around 200 MB and 10 jobs > within it... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
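A minimal sketch of how such a proposed Main-Classes entry could be parsed with the standard java.util.jar API. The attribute name and the fallback to Main-Class are assumptions taken from the proposal above; this is not an existing Flink feature:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.jar.Manifest;

public class MultiMainManifest {
    // Read the hypothetical "Main-Classes" attribute (comma-separated list);
    // fall back to the standard "Main-Class" entry when it is absent.
    static List<String> mainClasses(Manifest mf) {
        String multi = mf.getMainAttributes().getValue("Main-Classes");
        if (multi == null) {
            String single = mf.getMainAttributes().getValue("Main-Class");
            return single == null ? List.of() : List.of(single);
        }
        return Arrays.asList(multi.split("\\s*,\\s*"));
    }

    public static void main(String[] args) throws Exception {
        // A manifest as it would appear inside the uploaded jar.
        String raw = "Manifest-Version: 1.0\r\n"
                + "Main-Classes: org.apache.flink.WordCount,org.apache.flink.WordCount2\r\n"
                + "\r\n";
        Manifest mf = new Manifest(new ByteArrayInputStream(raw.getBytes(StandardCharsets.UTF_8)));
        List<String> classes = mainClasses(mf);
        assert classes.size() == 2;
        assert classes.get(0).equals("org.apache.flink.WordCount");
        System.out.println(classes);
    }
}
```

Note the JAR manifest format limits lines to 72 bytes when writing, so a long class list would need the spec's continuation-line mechanism.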
[jira] [Updated] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs
[ https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Miguel E. Coimbra updated FLINK-10867: -- Labels: (was: pull-request-available) > Add a DataSet-based CacheOperator to reuse results between jobs > --- > > Key: FLINK-10867 > URL: https://issues.apache.org/jira/browse/FLINK-10867 > Project: Flink > Issue Type: New Feature > Components: Cluster Management, DataSet API, Local Runtime >Affects Versions: 1.8.0 >Reporter: Miguel E. Coimbra >Assignee: vinoyang >Priority: Major > Fix For: 1.8.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs
[ https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Miguel E. Coimbra updated FLINK-10867: -- Description: *Motivation.* There are job scenarios where Flink batch processing users may be interested in processing a large amount of data, outputting results to disk and then reusing the results for another type of computation in Flink again. This feature suggestion emerged from my work as a PhD researcher working on graph stream processing. [https://arxiv.org/abs/1810.02781] More specifically, in our use case this would be very useful to maintain an evolving graph while allowing for specific logic on challenges such as _when_ and _how_ to integrate updates in the graph and also how to represent it. Furthermore, it would also be an enabler for rich use-cases that have synergy with this existing Jira issue pertaining to graph partitioning: FLINK-1536 - Graph partitioning operators for Gelly *Problem.* While it would be negligible to write the results to disk and then read them back in a new job to be sent to the JobManager if they are small, this becomes prohibitive if there are several gigabytes of data to write/read and using a distributed storage (e.g. HDFS) is not an option. Even if there is a distributed storage available, as the number of sequential jobs increases, even the benefits of the secondary storage being distributed will diminish. *Existing alternatives.* I also considered, as a possibility, to compose the sequence of jobs in a single big job to submit to the JobManager, thus allowing reuse of results due to the natural forwarding of results to subsequent operators in dataflow programming. However, this becomes difficult due to two reasons: * The logic to connect the sequence of jobs may depend on factors external to Flink and not known at the start of the job composition.
This also excludes limited iterative behavior like what is provided in {{BulkIteration}}/{{DeltaIteration}}; ** Composing a job with "too many" operators and inter-dependencies may lead to the Optimizer engaging an exponential optimization search space. This is particularly true for operators with multiple valid execution strategies, leading to a combinatorics problem. This leads to the Flink compiler _taking forever_ to even create a plan. I believe this is the current situation based on a reply I received from [~fhueske] last year. His reply was on the 7th of December 2017: Link: [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E] Mailing list thread title: "Re: How to perform efficient DataSet reuse between iterations" *Idea.* Perhaps the better way to describe this *CacheOperator* feature is the concept of "_job chaining_", where a new type of DataSink would receive data that will: - Be available to a subsequent job which somehow makes a reference to the DataSink of the previous job; - Have remained available (from the previous job execution) in the exact same TaskManagers in the cluster. Likely, the optimal memory distribution will be pretty similar between chained jobs - if the data was read from disk again between jobs, it would likely be distributed with the same (automatic or not) strategies, hence the same distribution would likely be of use to sequential jobs. *Design.* Potential conflicts with the current Flink cluster execution model: - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job finishes in local mode, so it would be necessary to change local mode to keep a FlinkMiniCluster alive - what was the reasoning behind destroying it? Simplifying the implementation? - What would this look like in the API?
I envisioned an example like this: {{DataSet> previousResult = callSomeFlinkDataflowOperator(); // The result of some previous computation.}} {{CacheOperator>> op = previousResult.cache();}} {{... // Other operations...}} {{environment.execute();}} {{... // The previous job has finished.}} {{DataSet> sorted = op.sort(0); // the previous DataSet, which resulted from callSomeFlinkDataflowOperator() in the previous Flink job, remained in memory.}} {{environment.execute(); // Trigger a different job whose data depends on the previous one.}} Besides adding appropriate classes to the Flink Java API, implementing this feature would require changing things so that: * JobManagers are aware that a completed job had cached operators - likely a new COMPLETED_AND_REUSABLE job state? * TaskManagers must keep references to the Flink memory management segments associated with the CacheOperator data; * CacheOperator must have a default number of usages and/or amount of time to be kept alive (I think both options should exist but the user may choose whether to use one or both); * Cluster coordination: should the JobManager be
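The proposed API above does not exist in Flink, and the generic type parameters in the original snippet were lost in rendering. As a purely hypothetical, self-contained sketch of the intended usage, the job-chaining idea amounts to a cache handle that outlives execute():

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Purely hypothetical sketch of the proposed cache()/CacheOperator idea --
// none of these classes exist in Flink. A CacheHandle outlives a job, so a
// second "job" can start from the materialized result of the first.
class CacheHandle<T> {
    private List<T> materialized; // stands in for cached memory segments on TaskManagers

    void materialize(List<T> data) {
        materialized = new ArrayList<>(data);
    }

    List<T> get() {
        return materialized;
    }
}

public class JobChainingSketch {
    public static void main(String[] args) {
        // "Job 1": compute something and cache it.
        List<Integer> previousResult = Arrays.asList(3, 1, 2);
        CacheHandle<Integer> op = new CacheHandle<>();
        op.materialize(previousResult); // conceptually: previousResult.cache()
        // ... environment.execute() -- job 1 finishes, the handle survives ...

        // "Job 2": reuse the cached data without re-reading it from disk.
        List<Integer> sorted = new ArrayList<>(op.get());
        Collections.sort(sorted); // conceptually: op.sort(0)
        System.out.println(sorted); // [1, 2, 3]
    }
}
```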
[jira] [Updated] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs
[ https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Miguel E. Coimbra updated FLINK-10867: -- Description: *Motivation.* There are job scenarios where Flink batch processing users may be interested in processing a large amount of data, outputting results to disk and then reusing the results for another type of computation in Flink again. This feature suggestion emerged from my work as a PhD researcher working on graph stream processing. [https://arxiv.org/abs/1810.02781] More specifically, in our use case this would be very useful to maintain an evolving graph while allowing for specific logic on challenges such as _when_ and _how_ to integrate updates in the graph and also how to represent it. Furthermore, it would also be an enabler for rich use-cases that have synergy with this existing Jira issue pertaining graph partitioning: FLINK-1536 - Graph partitioning operators for Gelly *Problem.* While it would be negligible to write the results to disk and then read them back in a new job to be sent to the JobManager if they are small, this becomes prohibitive if there are several gigabytes of data to write/read and using a distributed storage (e.g. HDFS) is not an option. Even if there is a distributed storage available, as the number of sequential jobs increases, even the benefits of the secondary storage being distributed will diminish. *Existing alternatives.* I also considered, as a possibility, to compose the sequence of jobs in a single big job to submit to the JobManager, thus allowing reuse of results due to the natural forwarding of results to subsequent operators in dataflow programing. However, this becomes difficult due to two reasons: * The logic to connect the sequence of jobs may depend on factors external to Flink and not known at the start of the job composition. 
This also excludes limited iterative behavior like what is provided in {{BulkIteration}}/{{DeltaIteration}};
* Composing a job with "too many" operators and inter-dependencies may lead to the Optimizer engaging an exponential optimization search space. This is particularly true for operators with multiple valid execution strategies, leading to a combinatorics problem. This leads to the Flink compiler _taking forever_ to even create a plan. I believe this is the current situation based on a reply I received from [~fhueske] last year. His reply was on the 7th of December 2017: Link: [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E] Mailing list thread title: "Re: How to perform efficient DataSet reuse between iterations"
*Idea.* Perhaps a better way to describe this *CachingOperator* feature is the concept of "_job chaining_", where a new type of DataSink would receive data that will:
- Be available to a subsequent job which somehow makes a reference to the DataSink of the previous job;
- Have remained available (from the previous job execution) in the exact same TaskManagers in the cluster.
Likely, the optimal memory distribution will be pretty similar between chained jobs - if the data were read from disk again between jobs, it would likely be distributed with the same (automatic or not) strategies, hence the same distribution would likely be of use to sequential jobs.
*Design.* Potential conflicts with the current Flink cluster execution model:
- The FlinkMiniCluster used with LocalEnvironment is destroyed when a job finishes in local mode, so it would be necessary to change local mode to keep a FlinkMiniCluster alive - what was the reasoning behind destroying it? Simplifying the implementation?
- What would this look like in the API? 
I envisioned an example like this:
{{DataSet> previousResult = callSomeFlinkDataflowOperator(); // The result of some previous computation.}}
{{CacheOperator>> op = previousResult.cache();}}
{{... // Other operations...}}
{{environment.execute();}}
{{... // The previous job has finished.}}
{{DataSet> sorted = op.sort(0); // the previous DataSet, which resulted from callSomeFlinkDataflowOperator() in the previous Flink job, remained in memory.}}
{{environment.execute(); // Trigger a different job whose data depends on the previous one.}}
Besides adding appropriate classes to the Flink Java API, implementing this feature would require changing things so that:
* JobManagers are aware that a completed job had cached operators - likely a new COMPLETED_AND_REUSABLE job state?
* TaskManagers must keep references to the Flink memory management segments associated with the CacheOperator data;
* CacheOperator must have a default number of usages and/or amount of time to be kept alive (I think both options should exist, but the user may choose whether to use one or both);
* Cluster coordination: should the JobManager
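The usage-count / time-to-live policy proposed in the bullets above can be sketched in plain Java. This is an illustrative sketch only, not Flink API: {{CachedResult}}, its fields and method names are hypothetical stand-ins for whatever the CacheOperator implementation would actually use.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of the proposed eviction policy: a cached result
// stays alive until it has been read maxUsages times OR its time-to-live
// has elapsed, whichever limit the user enabled.
final class CachedResult<T> {
    private final T data;
    private final int maxUsages;        // <= 0 means "no usage limit"
    private final Duration timeToLive;  // null means "no TTL"
    private final Instant createdAt = Instant.now();
    private int usages = 0;

    CachedResult(T data, int maxUsages, Duration timeToLive) {
        this.data = data;
        this.maxUsages = maxUsages;
        this.timeToLive = timeToLive;
    }

    /** Returns the cached data and counts the access. */
    synchronized T get() {
        if (isExpired()) {
            throw new IllegalStateException("cached result already released");
        }
        usages++;
        return data;
    }

    /** True once either enabled limit has been reached. */
    synchronized boolean isExpired() {
        boolean usedUp = maxUsages > 0 && usages >= maxUsages;
        boolean timedOut = timeToLive != null
                && Instant.now().isAfter(createdAt.plus(timeToLive));
        return usedUp || timedOut;
    }
}

public class CachedResultDemo {
    public static void main(String[] args) {
        // Usage limit of 2 reads, no TTL.
        CachedResult<String> cached = new CachedResult<>("dataset", 2, null);
        System.out.println(cached.get());        // dataset
        System.out.println(cached.isExpired());  // false
        cached.get();
        System.out.println(cached.isExpired());  // true (usage limit reached)
    }
}
```

In a real implementation the expiry check would run on the TaskManager that owns the memory segments, releasing them back to the memory manager once the entry expires.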
[jira] [Commented] (FLINK-10874) Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure
[ https://issues.apache.org/jira/browse/FLINK-10874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686361#comment-16686361 ] Piotr Nowojski commented on FLINK-10874: A Google search for this error suggests that it usually happens around a broker restarting and re-electing leaders. Maybe this is some kind of aftershock of restarting a broker in another test. Maybe this will be fixed by: https://issues.apache.org/jira/browse/FLINK-10838
> Kafka 2.0 connector testMigrateFromAtLeastOnceToExactlyOnce failure
> ---
>
> Key: FLINK-10874
> URL: https://issues.apache.org/jira/browse/FLINK-10874
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.8.0
> Reporter: Piotr Nowojski
> Priority: Critical
>
> https://api.travis-ci.org/v3/job/454449444/log.txt
> {noformat}
> Test testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase) is running.
> 16:35:07,894 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Property [transaction.timeout.ms] not specified. Setting it to 360 ms
> 16:35:07,903 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic testMigrateFromAtLeastOnceToExactlyOnce
> 16:35:08,785 ERROR org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase -
> Test testMigrateFromAtLeastOnceToExactlyOnce(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase) failed with:
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1). 
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:419)
> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshotWithLocalState(AbstractStreamOperatorTestHarness.java:505)
> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:497)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRecoverWithChangeSemantics(FlinkKafkaProducerITCase.java:591)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testMigrateFromAtLeastOnceToExactlyOnce(FlinkKafkaProducerITCase.java:569)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
> at
[jira] [Commented] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs
[ https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686331#comment-16686331 ] ASF GitHub Bot commented on FLINK-10867: zentol opened a new pull request #7095: [FLINK-10867][metrics] Cache logical scopes separately for each reporter
URL: https://github.com/apache/flink/pull/7095
## What is the purpose of the change
This PR fixes an issue in the metric system where the logical scope was not cached separately for each reporter. As a result other reporters might access logical scopes for which the wrong filter and/or delimiter was used.
## Brief change log
* cache logicalScopeStrings separately for each reporter
* modify `FrontMetricGroup` to inject reporterIndex when computing logical scope
## Verifying this change
This change added tests and can be verified as follows:
* run `AbstractMetricGroupTest#testLogicalScopeCachingForMultipleReporters`
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> Add a DataSet-based CacheOperator to reuse results between jobs
> ---
>
> Key: FLINK-10867
> URL: https://issues.apache.org/jira/browse/FLINK-10867
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management, DataSet API, Local Runtime
> Affects Versions: 1.8.0
> Reporter: Miguel E. Coimbra
> Assignee: vinoyang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
[jira] [Commented] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs
[ https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686346#comment-16686346 ] Chesnay Schepler commented on FLINK-10867: -- [~yanghua] Yes, thanks! It should point to FLINK-10857 instead.
> Add a DataSet-based CacheOperator to reuse results between jobs
> ---
>
> Key: FLINK-10867
> URL: https://issues.apache.org/jira/browse/FLINK-10867
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management, DataSet API, Local Runtime
> Affects Versions: 1.8.0
> Reporter: Miguel E. Coimbra
> Assignee: vinoyang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
[GitHub] twalthr commented on issue #7045: [hotfix] Update nightly master cron jobs
twalthr commented on issue #7045: [hotfix] Update nightly master cron jobs URL: https://github.com/apache/flink/pull/7045#issuecomment-438621835 Thanks @zentol. I will remove the kubernetes test and merge this once Travis gives the green light. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10419) ClassNotFoundException while deserializing user exceptions from checkpointing
[ https://issues.apache.org/jira/browse/FLINK-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686356#comment-16686356 ] ASF GitHub Bot commented on FLINK-10419: tillrohrmann commented on a change in pull request #7093: [FLINK-10419] Using DeclineCheckpoint message class when invoking RPC declineCheckpoint
URL: https://github.com/apache/flink/pull/7093#discussion_r233398912
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
## @@ -248,6 +256,75 @@ public static void teardownClass() { } }
+	@Test
+	public void testDeclineCheckpointInvocationWithUserException() throws Exception {
+		RpcService rpcService1 = null;
+		RpcService rpcService2 = null;
+		try {
+			final ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
+			final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+
+			rpcService1 = new AkkaRpcService(actorSystem1, testingTimeout);
+			rpcService2 = new AkkaRpcService(actorSystem2, testingTimeout);
+
+			final CompletableFuture declineCheckpointMessageFuture = new CompletableFuture<>();
+
+			final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
+			final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
+			final JobMaster jobMaster = new JobMaster(
+				rpcService1,
+				jobMasterConfiguration,
+				jmResourceId,
+				jobGraph,
+				haServices,
+				DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService1),
+				jobManagerSharedServices,
+				heartbeatServices,
+				blobServer,
+				UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+				new NoOpOnCompletionActions(),
+				testingFatalErrorHandler,
+				JobMasterTest.class.getClassLoader()) {
+				@Override
+				public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
+					declineCheckpointMessageFuture.complete(declineCheckpoint.getReason());
+				}
+			};
+
+			jobMaster.start(jobMasterId, testingTimeout).get();
+
+			final String className = "UserException";
+			final URLClassLoader userClassLoader = ClassLoaderUtils.compileAndLoadJava(
+				temporaryFolder.newFolder(),
+				className + ".java",
+				String.format("public class %s extends RuntimeException { public %s() {super(\"UserMessage\");} }",
+					className,
+					className));
+
+			Throwable userException = (Throwable) Class.forName(className, false, userClassLoader).newInstance();
+
+			CompletableFuture jobMasterGateway =
+				rpcService2.connect(jobMaster.getAddress(), jobMaster.getFencingToken(), JobMasterGateway.class);
+
+			jobMasterGateway.thenAccept(gateway -> {
+				gateway.declineCheckpoint(new DeclineCheckpoint(
Review comment: Could we actually instantiate a `RpcCheckpointResponder` instead of calling directly on the gateway? I think this is also part of the code which should be tested.
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> ClassNotFoundException while deserializing user exceptions from checkpointing
> -
>
> Key: FLINK-10419
> URL: https://issues.apache.org/jira/browse/FLINK-10419
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination, State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1, 1.7.0
> Reporter: Nico Kruber
> Assignee: Dawid Wysakowicz
> Priority: Major
> Labels:
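The test above compiles an exception class into a separate class loader and instantiates it reflectively, simulating user code that is only visible through the user-code class loader. The core pattern, resolving a class through a chosen class loader with `Class.forName(name, false, loader)` and instantiating it via reflection, can be shown self-contained; the `ReflectiveExceptionDemo` and nested `UserException` names below are illustrative stand-ins, not part of the Flink test.

```java
// Minimal sketch of the pattern used in the test: resolve an exception
// class by name through an explicitly chosen class loader (here the
// application class loader; the Flink test uses a freshly compiled
// URLClassLoader) and instantiate it reflectively.
public class ReflectiveExceptionDemo {

    // Stand-in for the dynamically compiled user exception in the test.
    public static class UserException extends RuntimeException {
        public UserException() {
            super("UserMessage");
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader loader = ReflectiveExceptionDemo.class.getClassLoader();

        // 'false' means: do not run static initializers yet, exactly as in
        // the test's Class.forName(className, false, userClassLoader) call.
        Throwable userException = (Throwable) Class
                .forName("ReflectiveExceptionDemo$UserException", false, loader)
                .getDeclaredConstructor()
                .newInstance();

        System.out.println(userException.getMessage()); // UserMessage
    }
}
```

The point of routing through a distinct class loader in the real test is that the JobMaster side cannot resolve the class, which is precisely the ClassNotFoundException scenario FLINK-10419 describes.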
[jira] [Commented] (FLINK-10419) ClassNotFoundException while deserializing user exceptions from checkpointing
[ https://issues.apache.org/jira/browse/FLINK-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686347#comment-16686347 ] ASF GitHub Bot commented on FLINK-10419: dawidwys closed pull request #7093: [FLINK-10419] Using DeclineCheckpoint message class when invoking RPC declineCheckpoint
URL: https://github.com/apache/flink/pull/7093
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index 22244f6cb8d..b8dc5545706 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -20,6 +20,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 public interface CheckpointCoordinatorGateway extends RpcGateway {
@@ -31,9 +32,5 @@ void acknowledgeCheckpoint(
 	final CheckpointMetrics checkpointMetrics,
 	final TaskStateSnapshot subtaskState);
-	void declineCheckpoint(
-		JobID jobID,
-		ExecutionAttemptID executionAttemptID,
-		long checkpointId,
-		Throwable cause);
+	void declineCheckpoint(DeclineCheckpoint declineCheckpoint);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5d2d363cf71..40a675aca31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -690,13 +690,7 @@
 // TODO: This method needs a leader session ID
 @Override
-	public void declineCheckpoint(
-		final JobID jobID,
-		final ExecutionAttemptID executionAttemptID,
-		final long checkpointID,
-		final Throwable reason) {
-		final DeclineCheckpoint decline = new DeclineCheckpoint(
-			jobID, executionAttemptID, checkpointID, reason);
+	public void declineCheckpoint(DeclineCheckpoint decline) {
 	final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
 	if (checkpointCoordinator != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
index c90a8b5bbbc..2f656d09263 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
@@ -19,9 +19,13 @@
 package org.apache.flink.runtime.rpc;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -87,6 +91,29 @@ public static void terminateRpcService(RpcService rpcService, Time timeout) thro
 	rpcService.stopService().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 }
+	/**
+	 * Shuts the given rpc services down and waits for their termination.
+	 *
+	 * @param rpcServices to shut down
+	 * @param timeout for this operation
+	 * @throws InterruptedException if the operation has been interrupted
+	 * @throws ExecutionException if a problem occurred
+	 * @throws TimeoutException if a timeout occurred
+	 */
+	public static void terminateRpcServices(
+			Time timeout,
+			RpcService... rpcServices) throws InterruptedException, ExecutionException, TimeoutException {
+		final Collection> terminationFutures = new ArrayList<>(rpcServices.length);
+
+		for (RpcService service : rpcServices) {
+			if (service != null) {
+				terminationFutures.add(service.stopService());
+
[GitHub] dawidwys closed pull request #7093: [FLINK-10419] Using DeclineCheckpoint message class when invoking RPC declineCheckpoint
dawidwys closed pull request #7093: [FLINK-10419] Using DeclineCheckpoint message class when invoking RPC declineCheckpoint
URL: https://github.com/apache/flink/pull/7093

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index 22244f6cb8d..b8dc5545706 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -20,6 +20,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;

 public interface CheckpointCoordinatorGateway extends RpcGateway {
@@ -31,9 +32,5 @@
 		final CheckpointMetrics checkpointMetrics,
 		final TaskStateSnapshot subtaskState);

-	void declineCheckpoint(
-		JobID jobID,
-		ExecutionAttemptID executionAttemptID,
-		long checkpointId,
-		Throwable cause);
+	void declineCheckpoint(DeclineCheckpoint declineCheckpoint);
 }

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5d2d363cf71..40a675aca31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -690,13 +690,7 @@
 	// TODO: This method needs a leader session ID
 	@Override
-	public void declineCheckpoint(
-			final JobID jobID,
-			final ExecutionAttemptID executionAttemptID,
-			final long checkpointID,
-			final Throwable reason) {
-		final DeclineCheckpoint decline = new DeclineCheckpoint(
-			jobID, executionAttemptID, checkpointID, reason);
+	public void declineCheckpoint(DeclineCheckpoint decline) {
 		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();

 		if (checkpointCoordinator != null) {

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
index c90a8b5bbbc..2f656d09263 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
@@ -19,9 +19,13 @@
 package org.apache.flink.runtime.rpc;

 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;

+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -87,6 +91,29 @@
 		rpcService.stopService().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 	}

+	/**
+	 * Shuts the given rpc services down and waits for their termination.
+	 *
+	 * @param rpcServices to shut down
+	 * @param timeout for this operation
+	 * @throws InterruptedException if the operation has been interrupted
+	 * @throws ExecutionException if a problem occurred
+	 * @throws TimeoutException if a timeout occurred
+	 */
+	public static void terminateRpcServices(
+			Time timeout,
+			RpcService... rpcServices) throws InterruptedException, ExecutionException, TimeoutException {
+		final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(rpcServices.length);
+
+		for (RpcService service : rpcServices) {
+			if (service != null) {
+				terminationFutures.add(service.stopService());
+			}
+		}
+
+		FutureUtils.waitForAll(terminationFutures).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+	}
+
 	// We don't want this class to be instantiable
 	private
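The collect-then-combine-then-block shape of the new `terminateRpcServices` helper can be sketched with the JDK alone; here `CompletableFuture.allOf` stands in for Flink's `FutureUtils.waitForAll`, and the class and method names are illustrative, not Flink API:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TerminateAll {

    // Collect every pending termination future, combine them into a single
    // future, and block exactly once under the caller's overall timeout --
    // the same shape as the terminateRpcServices helper in the diff above.
    static void awaitAll(long timeoutMillis, Collection<CompletableFuture<?>> futures)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        List<CompletableFuture<?>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture("service-1 stopped"));
        futures.add(CompletableFuture.completedFuture("service-2 stopped"));
        awaitAll(1000, futures);
        System.out.println("all services terminated");
    }
}
```

Blocking once on the combined future (rather than calling `get` per service) means the timeout bounds the whole shutdown, not each service individually.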
[jira] [Created] (FLINK-10878) State evolution E2E test failed on travis
Chesnay Schepler created FLINK-10878:

Summary: State evolution E2E test failed on travis
Key: FLINK-10878
URL: https://issues.apache.org/jira/browse/FLINK-10878
Project: Flink
Issue Type: Bug
Components: E2E Tests, State Backends, Checkpointing
Affects Versions: 1.7.0
Reporter: Chesnay Schepler

https://travis-ci.org/apache/flink/builds/454402843

{code}
2018-11-13 13:21:14,096 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 66 by task c375b648d817e2a1f25ab786094bfaac of job 81af79ed99a57065b3f0cbeb3cd42b2b.
2018-11-13 13:21:14,097 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 66 of job 81af79ed99a57065b3f0cbeb3cd42b2b.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source (1/1) was not running
	at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}

{code}
2018-11-13 13:21:14,035 INFO org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder - Declining checkpoint 66 of job 81af79ed99a57065b3f0cbeb3cd42b2b.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source (1/1) was not running
	at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2018-11-13 13:21:14,351 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
2018-11-13 13:21:14,351 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend from snapshot.
2018-11-13 13:21:15,212 INFO org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder - Declining checkpoint 66 of job 81af79ed99a57065b3f0cbeb3cd42b2b.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException: Task received cancellation from one of its inputs
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyAbortOnCancellationBarrier(BarrierBuffer.java:404)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processCancellationBarrier(BarrierBuffer.java:304)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:204)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:748)
2018-11-13 13:21:19,680 INFO org.apache.flink.runt
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10862) REST API does not show program descriptions of "simple" ProgramDescription
[ https://issues.apache.org/jira/browse/FLINK-10862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686343#comment-16686343 ] Stephan Ewen commented on FLINK-10862: -- The ProgramDescription is a bit of an artifact that we should probably deprecate. Having it in Flink is a bit of a gimmick. The proper artifact management systems that any bigger infrastructure uses have their own metadata/descriptions/versioning. > REST API does not show program descriptions of "simple" ProgramDescription > -- > > Key: FLINK-10862 > URL: https://issues.apache.org/jira/browse/FLINK-10862 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.6.2 >Reporter: Flavio Pompermaier >Priority: Major > Labels: rest_api > > When uploading a jar containing a main class implementing the ProgramDescription > interface, the REST API doesn't list its description. It works only if the > class implements Program (which I find pretty useless... why should I return > the plan?) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs
[ https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686345#comment-16686345 ] vinoyang commented on FLINK-10867: -- [~Zentol] Maybe the PR you submitted should not bind to this issue? > Add a DataSet-based CacheOperator to reuse results between jobs > --- > > Key: FLINK-10867 > URL: https://issues.apache.org/jira/browse/FLINK-10867 > Project: Flink > Issue Type: New Feature > Components: Cluster Management, DataSet API, Local Runtime >Affects Versions: 1.8.0 >Reporter: Miguel E. Coimbra >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > *Motivation.* > There are job scenarios where Flink batch processing users may be interested > in processing a large amount of data, outputting results to disk and then > reusing the results for another type of computation in Flink again. > This feature suggestion emerged from my work as a PhD researcher working on > graph stream processing. > [https://arxiv.org/abs/1810.02781] > More specifically, in our use case this would be very useful to maintain an > evolving graph while allowing for specific logic on challenges such as _when_ > and _how_ to integrate updates in the graph and also how to represent it. > Furthermore, it would also be an enabler for rich use-cases that have synergy > with this existing Jira issue pertaining graph partitioning: > FLINK-1536 - Graph partitioning operators for Gelly > *Problem.* > While it would be negligible to write the results to disk and then read them > back in a new job to be sent to the JobManager if they are small, this > becomes prohibitive if there are several gigabytes of data to write/read and > using a distributed storage (e.g. HDFS) is not an option. > Even if there is a distributed storage available, as the number of sequential > jobs increases, even the benefits of the secondary storage being distributed > will diminish. 
> *Existing alternatives.* > I also considered, as a possibility, composing the sequence of jobs into a > single big job to submit to the JobManager, thus allowing reuse of results > due to the natural forwarding of results to subsequent operators in dataflow > programming. > However, this becomes difficult for two reasons: > * The logic to connect the sequence of jobs may depend on factors external > to Flink and not known at the start of the job composition. > This also excludes limited iterative behavior like what is provided in > {{BulkIteration/DeltaIteration;}} > * Composing a job with "too many" operators and inter-dependencies may lead > to the Optimizer engaging an exponential optimization search space. > This is particularly true for operators with multiple valid execution > strategies, leading to a combinatorics problem. > This leads to the Flink compiler _taking forever_ to even create a plan. > I believe this is the current situation based on a reply I received from > Fabian Hueske last year. > His reply was on the 7th of December 2017: > Link: > [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E] > Mailing list thread title: "Re: How to perform efficient DataSet reuse > between iterations" > > *Idea.* > Perhaps the better way to describe this *CachingOperator* feature is the > concept of "_job chaining_", where a new type of DataSink would receive data > that will: > - Be available to a subsequent job which somehow makes a reference to the > DataSink of the previous job; > - Have remained available (from the previous job execution) in the exact > same TaskManagers in the cluster. 
> Likely, the optimal memory distribution will be pretty similar between chained jobs - if the data was read from disk again between jobs, it would likely be distributed with the same (automatic or not) strategies, hence the same distribution would likely be of use to sequential jobs.
>
> *Design.*
> Potential conflicts with the current Flink cluster execution model:
> - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job finishes in local mode, so it would be necessary to change local mode to keep a FlinkMiniCluster alive - what was the reasoning behind destroying it? Simplifying the implementation?
> - How would this look in the API? I envisioned an example like this:
> {{DataSet> previousResult = callSomeFlinkDataflowOperator(); // The result of some previous computation.}}
> {{CacheOperator>> op = previousResult.cache();}}
> {{... // Other operations...}}
> {{environment.execute();}}
> {{... // The previous job has finished.}}
> {{DataSet> sorted = op.sort(0); // the previous DataSet, >
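The job-chaining idea sketched in the issue - results of one job staying resident so the next job can start from them - can be illustrated without any Flink API. Everything below is hypothetical (`CacheSketch`, `firstJob`, `secondJob`, the cache key); it only demonstrates the reuse pattern, not the proposed CacheOperator:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CacheSketch {

    // Hypothetical in-process result cache standing in for the proposed
    // CacheOperator: the result of "job 1" stays resident and seeds "job 2".
    static final Map<String, List<Integer>> CACHE = new HashMap<>();

    static void firstJob() {
        List<Integer> result = Arrays.asList(3, 1, 2); // stand-in for an expensive dataflow result
        CACHE.put("previousResult", result);           // the cache() step: keep it resident
    }

    static List<Integer> secondJob() {
        // The follow-up job starts from the cached result instead of
        // re-reading gigabytes from disk, mirroring op.sort(0) above.
        return CACHE.get("previousResult").stream().sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        firstJob();
        System.out.println(secondJob()); // prints [1, 2, 3]
    }
}
```

The point of the proposal is precisely that this cache would live inside the same TaskManagers across job submissions, so the second job pays neither the serialization nor the redistribution cost.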
[jira] [Commented] (FLINK-10634) End-to-end test: Metrics accessible via REST API
[ https://issues.apache.org/jira/browse/FLINK-10634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686342#comment-16686342 ] ASF GitHub Bot commented on FLINK-10634: zentol commented on issue #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test URL: https://github.com/apache/flink/pull/7056#issuecomment-438618845 > Also it is worth adding a line in the README for the Prometheus and this test that states that you can run them locally with mvn clean verify -De2e-metrics -DdistDir=YOUR_DIR I don't really want to add these instructions right now since they are quite disconnected from the actual test. How the profile is called is determined in the pom, what the distDir property is called in the `FlinkDistribution`. I'd like to avoid them becoming out-dated, i.e. simply wrong. I have some ideas on how to better document how to run them, so that if the test is not executed you get a message saying what was missing. E.g. "Test was skipped since property 'e2e-metrics' was not specified." This way you would simply _try_ running them and get the infos on how to do it right there. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > End-to-end test: Metrics accessible via REST API > > > Key: FLINK-10634 > URL: https://issues.apache.org/jira/browse/FLINK-10634 > Project: Flink > Issue Type: Sub-task > Components: E2E Tests, Metrics, REST >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > Verify that Flink's metrics can be accessed via the REST API. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
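The "tell the user why the test was skipped" idea from zentol's comment can be sketched as a simple guard on the required property. The property name `distDir` is taken from the discussion; the class and messages are illustrative, not the actual FlinkDistribution code:

```java
public class GuardedTest {

    // If a property the test needs was not passed (e.g. via -DdistDir=...),
    // print an actionable skip message instead of failing obscurely, so the
    // user learns how to run the test by simply trying to run it.
    public static void main(String[] args) {
        String distDir = System.getProperty("distDir");
        if (distDir == null) {
            System.out.println("Test was skipped since property 'distDir' was not specified.");
            return;
        }
        System.out.println("Running e2e test against distribution at " + distDir);
    }
}
```

In a real JUnit test the same effect is commonly achieved with an assumption that aborts (rather than fails) the test when the precondition is missing.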