[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692468#comment-15692468 ]

ASF GitHub Bot commented on FLINK-4541:
---------------------------------------

Github user AlexanderShoshin commented on the issue:

    https://github.com/apache/flink/pull/2811

    Hi, Fabian. I've made all the corrections. One thing I didn't understand: are we going to allow the case of !joinCondition.isEqui in DataSetSingleRowCrossRule? I also added several tests to verify that CrossJoin works for both left and right single-row inputs.

> Support for SQL NOT IN operator
> -------------------------------
>
>                 Key: FLINK-4541
>                 URL: https://issues.apache.org/jira/browse/FLINK-4541
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
>   // set up execution environment
>   val env = ExecutionEnvironment.getExecutionEnvironment
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>
>   val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
>
>   // register the DataSet as table "WordCount"
>   tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
>   tEnv.registerTable("WordCount2",
>     tEnv.fromDataSet(input, 'word, 'frequency).select('word).filter('word !== "hello"))
>
>   // run a SQL query on the Table and retrieve the result as a new Table
>   val table = tEnv.sql(
>     "SELECT word, SUM(frequency) FROM WordCount WHERE word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
>
>   table.toDataSet[WC].print()
> }
> {code}

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
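For context, the relational semantics such a NOT IN rule has to implement can be sketched in plain Java (this is not Flink code; the class and method names below are invented for illustration): NOT IN against a subquery behaves like an anti-join, with the SQL caveat that a NULL in the subquery result makes the predicate UNKNOWN for every row under three-valued logic, so no rows qualify.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch (not Flink internals): SQL "x NOT IN (subquery)" as an anti-join.
class NotInSketch {

    // Returns rows of 'left' whose value is absent from 'subquery'.
    // SQL caveat: if the subquery result contains NULL, NOT IN yields no rows at all,
    // because "x <> NULL" evaluates to UNKNOWN under three-valued logic.
    static List<String> notInAntiJoin(List<String> left, Collection<String> subquery) {
        if (subquery.contains(null)) {
            return Collections.emptyList();
        }
        Set<String> keys = new HashSet<>(subquery);
        List<String> out = new ArrayList<>();
        for (String row : left) {
            if (!keys.contains(row)) { // anti-join: keep rows with no match
                out.add(row);
            }
        }
        return out;
    }
}
```

Mirroring the WordCount example above: with `WordCount2` containing only "ciao", the anti-join keeps the "hello" rows.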
[GitHub] flink pull request #:
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/commit/da53953e5366185567515d209fcc85d4006f7e8a#commitcomment-19948322

    In flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java on line 40:

    I think this won't work because now we don't have all the special serialisation logic that `SimpleStateDescriptor` has. What you could do is override `getDefaultValue()` and return `null` there. Then, in `getInitialValue()` you return the default value and you don't need the extra field. What do you think?
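The suggested override pattern can be sketched roughly as follows. This is a hedged, self-contained stand-in: `SimpleDescriptor` below is an invented minimal base class, not Flink's actual `SimpleStateDescriptor`. The point is that the subclass reports `null` as its default value (bypassing the base class's special handling) while `getInitialValue()` reuses the value already stored in the base class, so no extra field is needed.

```java
// Hedged sketch of the suggested pattern; all names are simplified stand-ins,
// and the real serialization logic is omitted.
class DescriptorSketch {

    // Invented minimal base class standing in for the descriptor hierarchy.
    static abstract class SimpleDescriptor<T> {
        private final T defaultValue;

        SimpleDescriptor(T defaultValue) {
            this.defaultValue = defaultValue;
        }

        public T getDefaultValue() {
            return defaultValue;
        }
    }

    static class FoldingDescriptor<T> extends SimpleDescriptor<T> {
        FoldingDescriptor(T initialValue) {
            super(initialValue); // reuse the base class's storage for the fold seed
        }

        // Report no default value so the base class's special handling is bypassed.
        @Override
        public T getDefaultValue() {
            return null;
        }

        // Surface the fold's seed via the value the base class already stores:
        // no extra field required.
        public T getInitialValue() {
            return super.getDefaultValue();
        }
    }
}
```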
[jira] [Closed] (FLINK-5000) Rename Methods in ManagedInitializationContext
[ https://issues.apache.org/jira/browse/FLINK-5000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-5000. --- Resolution: Fixed Fix Version/s: 1.2.0 Implemented in 4656350fc33d42ff96ad6d5e836e62172b4b0de6 > Rename Methods in ManagedInitializationContext > -- > > Key: FLINK-5000 > URL: https://issues.apache.org/jira/browse/FLINK-5000 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.2.0 > > > We should rename {{getManagedOperatorStateStore()}} to > {{getOperatorStateStore()}} and {{getManagedKeyedStateStore()}} to > {{getKeyedStateStore()}}. There are no unmanaged stores and having that extra > word there seems a bit confusing, plus it makes the names longer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r89431529 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer{ + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator operator; + + /** +* Keep all {@code AsyncCollector} and their input {@link StreamElement} +*/ + private final Map , StreamElement> queue = new LinkedHashMap<>(); + /** +* For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the +* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue} +* is full since main thread waits on this lock. The StreamElement in +* {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements +* in its queue. It will be kept in the operator state while snapshotting. 
+*/ + private StreamElement extraStreamElement; + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. +*/ + private final Output output; + private final TimestampedCollector timestampedCollector; + + /** +* Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} +*/ + private final Object lock; + + private final Emitter emitter; + private final Thread emitThread; + + private IOException error; + + public AsyncCollectorBuffer( + int bufferSize, + AsyncDataStream.OutputMode mode, + Output output, + TimestampedCollector collector, + Object lock, + AsyncWaitOperator operator) { + Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0."); + Preconditions.checkNotNull(output, "Output should not be NULL."); + Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL."); + Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL."); + Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL."); + + this.bufferSize = bufferSize; + this.mode = mode; + this.output = output; + this.timestampedCollector = collector;
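The ordered emission that a buffer like the one in the diff above must support can be illustrated with a small stand-alone sketch (invented names, no threading and no Flink types): slots are reserved in arrival order in a `LinkedHashMap`, results may complete out of order, and emission only advances while the head of the queue is complete.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch (not the Flink implementation) of ordered async-result buffering.
class OrderedBufferSketch {

    // Maps slot id -> result; null means the async call is still pending.
    // LinkedHashMap preserves insertion (arrival) order even when a value is
    // overwritten later via put().
    private final LinkedHashMap<Long, String> slots = new LinkedHashMap<>();
    private long nextId = 0;

    // Reserve a slot in arrival order before the async call is issued.
    long register() {
        slots.put(nextId, null);
        return nextId++;
    }

    // Record a completed result; completion may happen out of order.
    void complete(long id, String result) {
        slots.put(id, result);
    }

    // Emit completed results from the head only, preserving input order.
    List<String> emitReady() {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> it = slots.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> e = it.next();
            if (e.getValue() == null) {
                break; // head still pending: stop here to keep ordering
            }
            out.add(e.getValue());
            it.remove();
        }
        return out;
    }
}
```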
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691892#comment-15691892 ] ASF GitHub Bot commented on FLINK-4391: --- Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r89424283 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.List; + +/** + * {@link AsyncCollector} collects data / error in user codes while processing async i/o. 
+ * + * @param Input type + * @param Output type + */ +@Internal +public class AsyncCollector{ + private List result; + private Throwable error; + + private boolean isDone = false; + + private final AsyncCollectorBuffer buffer; + + public AsyncCollector(AsyncCollectorBuffer buffer) { + Preconditions.checkNotNull(buffer, "Reference to AsyncCollectorBuffer should not be null"); + + this.buffer = buffer; + } + + public AsyncCollector(AsyncCollectorBuffer buffer, boolean isDone) { + this(buffer); + this.isDone = isDone; --- End diff -- Actually, they work as a stub here. I think we can optimize those `AsyncCollector`s for `Watermark` and `LatencyMarker` out while optimizing the code in `nothingToDo()`. > Provide support for asynchronous operations over streams > > > Key: FLINK-4391 > URL: https://issues.apache.org/jira/browse/FLINK-4391 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Jamie Grier >Assignee: david.wang > > Many Flink users need to do asynchronous processing driven by data from a > DataStream. The classic example would be joining against an external > database in order to enrich a stream with extra information. > It would be nice to add general support for this type of operation in the > Flink API. Ideally this could simply take the form of a new operator that > manages async operations, keeps so many of them in flight, and then emits > results to downstream operators as the async operations complete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
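The collector in the diff above boils down to a small completion handle: user code finishes it with either a result list or an error, and downstream code checks for completion. A hedged, simplified stand-alone sketch (not the actual Flink class; the buffer notification is omitted):

```java
import java.util.List;

// Hedged sketch of the completion-handle idea; names simplified, buffer omitted.
class CollectorSketch<OUT> {
    private List<OUT> result;
    private Throwable error;
    private boolean done = false;

    // User code completes the collector with results...
    synchronized void collect(List<OUT> records) {
        this.result = records;
        this.done = true;
    }

    // ...or with an error raised during the async call.
    synchronized void collect(Throwable t) {
        this.error = t;
        this.done = true;
    }

    synchronized boolean isDone() {
        return done;
    }

    // Surfaces the error at read time, so the emitting side sees the failure.
    synchronized List<OUT> getResult() {
        if (error != null) {
            throw new RuntimeException(error);
        }
        return result;
    }
}
```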
[jira] [Commented] (FLINK-2646) Rich functions should provide a method "closeAfterFailure()"
[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691858#comment-15691858 ]

Liang Chen commented on FLINK-2646:
-----------------------------------

Please go ahead, thanks. Sorry, I am a little busy recently.

> Rich functions should provide a method "closeAfterFailure()"
> ------------------------------------------------------------
>
>                 Key: FLINK-2646
>                 URL: https://issues.apache.org/jira/browse/FLINK-2646
>             Project: Flink
>          Issue Type: Improvement
>          Components: Core
>    Affects Versions: 0.10.0
>            Reporter: Stephan Ewen
>            Assignee: Liang Chen
>             Fix For: 1.0.0
>
>
> Right now, the {{close()}} method of rich functions is invoked in case of proper completion, and in case of canceling in case of error (to allow for cleanup).
> In certain cases, the user function needs to know why it is closed: whether the task completed in a regular fashion, or was canceled/failed.
> I suggest to add a method {{closeAfterFailure()}} to the {{RichFunction}}. By default, this method calls {{close()}}. The runtime is then changed to call {{close()}} as part of the regular execution and {{closeAfterFailure()}} in case of an irregular exit.
> Because by default all cases call {{close()}}, the change would not be API breaking.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
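The proposed non-breaking default can be sketched as follows (a hypothetical simplified interface stands in for {{RichFunction}} here): because `closeAfterFailure()` defaults to calling `close()`, existing functions keep their behavior, while functions that care can override the failure path separately.

```java
// Hedged sketch of the proposed API shape; "RichLike" is an invented stand-in
// for RichFunction, used only to illustrate the backwards-compatible default.
class CloseSketch {

    interface RichLike {
        void close();

        // Default falls back to regular cleanup, so adding this method
        // does not break existing implementations.
        default void closeAfterFailure() {
            close();
        }
    }

    // Existing-style function: only implements close(), gets the default.
    static class Legacy implements RichLike {
        int regular = 0;

        public void close() {
            regular++;
        }
    }

    // New-style function: distinguishes failure cleanup from regular cleanup.
    static class Custom implements RichLike {
        int regular = 0;
        int failure = 0;

        public void close() {
            regular++;
        }

        @Override
        public void closeAfterFailure() {
            failure++;
        }
    }
}
```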
[jira] [Comment Edited] (FLINK-1536) Graph partitioning operators for Gelly
[ https://issues.apache.org/jira/browse/FLINK-1536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691610#comment-15691610 ]

Ivan Mushketyk edited comment on FLINK-1536 at 11/23/16 11:11 PM:
------------------------------------------------------------------

Oh, sorry, I didn't mean to type the word "bipartiate" there :) Somehow this word got ingrained in my muscle memory after working on the bipartite graph task :)
Anyway, I'll do some background research and come up with a design document.
Regarding a FLIP: are there any guidelines about how (or even in which cases) to create one?

was (Author: ivan.mushketyk):
Oh, sorry, I didn't mean to type the word "bipartiate" there :) Somehow this word got ingrained in my muscle memory after working on the bipartite graph task :)
Anyway, I'll do some background research and come up with a design document.
Regarding a FLIP: are there any guidelines about how (or even when) to create one?

> Graph partitioning operators for Gelly
> --------------------------------------
>
>                 Key: FLINK-1536
>                 URL: https://issues.apache.org/jira/browse/FLINK-1536
>             Project: Flink
>          Issue Type: New Feature
>          Components: Gelly
>            Reporter: Vasia Kalavri
>            Priority: Minor
>
> Smart graph partitioning can significantly improve the performance and scalability of graph analysis applications. Depending on the computation pattern, a graph partitioning algorithm divides the graph into (maybe overlapping) subgraphs, optimizing some objective. For example, if communication is performed across graph edges, one might want to minimize the edges that cross from one partition to another.
> The problem of graph partitioning is a well-studied problem and several algorithms have been proposed in the literature. The goal of this project would be to choose a few existing partitioning techniques and implement the corresponding graph partitioning operators for Gelly.
> Some related literature can be found [here|http://www.citeulike.org/user/vasiakalavri/tag/graph-partitioning].

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
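The objective named in the issue, minimizing the edges that cross from one partition to another, can be made concrete with a tiny illustration (plain Java; names are invented for the example, this is not a Gelly API): given a vertex-to-partition assignment, count the cut edges.

```java
import java.util.Map;

// Illustrative sketch of the "edge cut" objective a partitioner tries to minimize.
class EdgeCutSketch {

    // edges: pairs of vertex ids; partitionOf: vertex id -> partition id.
    // Returns the number of edges whose endpoints land in different partitions.
    static int edgeCut(int[][] edges, Map<Integer, Integer> partitionOf) {
        int cut = 0;
        for (int[] e : edges) {
            if (!partitionOf.get(e[0]).equals(partitionOf.get(e[1]))) {
                cut++;
            }
        }
        return cut;
    }
}
```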
[jira] [Updated] (FLINK-5146) Improved resource cleanup in RocksDB keyed state backend
[ https://issues.apache.org/jira/browse/FLINK-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stefan Richter updated FLINK-5146:
----------------------------------

Priority: Blocker (was: Major)

> Improved resource cleanup in RocksDB keyed state backend
> --------------------------------------------------------
>
>                 Key: FLINK-5146
>                 URL: https://issues.apache.org/jira/browse/FLINK-5146
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Blocker
>
> Currently, resources such as taken snapshots or iterators are not always cleaned up in the RocksDB state backend. In particular, not starting the runnable future will leave taken snapshots unreleased.
> We should improve the release of all resources allocated through the RocksDB JNI bridge.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4997) Extending Window Function Metadata
[ https://issues.apache.org/jira/browse/FLINK-4997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691503#comment-15691503 ] ASF GitHub Bot commented on FLINK-4997: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2756 The changes look very good! I'll rebase on top of the recent changes that introduced new `fold()`/`reduce()` and will also reshuffle/squash the commits a bit to make the history clearer. I hope that's alright. I'll also change the name of the `process()` method in `InternalWindowFunction` back to `apply()` so that we have less changes and the core of the changes is more obvious. You could change that name in a future PR if you'd like but it's purely internal. I hope this is also alright. A word on commit titles: we normally use imperative mood, i.e. "Extend WindowFunction Metadata" instead of "Extending ..." or "Extends ...". Initially, this might seem strange but you can think of a commit as being the command to do something. This is a good read on commit messages: http://chris.beams.io/posts/git-commit/ I'm not writing so much to discourage you, I just like good commit messages. And I hope you'll keep up the good work in the code. Btw, do you also want to do the same changes for `AllWindowedStream`? > Extending Window Function Metadata > -- > > Key: FLINK-4997 > URL: https://issues.apache.org/jira/browse/FLINK-4997 > Project: Flink > Issue Type: New Feature > Components: DataStream API, Streaming, Windowing Operators >Reporter: Ventura Del Monte >Assignee: Ventura Del Monte > Fix For: 1.2.0 > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4676) Merge flink-batch-connectors and flink-streaming-connectors modules
[ https://issues.apache.org/jira/browse/FLINK-4676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske reassigned FLINK-4676:
------------------------------------

Assignee: Fabian Hueske

> Merge flink-batch-connectors and flink-streaming-connectors modules
> -------------------------------------------------------------------
>
>                 Key: FLINK-4676
>                 URL: https://issues.apache.org/jira/browse/FLINK-4676
>             Project: Flink
>          Issue Type: Task
>          Components: Batch Connectors and Input/Output Formats, Build System, Streaming Connectors
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> We have two separate Maven modules for batch and streaming connectors (flink-batch-connectors and flink-streaming-connectors) that contain modules for the individual external systems and storage formats such as HBase, Cassandra, Avro, Elasticsearch, etc.
> Some of these systems can be used in streaming as well as batch jobs, for instance HBase, Cassandra, and Elasticsearch. However, due to the separate main modules for streaming and batch connectors, we currently need to decide where to put a connector. For example, the flink-connector-cassandra module is located in flink-streaming-connectors but includes a CassandraInputFormat and CassandraOutputFormat (i.e., a batch source and sink).
> This issue is about merging flink-batch-connectors and flink-streaming-connectors into a joint flink-connectors module. Names of moved modules should not be changed (although this leads to an inconsistent naming scheme: flink-connector-cassandra vs. flink-hbase) to keep the change of code structure transparent to users.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."
[ https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-2662. Resolution: Fixed Fix Version/s: (was: 1.1.3) 1.1.4 Fixed for 1.1.4 with efbd293afe4348b0f199e2c66a990ae6880edcef Fixed for 1.2.0 with 7d91b9ec71c9b711e04a91f847f5c85d3f561da6 > CompilerException: "Bug: Plan generation for Unions picked a ship strategy > between binary plan operators." > -- > > Key: FLINK-2662 > URL: https://issues.apache.org/jira/browse/FLINK-2662 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 0.9.1, 0.10.0 >Reporter: Gabor Gevay >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.2.0, 1.1.4, 1.0.0 > > Attachments: Bug.java, FlinkBug.scala > > > I have a Flink program which throws the exception in the jira title. Full > text: > Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: > Plan generation for Unions picked a ship strategy between binary plan > operators. 
> at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) > at > org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402) > at > org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202) > at > org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63) > at malom.Solver.main(Solver.java:66) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) > The execution plan: > http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt > (I obtained this by commenting out the line that throws the exception) > The code is here: > https://github.com/ggevay/flink/tree/plan-generation-bug > The class to run is "Solver". It needs a command line argument, which is a > directory where it would write output. (On first run, it generates some > lookup tables for a few minutes, which are then placed in /tmp/movegen) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-5143) Add EXISTS to list of supported operators
[ https://issues.apache.org/jira/browse/FLINK-5143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-5143. Resolution: Fixed Fix Version/s: 1.2.0 Fixed with 06d252e89b58c5947b331cf24a552d11ff8767e8 > Add EXISTS to list of supported operators > - > > Key: FLINK-5143 > URL: https://issues.apache.org/jira/browse/FLINK-5143 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > Fix For: 1.2.0 > > > EXISTS is supported in certain cases. We should add it so that e.g. TPC-H > query 4 runs properly.
[jira] [Closed] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-4937. Resolution: Fixed Fix Version/s: 1.2.0 Fixed with 74e0971a5511c511feb94bee7a0ce39eb9951b62 > Add incremental group window aggregation for streaming Table API > > > Key: FLINK-4937 > URL: https://issues.apache.org/jira/browse/FLINK-4937 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: sunjincheng > Fix For: 1.2.0 > > > Group-window aggregates for streaming tables are currently not done in an > incremental fashion. This means that the window collects all records and > performs the aggregation when the window is closed instead of eagerly > updating a partial aggregate for every added record. Since records are > buffered, non-incremental aggregation requires more storage space than > incremental aggregation. > The DataStream API which is used under the hood of the streaming Table API > features [incremental > aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation] > using a {{ReduceFunction}}. > We should add support for incremental aggregation in group-windows. > This is a follow-up task of FLINK-4691.
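The trade-off described in the issue above (buffering every record until the window fires vs. eagerly folding each record into one partial aggregate, as a {{ReduceFunction}} does) can be illustrated independently of Flink. The following is a minimal Python sketch; the function names are illustrative only and are not part of any Flink API:

```python
# Non-incremental: buffer every record, aggregate when the window fires.
def buffering_window(records, reduce_fn):
    buffer = list(records)          # state grows with the window size
    result = buffer[0]
    for r in buffer[1:]:
        result = reduce_fn(result, r)
    return result

# Incremental: keep a single partial aggregate and update it per record
# (the ReduceFunction style referred to in the issue).
def incremental_window(records, reduce_fn):
    acc = None                      # constant-size state
    for r in records:
        acc = r if acc is None else reduce_fn(acc, r)
    return acc
```

Both variants compute the same result for an associative reduce function; only the amount of state held while the window is open differs, which is the storage-space argument made in the issue.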
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2792 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."
[ https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691243#comment-15691243 ] ASF GitHub Bot commented on FLINK-2662: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2848 > CompilerException: "Bug: Plan generation for Unions picked a ship strategy > between binary plan operators." > -- > > Key: FLINK-2662 > URL: https://issues.apache.org/jira/browse/FLINK-2662 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 0.9.1, 0.10.0 >Reporter: Gabor Gevay >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.0.0, 1.2.0, 1.1.3 > > Attachments: Bug.java, FlinkBug.scala > > > I have a Flink program which throws the exception in the jira title. Full > text: > Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: > Plan generation for Unions picked a ship strategy between binary plan > operators. > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) > at > org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41) > at > 
org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402) > at > org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202) > at > org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63) > at malom.Solver.main(Solver.java:66) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) > The execution plan: > http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt > (I obtained this by commenting out the line that throws the exception) > The code is here: > https://github.com/ggevay/flink/tree/plan-generation-bug > The class to run is "Solver". It needs a command line argument, which is a > directory where it would write output. (On first run, it generates some > lookup tables for a few minutes, which are then placed in /tmp/movegen)
[GitHub] flink pull request #2848: [FLINK-2662] [optimizer] Fix computation of global...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2848
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691244#comment-15691244 ] ASF GitHub Bot commented on FLINK-4937: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2792 > Add incremental group window aggregation for streaming Table API > > > Key: FLINK-4937 > URL: https://issues.apache.org/jira/browse/FLINK-4937 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: sunjincheng > > Group-window aggregates for streaming tables are currently not done in an > incremental fashion. This means that the window collects all records and > performs the aggregation when the window is closed instead of eagerly > updating a partial aggregate for every added record. Since records are > buffered, non-incremental aggregation requires more storage space than > incremental aggregation. > The DataStream API which is used under the hood of the streaming Table API > features [incremental > aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation] > using a {{ReduceFunction}}. > We should add support for incremental aggregation in group-windows. > This is a follow-up task of FLINK-4691.
[GitHub] flink pull request #2853: [FLINK-5143] [table] Add EXISTS to list of support...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2853
[jira] [Commented] (FLINK-5143) Add EXISTS to list of supported operators
[ https://issues.apache.org/jira/browse/FLINK-5143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691245#comment-15691245 ] ASF GitHub Bot commented on FLINK-5143: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2853 > Add EXISTS to list of supported operators > - > > Key: FLINK-5143 > URL: https://issues.apache.org/jira/browse/FLINK-5143 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > EXISTS is supported in certain cases. We should add it so that e.g. TPC-H > query 4 runs properly.
[GitHub] flink pull request #2755: [hotfix][docs] Stream joins don't support tuple po...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2755
[GitHub] flink issue #2819: [FLINK-4961] [ml] SGD for Matrix Factorization (WIP)
Github user gaborhermann commented on the issue: https://github.com/apache/flink/pull/2819 Hello Theodore, Thanks for taking a look at this PR! 1. No problem. Some performance evaluations might help us in this case too. I don't believe it should have much effect on the performance anyway as a 3-way join is only used outside the iteration. 2. I really like the idea of doing a performance evaluation! I'm not exactly sure how to do this with a `join` instead of a `coGroup`, so let me sketch an implementation before elaborating on the pros/cons. (It was more straightforward to use `coGroup`.) 3. The benefit of using more blocks is to use less memory, just as in the ALS algorithm, with the disadvantage of using more network. However, more blocks here means much more network and bookkeeping compared to ALS, because there are more Flink iterations. I've investigated this a bit more and found that the main "bottleneck" is the maximum size of the sort-buffer, which was around 100 MB with around 40 GB TaskManager memory, and large matrix blocks do not fit in. So even given enough memory, we cannot use large blocks. Unfortunately we cannot configure the maximum size of the sort-buffer, and I would not like to change the underlying code in the core or runtime, but I tried a workaround which might work (splitting the factor blocks to subblocks). I'll just need to run some measurements. If this workaround turns out fine, then it should be okay to go on with this and give instructions in the docs. 4. Okay, I agree. I hope we could generalize this non-overlapping blocking to other optimization problems :) I'll take a look at the paper you linked!
[jira] [Commented] (FLINK-4961) SGD for Matrix Factorization
[ https://issues.apache.org/jira/browse/FLINK-4961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691024#comment-15691024 ] ASF GitHub Bot commented on FLINK-4961: --- Github user gaborhermann commented on the issue: https://github.com/apache/flink/pull/2819 Hello Theodore, Thanks for taking a look at this PR! 1. No problem. Some performance evaluations might help us in this case too. I don't believe it should have much effect on the performance anyway as a 3-way join is only used outside the iteration. 2. I really like the idea of doing a performance evaluation! I'm not exactly sure how to do this with a `join` instead of a `coGroup`, so let me sketch an implementation before elaborating on the pros/cons. (It was more straightforward to use `coGroup`.) 3. The benefit of using more blocks is to use less memory, just as in the ALS algorithm, with the disadvantage of using more network. However, more blocks here means much more network and bookkeeping compared to ALS, because there are more Flink iterations. I've investigated this a bit more and found that the main "bottleneck" is the maximum size of the sort-buffer, which was around 100 MB with around 40 GB TaskManager memory, and large matrix blocks do not fit in. So even given enough memory, we cannot use large blocks. Unfortunately we cannot configure the maximum size of the sort-buffer, and I would not like to change the underlying code in the core or runtime, but I tried a workaround which might work (splitting the factor blocks to subblocks). I'll just need to run some measurements. If this workaround turns out fine, then it should be okay to go on with this and give instructions in the docs. 4. Okay, I agree. I hope we could generalize this non-overlapping blocking to other optimization problems :) I'll take a look at the paper you linked! 
> SGD for Matrix Factorization > > > Key: FLINK-4961 > URL: https://issues.apache.org/jira/browse/FLINK-4961 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Gábor Hermann >Assignee: Gábor Hermann > > We have started an implementation of distributed stochastic gradient descent > for matrix factorization based on Gemulla et al. [1]. > The main problem with distributed SGD in general is the conflicting updates > of the model variable. In case of matrix factorization we can avoid > conflicting updates by carefully deciding in each iteration step which blocks > of the rating matrix we should use to update the corresponding blocks of the > user and item matrices (see Figure 1. in paper). > Although a general SGD solver might seem relevant for this issue, we can do > much better in the special case of matrix factorization. E.g. in case of a > linear regression model, the model is broadcasted in every iteration. As the > model is typically small in that case, we can only avoid conflicts by having > a "global" model. Based on this, the general SGD solver is a different issue. > To give more details, the algorithm works as follows. > We randomly create user and item vectors, then randomly partition them into > {{k}} user and {{k}} item blocks. Based on these factor blocks we partition > the rating matrix to {{k * k}} blocks correspondingly. > In one iteration step we choose {{k}} non-conflicting rating blocks, i.e. we > should not choose two rating blocks simultaneously with the same user or item > block. This is done by assigning a rating block ID to every user and item > block. We match the user, item, and rating blocks by the current rating block > ID, and update the user and item factors by the ratings locally. We also > update the rating block ID for the factor blocks, thus in the next iteration > we use other rating blocks to update the factors. 
> In {{k}} iteration steps we sweep through the whole rating matrix of {{k * k}} > blocks (so instead of {{numberOfIterationSteps}} iterations we should do {{k > * numberOfIterationSteps}} iterations). > [1] [http://people.mpi-inf.mpg.de/~rgemulla/publications/gemulla11dsgd.pdf]
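The non-conflicting block selection described in the issue above can be sketched with a simple rotation schedule: in step {{t}}, user block {{i}} is matched with item block {{(i + t) mod k}}, so no two rating blocks chosen in the same step share a user or item block. This is a hedged, made-up Python illustration of the stratification from Gemulla et al., not the actual Flink/Scala implementation:

```python
def stratum(k, t):
    """(user_block, item_block) pairs updated in iteration step t.

    Within one step each user block and each item block appears at most
    once, so the k updates are conflict-free and can run in parallel.
    """
    return [(i, (i + t) % k) for i in range(k)]

def full_sweep(k):
    """k consecutive steps together touch each of the k * k rating
    blocks exactly once, which is why k * numberOfIterationSteps steps
    correspond to numberOfIterationSteps full passes over the matrix.
    """
    return [pair for t in range(k) for pair in stratum(k, t)]
```

For example, with {{k = 4}} and {{t = 1}} the schedule picks (0,1), (1,2), (2,3), (3,0): four disjoint strata, exactly as Figure 1 of the paper describes.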
[jira] [Commented] (FLINK-4895) Drop support for Hadoop 1
[ https://issues.apache.org/jira/browse/FLINK-4895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691019#comment-15691019 ] ASF GitHub Bot commented on FLINK-4895: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2850 Good catch. I removed the reflection magic from the class. > Drop support for Hadoop 1 > - > > Key: FLINK-4895 > URL: https://issues.apache.org/jira/browse/FLINK-4895 > Project: Flink > Issue Type: Task > Components: Build System >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Robert Metzger > > As per this mailing list discussion: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Drop-Hadoop-1-support-with-Flink-1-2-td9530.html > the community agreed to drop support for Hadoop 1. > The task includes > - removing the hadoop-1 / hadoop-2 build profiles, > - removing the scripts for generating hadoop-x poms > - updating the release script > - updating the nightly build script > - updating the travis configuration file > - updating the documentation > - updating the website
[GitHub] flink issue #2850: [FLINK-4895] Drop Hadoop1 support
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2850 Good catch. I removed the reflection magic from the class.
[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS
[ https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690966#comment-15690966 ] ASF GitHub Bot commented on FLINK-4712: --- Github user gaborhermann commented on the issue: https://github.com/apache/flink/pull/2838 Hello Theodore, Thank you for checking out our solution! I would not like to answer now, as we did most of the work together with @proto-n, and I might be wrong in some aspects. I'll discuss these issues with him tomorrow personally, and answer ASAP! > Implementing ranking predictions for ALS > > > Key: FLINK-4712 > URL: https://issues.apache.org/jira/browse/FLINK-4712 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Domokos Miklós Kelen >Assignee: Gábor Hermann > > We started working on implementing ranking predictions for recommender > systems. Ranking prediction means that beside predicting scores for user-item > pairs, the recommender system is able to recommend a top K list for the users. > Details: > In practice, this would mean finding the K items for a particular user with > the highest predicted rating. It should be possible also to specify whether > to exclude the already seen items from a particular user's toplist. (See for > example the 'exclude_known' setting of [Graphlab Create's ranking > factorization > recommender|https://turi.com/products/create/docs/generated/graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend.html#graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend] > ). > The output of the topK recommendation function could be in the form of > {{DataSet[(Int,Int,Int)]}}, meaning (user, item, rank), similar to Graphlab > Create's output. 
However, this is arguable: follow-up work includes > implementing ranking recommendation evaluation metrics (such as precision@k, > recall@k, ndcg@k), similar to [Spark's > implementations|https://spark.apache.org/docs/1.5.0/mllib-evaluation-metrics.html#ranking-systems]. > It would be beneficial if we were able to design the API such that it could > be included in the proposed evaluation framework (see > [FLINK-2157|https://issues.apache.org/jira/browse/FLINK-2157]), which makes it > necessary to consider the possible output type {{DataSet[(Int, > Array[Int])]}} or {{DataSet[(Int, Array[(Int,Double)])]}} meaning (user, > array of items), possibly including the predicted scores as well. See > [FLINK-4713|https://issues.apache.org/jira/browse/FLINK-4713] for details. > Another question arising is whether to provide this function as a member of > the ALS class, as a switch-kind of parameter to the ALS implementation > (meaning the model is either a rating or a ranking recommender model) or in > some other way.
[GitHub] flink issue #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendation & e...
Github user gaborhermann commented on the issue: https://github.com/apache/flink/pull/2838 Hello Theodore, Thank you for checking out our solution! I would not like to answer now, as we did most of the work together with @proto-n, and I might be wrong in some aspects. I'll discuss these issues with him tomorrow personally, and answer ASAP!
[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use
[ https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690946#comment-15690946 ] ASF GitHub Bot commented on FLINK-3921: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2060 I am working on merging this .. > StringParser not specifying encoding to use > --- > > Key: FLINK-3921 > URL: https://issues.apache.org/jira/browse/FLINK-3921 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.3 >Reporter: Tatu Saloranta >Assignee: Rekha Joshi >Priority: Trivial > > Class `flink.types.parser.StringParser` has javadocs indicating that contents > are expected to be ASCII, similar to `StringValueParser`. That makes sense, > but when constructing the actual String instance, no encoding is specified; e.g. on line 66: >this.result = new String(bytes, startPos+1, i - startPos - 2); > which falls back to whatever the platform default encoding is. If the contents > really are always ASCII (I would not count on that, as the parser is used by the CSV > reader), it is not a big deal, but it can lead to the usual Latin-1-vs-UTF-8 issues. > So I think the encoding should be explicitly specified, whatever is to be > used: the javadocs claim ASCII, so it could be "us-ascii", but it could well be UTF-8 > or even ISO-8859-1.
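The Latin-1-vs-UTF-8 issue described in the ticket is easy to reproduce: the same byte sequence decodes to different strings depending on which charset the platform happens to default to. A small illustrative Python demonstration of why the charset should be pinned explicitly (the actual fix would of course go into the Java `StringParser`):

```python
raw = "café".encode("utf-8")            # b'caf\xc3\xa9'

# Decoding with the right charset round-trips cleanly...
utf8_text = raw.decode("utf-8")          # 'café'

# ...but decoding the same bytes as ISO-8859-1 yields mojibake,
# which is exactly what an unpinned platform default can cause.
latin1_text = raw.decode("iso-8859-1")   # 'cafÃ©'

# Pure ASCII input decodes identically under both charsets, which is
# why the bug can stay hidden until non-ASCII data shows up in a CSV.
ascii_ok = b"hello".decode("us-ascii") == b"hello".decode("utf-8")
```

In Java terms, the point is to pass an explicit charset to the `String` constructor instead of relying on the two-argument overload that uses the platform default.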
[GitHub] flink issue #2060: [FLINK-3921] StringParser encoding
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2060 I am working on merging this ..
[jira] [Commented] (FLINK-4861) Package optional project artifacts
[ https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690897#comment-15690897 ] ASF GitHub Bot commented on FLINK-4861: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2664 @rmetzger, I have not been able to improve on the current configuration. `useTransitiveFiltering` seems to work as the inverse of the plugin documentation (true prevents transitive dependencies from being filtered), but project artifacts are still ignored as transitive dependencies. This implementation only uses includes, so there shouldn't be the same risk of failing to exclude an unneeded dependency. How would creating separate assembly descriptors be beneficial? > Package optional project artifacts > -- > > Key: FLINK-4861 > URL: https://issues.apache.org/jira/browse/FLINK-4861 > Project: Flink > Issue Type: New Feature > Components: Build System >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.2.0 > > > Per the mailing list > [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html], > package the Flink libraries and connectors into subdirectories of a new > {{opt}} directory in the release/snapshot tarballs.
[GitHub] flink issue #2664: [FLINK-4861] [build] Package optional project artifacts
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2664 @rmetzger, I have not been able to improve on the current configuration. `useTransitiveFiltering` seems to work as the inverse of the plugin documentation (true prevents transitive dependencies from being filtered), but project artifacts are still ignored as transitive dependencies. This implementation only uses includes, so there shouldn't be the same risk of failing to exclude an unneeded dependency. How would creating separate assembly descriptors be beneficial?
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690834#comment-15690834 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r88945180 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import akka.actor.ActorSystem; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmaster.JobManagerServices; +import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException; +import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import 
org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.security.SecurityContext; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; + +import javax.annotation.concurrent.GuardedBy; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.Map; +import java.util.UUID; + +/** + * This class is the executable entry point for the YARN application master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster} + * and {@link org.apache.flink.yarn.YarnResourceManager}. + * + * The JobMasters handles Flink job execution, while the YarnResourceManager handles container + * allocation and failure detection. + */ +public class YarnFlinkApplicationMasterRunner implements
[GitHub] flink issue #2767: [FLINK-4988] Elasticsearch 5.x support
Github user mikedias commented on the issue: https://github.com/apache/flink/pull/2767 Not sure if I can exclude netty4 dependency, but I'll take a look.
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690852#comment-15690852 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89297180 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690843#comment-15690843 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89291899 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690838#comment-15690838 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89292596 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690846#comment-15690846 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89360108 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89292596 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690839#comment-15690839 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r88945488 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89293704 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690849#comment-15690849 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89293375 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
--- End diff --

`YarnResourceManager` does not exist.

> Implement FLIP-6 YARN Application Master Runner
> ---
>
> Key: FLINK-4928
> URL:
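The javadoc under review describes a runner that acts as the executable entry point for the application master, with a JobMaster handling job execution and a resource manager handling container allocation and failure detection. The diff only shows the imports and the javadoc, not the actual `main` method, so the following is only a hedged sketch of the general entry-point pattern such a runner typically follows: a static `main` that reads the container environment and defers to an internal `run()` method, so every startup failure maps to one well-defined process exit code. All class and method names and the exit-code value below are invented for illustration and are not Flink's actual API.

```java
// Minimal, self-contained sketch of the AM entry-point pattern.
// All names and the exit code are illustrative, not Flink's actual API.
public class AppMasterEntryPointSketch {

    // hypothetical exit code signalling a failure during initialization
    static final int INIT_ERROR_EXIT_CODE = 31;

    public static void main(String[] args) {
        // the resource manager hands the working directory over via the environment
        String workDir = System.getenv().getOrDefault("PWD", ".");
        int exitCode = run(workDir);
        if (exitCode != 0) {
            System.exit(exitCode);
        }
    }

    static int run(String workDir) {
        try {
            // a real implementation would start the RPC service, leader
            // election, and the JobMaster here
            System.out.println("application master starting in " + workDir);
            return 0;
        } catch (Throwable t) {
            // any failure during startup becomes a dedicated exit code
            t.printStackTrace();
            return INIT_ERROR_EXIT_CODE;
        }
    }
}
```

The point of funneling all initialization through one `run()` method is that a failure on any startup step surfaces to YARN as a single, well-defined container exit code rather than a hung or silently dead process.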
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89296224 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r88944228 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89358052 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690848#comment-15690848 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89292790 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r88946050 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690837#comment-15690837 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r88944228 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690844#comment-15690844 ]

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r89291817

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r89291817

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r88945488

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690836#comment-15690836 ]

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r89352452

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r89291899

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r89360108

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---

    +public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {

    --- End diff --

    The current implementation tries to solve the Yarn cluster per job scenario, right? Maybe we could refactor the code such that we have
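The review comment above is cut off, but one plausible reading of the refactoring it suggests is to split the generic application-master bootstrap from the per-job (versus session) specifics. A hedged sketch of that direction, with entirely hypothetical class and method names:

```java
// Illustrative only: a template-method split between common AM bootstrap and
// mode-specific cluster components. None of these names are actual Flink code.
public class RunnerRefactoringSketch {

    static abstract class AbstractApplicationMasterRunner {
        // Common bootstrap first, then whatever the cluster mode needs.
        final int run() {
            startCommonServices();
            return startClusterComponents();
        }

        void startCommonServices() {
            System.out.println("rpc + HA services started");
        }

        // A session-mode runner and a per-job runner would differ only here.
        abstract int startClusterComponents();
    }

    static class PerJobRunner extends AbstractApplicationMasterRunner {
        @Override
        int startClusterComponents() {
            System.out.println("recovered job graph; started JobMaster and ResourceManager");
            return 0;
        }
    }

    public static void main(String[] args) {
        System.out.println("exit code: " + new PerJobRunner().run());
    }
}
```

The point of the split is that the YARN-per-job case stops being hard-wired into one class, which is the concern the comment appears to raise.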
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690840#comment-15690840 ]

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r89291958

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690842#comment-15690842 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89293704 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMasters handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements
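The quoted javadoc describes a two-component design: a JobMaster that drives Flink job execution and a resource manager that handles container allocation and failure detection. A minimal, self-contained sketch of that split (hypothetical simplified types, not the actual Flink API):

```java
// Illustrative sketch only: the interfaces below are stand-ins invented for
// this example, not Flink's JobMaster/ResourceManager classes.
public class ApplicationMasterSketch {

    /** Stand-in for the component that drives job execution. */
    interface JobMaster {
        String runJob(String jobName);
    }

    /** Stand-in for the component that allocates containers and detects failures. */
    interface ResourceManager {
        int allocateContainers(int requested);
    }

    /** Wires both components together, the way the runner's entry point would. */
    static String startAndRun(String jobName, int containers) {
        ResourceManager rm = requested -> requested; // pretend every container is granted
        JobMaster jm = name -> "FINISHED:" + name;

        int granted = rm.allocateContainers(containers);
        if (granted < containers) {
            return "FAILED:" + jobName; // not enough resources to run the job
        }
        return jm.runJob(jobName);
    }

    public static void main(String[] args) {
        System.out.println(startAndRun("wordcount", 2)); // prints FINISHED:wordcount
    }
}
```

The point of the split is that job scheduling logic and cluster-resource logic evolve independently; the runner only composes them.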
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690831#comment-15690831 ] ASF GitHub Bot commented on FLINK-4988: --- Github user mikedias commented on the issue: https://github.com/apache/flink/pull/2767 Not sure if I can exclude netty4 dependency, but I'll take a look. > Elasticsearch 5.x support > - > > Key: FLINK-4988 > URL: https://issues.apache.org/jira/browse/FLINK-4988 > Project: Flink > Issue Type: New Feature >Reporter: Mike Dias > > Elasticsearch 5.x was released: > https://www.elastic.co/blog/elasticsearch-5-0-0-released -- This message was sent by Atlassian JIRA (v6.3.4#6332)
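The comment above mentions possibly excluding the netty4 dependency. For reference, a transitive-dependency exclusion in a Maven pom looks roughly like this (the coordinates shown are assumptions for illustration; the real ones must be checked with `mvn dependency:tree` against the Elasticsearch 5.x artifact):

```xml
<!-- Hypothetical sketch of excluding a transitive Netty 4 artifact;
     groupId/artifactId/version are illustrative, not verified. -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty-transport</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```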
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690850#comment-15690850 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r88946050 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690832#comment-15690832 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r88944244 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690833#comment-15690833 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89292656 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690845#comment-15690845 ] ASF GitHub Bot commented on FLINK-4928: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89294779 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89297418 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@
+public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
+
+	/** Logger */
+	protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+	/** The process environment variables
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690851#comment-15690851 ]

ASF GitHub Bot commented on FLINK-4928:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r89358052

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMasters handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements
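The javadoc above describes the runner's job: read its container environment, start long-lived services (RPC service, ResourceManager, JobMaster), and fail the process with an error exit code if startup fails. The following is a minimal, hypothetical sketch of that entry-point pattern only — the class name, environment key, and exit code are illustrative and not Flink's actual implementation:

```java
import java.util.Map;

// Illustrative sketch of a YARN-style entry-point runner (not Flink's real code).
public class AppMasterSketch {

    // Illustrative exit code used to signal a failure during initialization.
    static final int INIT_ERROR_EXIT_CODE = 31;

    /** Reads a required variable from the container environment, failing fast if absent. */
    static String requireEnv(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null) {
            throw new IllegalStateException("Missing environment variable: " + key);
        }
        return value;
    }

    /**
     * Runs the "application master": validates the environment, would start the
     * long-lived services, and maps any startup failure to a non-zero exit code.
     */
    static int run(Map<String, String> env) {
        try {
            String workingDir = requireEnv(env, "PWD");
            // ... here the real runner starts the RPC service, the ResourceManager,
            // and the JobMaster, then blocks until the job finishes ...
            return 0;
        } catch (Throwable t) {
            // Startup failures must not leave a half-initialized process running;
            // the caller (main) would pass this code to System.exit(...).
            return INIT_ERROR_EXIT_CODE;
        }
    }
}
```

Catching `Throwable` at the outermost level and converting it to a process exit code is the design choice such runners typically make, since YARN observes the container's exit status rather than Java exceptions.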
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r89292790

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690835#comment-15690835 ]

ASF GitHub Bot commented on FLINK-4928:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r89296224

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r88945180

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner
[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690841#comment-15690841 ]

ASF GitHub Bot commented on FLINK-4928:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r89293898

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r88944244

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r89291958

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r89352452

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89294779 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89292656 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89293375 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- + * and {@link org.apache.flink.yarn.YarnResourceManager}. --- End diff -- `YarnResourceManager` does not exist. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89297180 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r89293898 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r89365136 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---
@@ -139,24 +143,65 @@ public void runFetchLoop() throws Exception {
 PeriodicOffsetCommitter periodicCommitter = null;
 try {
- // read offsets from ZooKeeper for partitions that did not restore offsets
- {
- List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
- for (KafkaTopicPartitionState partition : subscribedPartitions()) {
- if (!partition.isOffsetDefined()) {
- partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
- }
+ List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
+ for (KafkaTopicPartitionState partition : subscribedPartitions()) {
+ if (!partition.isOffsetDefined()) {
+ partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
 }
 }
+
+ if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
+ // if all partitions have no initial offsets, that means we're starting fresh without any restored state
+ switch (startupMode) {
+ case EARLIEST:
+ LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
+
+ for (KafkaTopicPartitionState partition : subscribedPartitions()) {
+ partition.setOffset(OffsetRequest.EarliestTime());
+ }
+ break;
+ case LATEST:
+ LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
+
+ for (KafkaTopicPartitionState partition : subscribedPartitions()) {
+ partition.setOffset(OffsetRequest.LatestTime());
+ }
+ break;
+ default:
+ case GROUP_OFFSETS:
+ LOG.info("Using group offsets in Zookeeper of group.id {} as starting point for partitions {}",
+ kafkaConfig.getProperty("group.id"), partitionsWithNoOffset);
+
+ Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
+ for (KafkaTopicPartitionState partition : subscribedPartitions()) {
+ Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
+ if (offset != null) {
+ // the committed offset in ZK represents the next record to process,
+ // so we subtract it by 1 to correctly represent internal state
+ partition.setOffset(offset - 1);
+ } else {
+ // if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
+ // we default to "auto.offset.reset" like the Kafka high-level consumer
+ LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
+ " resetting starting offset to 'auto.offset.reset'", partition);
+
+ partition.setOffset(invalidOffsetBehavior);
+ }
+ }
+ }
+ } else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
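The decision logic in the diff above can be modeled in isolation. The following is a simplified sketch, not the actual Flink `Kafka08Fetcher` code: the sentinel constants mirror Kafka 0.8's `OffsetRequest.EarliestTime()` / `LatestTime()` values, and the ZooKeeper lookup is reduced to a nullable parameter.

```java
// Simplified model of the startup-mode dispatch for one partition.
// Not the real Flink classes; names and sentinels are paraphrased from the diff.
public class StartupModeSketch {

    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    // mirrors Kafka 0.8's OffsetRequest.EarliestTime() / LatestTime() sentinels
    static final long EARLIEST_SENTINEL = -2L;
    static final long LATEST_SENTINEL = -1L;

    /**
     * Resolve a starting offset for one partition when no restored state exists.
     * zkCommittedOffset is the group offset found in ZooKeeper, or null if absent;
     * invalidOffsetBehavior stands in for the 'auto.offset.reset' fallback.
     */
    static long resolveStartOffset(StartupMode mode, Long zkCommittedOffset, long invalidOffsetBehavior) {
        switch (mode) {
            case EARLIEST:
                return EARLIEST_SENTINEL;
            case LATEST:
                return LATEST_SENTINEL;
            case GROUP_OFFSETS:
            default:
                if (zkCommittedOffset != null) {
                    // the committed ZK offset is the *next* record to read, so
                    // subtract 1 to represent the last processed record internally
                    return zkCommittedOffset - 1;
                }
                // no group offset found: fall back to 'auto.offset.reset'-like behavior
                return invalidOffsetBehavior;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveStartOffset(StartupMode.EARLIEST, null, LATEST_SENTINEL));        // -2
        System.out.println(resolveStartOffset(StartupMode.GROUP_OFFSETS, 100L, LATEST_SENTINEL));   // 99
        System.out.println(resolveStartOffset(StartupMode.GROUP_OFFSETS, null, EARLIEST_SENTINEL)); // -2
    }
}
```

The off-by-one adjustment is the subtle part: ZooKeeper stores the offset of the next record to consume, while the fetcher's internal state tracks the last record processed.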
[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support
[ https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690828#comment-15690828 ] ASF GitHub Bot commented on FLINK-4988: --- Github user mikedias commented on the issue: https://github.com/apache/flink/pull/2767 No, ES is not backward compatible... But we can reuse some classes or interfaces between versions. I have plans to do this in another PR, just to isolate possible issues.
> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
> Issue Type: New Feature
> Reporter: Mike Dias
>
> Elasticsearch 5.x was released: https://www.elastic.co/blog/elasticsearch-5-0-0-released
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690827#comment-15690827 ] ASF GitHub Bot commented on FLINK-4280: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r89365136 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---
[GitHub] flink issue #2767: [FLINK-4988] Elasticsearch 5.x support
Github user mikedias commented on the issue: https://github.com/apache/flink/pull/2767 No, ES is not backward compatible... But we can reuse some classes or interfaces between versions. I have plans to do this in another PR, just to isolate possible issues.
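One way to realize the cross-version reuse the comment describes is a version-agnostic bridge interface that each version-specific connector module implements; Flink's Elasticsearch connectors were later refactored along similar lines with an `ElasticsearchApiCallBridge` abstraction. Below is a minimal sketch with hypothetical names, not the actual Flink or Elasticsearch API (a real bridge would wrap that version's client and bulk-request types).

```java
import java.util.Map;

// Hypothetical version-agnostic bridge: each ES version module supplies its own
// implementation, so sink logic can be shared across connector versions.
interface VersionedClientBridge {
    AutoCloseable createClient(Map<String, String> config) throws Exception;
}

// Hypothetical version-specific implementation; a real one would construct
// that ES version's client (e.g. a transport client) from the config.
class FakeEs5Bridge implements VersionedClientBridge {
    @Override
    public AutoCloseable createClient(Map<String, String> config) {
        return () -> { /* a real implementation would close the client here */ };
    }
}

public class BridgeSketch {
    public static void main(String[] args) throws Exception {
        VersionedClientBridge bridge = new FakeEs5Bridge();
        // shared sink code only sees the bridge, never a concrete ES version
        try (AutoCloseable client = bridge.createClient(Map.of("cluster.name", "test"))) {
            System.out.println("client created via bridge: " + (client != null));
        }
    }
}
```

The design choice here is that breaking API differences between ES versions are confined to the bridge implementations, which is exactly the kind of isolation the comment proposes deferring to a follow-up PR.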
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690817#comment-15690817 ] ASF GitHub Bot commented on FLINK-4280: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2509 Thanks for the review @rmetzger :) I'll aim to address your comments and rebase by the end of this week (will tag you once it's ready). > New Flink-specific option to set starting position of Kafka consumer without > respecting external offsets in ZK / Broker > --- > > Key: FLINK-4280 > URL: https://issues.apache.org/jira/browse/FLINK-4280 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Currently, to start reading from the "earliest" and "latest" position in > topics for the Flink Kafka consumer, users set the Kafka config > {{auto.offset.reset}} in the provided properties configuration. > However, the way this config actually works might be a bit misleading if > users were trying to find a way to "read topics from a starting position". > The way the {{auto.offset.reset}} config works in the Flink Kafka consumer > resembles Kafka's original intent for the setting: first, existing external > offsets committed to the ZK / brokers will be checked; if none exists, then > will {{auto.offset.reset}} be respected. > I propose to add Flink-specific ways to define the starting position, without > taking into account the external offsets. The original behaviour (reference > external offsets first) can be changed to be a user option, so that the > behaviour can be retained for frequent Kafka users that may need some > collaboration with existing non-Flink Kafka consumer applications. 
> How users will interact with the Flink Kafka consumer after this is added, > with a newly introduced {{flink.starting-position}} config: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "earliest/latest"); > props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a > warning) > props.setProperty("group.id", "...") // this won't have effect on the > starting position anymore (may still be used in external offset committing) > ... > {code} > Or, reference external offsets in ZK / broker: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "external-offsets"); > props.setProperty("auto.offset.reset", "earliest/latest"); // default will be > latest > props.setProperty("group.id", "..."); // will be used to lookup external > offsets in ZK / broker on startup > ... > {code} > A thing we would need to decide on is what would the default value be for > {{flink.starting-position}}. > Two merits I see in adding this: > 1. This compensates the way users generally interpret "read from a starting > position". As the Flink Kafka connector is somewhat essentially a > "high-level" Kafka consumer for Flink users, I think it is reasonable to add > Flink-specific functionality that users will find useful, although it wasn't > supported in Kafka's original consumer designs. > 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is > used only to expose progress to the outside world, and not used to manipulate > how Kafka topics are read in Flink (unless users opt to do so)" is even more > definite and solid. There was some discussion in this PR > (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I > think adding this "decouples" more Flink's internal offset checkpointing from > the external Kafka's offset store. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
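The proposal above could resolve its {{flink.starting-position}} property into a startup mode roughly as follows. This is a hedged sketch for illustration only: the enum, method, and the "external-offsets" default are assumptions, not Flink's actual API.

```java
import java.util.Properties;

// Hypothetical resolution of the proposed "flink.starting-position" property
// into the three starting behaviours described in the JIRA issue. Names and
// the default value are illustrative assumptions.
public class StartingPosition {

    enum Mode { EARLIEST, LATEST, EXTERNAL_OFFSETS }

    static Mode resolve(Properties props) {
        String value = props.getProperty("flink.starting-position", "external-offsets");
        switch (value) {
            case "earliest":
                return Mode.EARLIEST;          // ignore committed offsets and "auto.offset.reset"
            case "latest":
                return Mode.LATEST;            // ignore committed offsets and "auto.offset.reset"
            case "external-offsets":
                return Mode.EXTERNAL_OFFSETS;  // look up group offsets in ZK / broker
            default:
                throw new IllegalArgumentException("Unknown starting position: " + value);
        }
    }
}
```

Under this sketch, "group.id" and "auto.offset.reset" only matter in the EXTERNAL_OFFSETS mode, which mirrors the decoupling argued for in the issue.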
[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r89361344
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java ---
@@ -263,6 +265,7 @@ public Void answer(InvocationOnMock invocation) {
 	schema,
 	new Properties(),
 	0L,
+	StartupMode.GROUP_OFFSETS,
--- End diff --
Will do!
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690809#comment-15690809 ] ASF GitHub Bot commented on FLINK-4280: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r89364029 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -444,6 +445,134 @@ public void run() { kafkaOffsetHandler.close(); deleteTestTopic(topicName); } + + /** +* This test ensures that when explicitly set to start from earliest record, the consumer +* ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. +*/ + public void runStartFromEarliestOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + } + + 
/** +* This test ensures that when explicitly set to start from latest record, the consumer +* ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. +*/ + public void runStartFromLatestOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + final Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + Thread consumeThread = new Thread(new Runnable() { + @Override + public void run() { + try { + readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + consumeThread.start(); + + Thread.sleep(5000); --- End diff -- Will probably need to write a different / custom read method or topology for this then. The problem is that I wanted to reuse `readSequence()` for the test, but it expects an exact number of read elements for the test to succeed. > New Flink-specific option to set starting position of Kafka consumer without > respecting external offsets in ZK / Broker >
[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r89362689
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -408,23 +412,32 @@ else if (partitionsRemoved) {
 		}
 	}
-	private void getMissingOffsetsFromKafka(
+	private void replaceEarliestOrLatestOffsetsWithActualValuesFromKafka(
 			List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException {
 		// collect which partitions we should fetch offsets for
-		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
+		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsWithEarliestOffsetSetting = new ArrayList<>();
--- End diff --
Good point, didn't think of that. I'll call `getLastOffsetFromKafka` if `getOffset()` returns `OffsetRequest.EarliestTime()` or `OffsetRequest.LatestTime()`.
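The point of the exchange above is that `OffsetRequest.EarliestTime()` and `OffsetRequest.LatestTime()` return sentinel values, not real positions, so they must be swapped for actual offsets fetched from the broker before consumption starts. A minimal sketch of the check (illustrative class and method names; -2 and -1 are the constants Kafka's `OffsetRequest` uses for these sentinels):

```java
// Sketch of the sentinel-offset check discussed in the review comment above.
// Real positions are non-negative; the two negative values are markers that
// still need to be resolved against the broker.
public class SentinelOffsets {

    static final long EARLIEST_SENTINEL = -2L; // OffsetRequest.EarliestTime()
    static final long LATEST_SENTINEL = -1L;   // OffsetRequest.LatestTime()

    // true if the stored offset is a marker that must be replaced with an
    // actual offset fetched from Kafka before the fetch loop may run
    static boolean needsActualOffsetFromKafka(long offset) {
        return offset == EARLIEST_SENTINEL || offset == LATEST_SENTINEL;
    }
}
```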
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690798#comment-15690798 ] ASF GitHub Bot commented on FLINK-4280: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r89363213 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -444,6 +445,134 @@ public void run() { kafkaOffsetHandler.close(); deleteTestTopic(topicName); } + + /** +* This test ensures that when explicitly set to start from earliest record, the consumer +* ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. +*/ + public void runStartFromEarliestOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + } + + 
/** +* This test ensures that when explicitly set to start from latest record, the consumer +* ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. +*/ + public void runStartFromLatestOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + final Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + Thread consumeThread = new Thread(new Runnable() { + @Override + public void run() { + try { + readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50); + } catch (Exception e) { + throw new RuntimeException(e); --- End diff -- Ah, right! Will fix. > New Flink-specific option to set starting position of Kafka consumer without > respecting external offsets in ZK / Broker > --- > > Key: FLINK-4280 > URL: https://issues.apache.org/jira/browse/FLINK-4280 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >
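The `Thread.sleep(5000)` concern above could be addressed by waiting on the observed element count instead of sleeping for a fixed interval. A hedged sketch of such a wait using only standard concurrency primitives (names are illustrative, not Flink test utilities):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical replacement for a fixed Thread.sleep(...) in a read test:
// poll the count of elements the consumer has seen until it reaches the
// expected value, or give up after a timeout.
public class BoundedReadWait {

    static boolean awaitCount(AtomicInteger seen, int expected, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (seen.get() < expected) {
            if (System.currentTimeMillis() > deadline) {
                return false; // timed out before the expected count was reached
            }
            Thread.sleep(10); // brief poll interval instead of one long sleep
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger seen = new AtomicInteger();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 90; i++) {
                seen.incrementAndGet(); // stand-in for records arriving
            }
        });
        producer.start();
        producer.join();
        System.out.println(awaitCount(seen, 90, 1000)); // prints true
    }
}
```

A wait like this also avoids the exact-count coupling mentioned for `readSequence()`, since the test can tolerate the consumer reaching the target at any point within the timeout.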
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690775#comment-15690775 ] ASF GitHub Bot commented on FLINK-4280: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r89361839 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java --- @@ -45,10 +45,7 @@ import java.io.File; import java.net.BindException; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.UUID; +import java.util.*; --- End diff -- Ah, this was a IDE auto-complete. The style checks don't cover the test codes, right? I'll revert this. > New Flink-specific option to set starting position of Kafka consumer without > respecting external offsets in ZK / Broker > --- > > Key: FLINK-4280 > URL: https://issues.apache.org/jira/browse/FLINK-4280 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Currently, to start reading from the "earliest" and "latest" position in > topics for the Flink Kafka consumer, users set the Kafka config > {{auto.offset.reset}} in the provided properties configuration. > However, the way this config actually works might be a bit misleading if > users were trying to find a way to "read topics from a starting position". > The way the {{auto.offset.reset}} config works in the Flink Kafka consumer > resembles Kafka's original intent for the setting: first, existing external > offsets committed to the ZK / brokers will be checked; if none exists, then > will {{auto.offset.reset}} be respected. > I propose to add Flink-specific ways to define the starting position, without > taking into account the external offsets. 
The original behaviour (reference > external offsets first) can be changed to be a user option, so that the > behaviour can be retained for frequent Kafka users that may need some > collaboration with existing non-Flink Kafka consumer applications. > How users will interact with the Flink Kafka consumer after this is added, > with a newly introduced {{flink.starting-position}} config: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "earliest/latest"); > props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a > warning) > props.setProperty("group.id", "...") // this won't have an effect on the > starting position anymore (may still be used in external offset committing) > ... > {code} > Or, reference external offsets in ZK / broker: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "external-offsets"); > props.setProperty("auto.offset.reset", "earliest/latest"); // default will be > latest > props.setProperty("group.id", "..."); // will be used to look up external > offsets in ZK / broker on startup > ... > {code} > One thing we would need to decide on is what the default value for > {{flink.starting-position}} should be. > Two merits I see in adding this: > 1. This accommodates the way users generally interpret "read from a starting > position". As the Flink Kafka connector is essentially a > "high-level" Kafka consumer for Flink users, I think it is reasonable to add > Flink-specific functionality that users will find useful, although it wasn't > supported in Kafka's original consumer designs. > 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is > used only to expose progress to the outside world, and not used to manipulate > how Kafka topics are read in Flink (unless users opt to do so)" is even more > definite and solid. 
There was some discussion in this PR > (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I > think adding this further "decouples" Flink's internal offset checkpointing from > the external Kafka offset store. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
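The property-driven startup mode described in the quoted proposal can be sketched in plain Java. Everything below — the `StartupModeResolver` class, the `StartupMode` enum, and the error handling — is a hypothetical illustration of the proposal, not actual Flink API; only the property names and values come from the issue text, and the default value is deliberately left unresolved because the issue flags it as an open question:

```java
import java.util.Properties;

// Hypothetical sketch of how the proposed "flink.starting-position" property
// could be resolved. Class and enum names are illustrative, not Flink API.
public class StartupModeResolver {

    public enum StartupMode { EARLIEST, LATEST, EXTERNAL_OFFSETS }

    public static StartupMode resolve(Properties props) {
        String position = props.getProperty("flink.starting-position");
        if (position == null) {
            // The issue leaves the default value as an open question,
            // so this sketch simply requires the property to be set.
            throw new IllegalArgumentException("flink.starting-position is not set");
        }
        switch (position) {
            case "earliest":
            case "latest":
                // "auto.offset.reset" would be ignored here (log a warning),
                // and "group.id" would no longer affect the starting position.
                return position.equals("earliest") ? StartupMode.EARLIEST : StartupMode.LATEST;
            case "external-offsets":
                // "group.id" would be used to look up committed offsets in ZK / broker,
                // falling back to "auto.offset.reset" if none exist.
                return StartupMode.EXTERNAL_OFFSETS;
            default:
                throw new IllegalArgumentException("Unknown starting position: " + position);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("flink.starting-position", "earliest");
        System.out.println(resolve(props)); // prints EARLIEST
    }
}
```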
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690773#comment-15690773 ] ASF GitHub Bot commented on FLINK-4280: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r89361688 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java --- @@ -131,6 +131,25 @@ public void testMetricsAndEndOfStream() throws Exception { runEndOfStreamTest(); } + // --- startup mode --- + + // TODO not passing due to Kafka Consumer config error --- End diff -- Hmm, if I recall correctly, that's what I did in the first place, but that caused some other issues. I'll definitely give this another look and make sure the test is runnable. > New Flink-specific option to set starting position of Kafka consumer without > respecting external offsets in ZK / Broker > --- > > Key: FLINK-4280 > URL: https://issues.apache.org/jira/browse/FLINK-4280 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4820) Slf4j / log4j version upgrade to support dynamic change of log levels.
[ https://issues.apache.org/jira/browse/FLINK-4820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen reassigned FLINK-4820: --- Assignee: Stephan Ewen > Slf4j / log4j version upgrade to support dynamic change of log levels. > -- > > Key: FLINK-4820 > URL: https://issues.apache.org/jira/browse/FLINK-4820 > Project: Flink > Issue Type: Task >Reporter: Zhenzhong Xu >Assignee: Stephan Ewen > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
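The point of this upgrade is to allow log levels to be changed at runtime without restarting the process. Slf4j itself is only a facade and exposes no level setter, which is why the version of the underlying binding matters; as a stdlib-only sketch of the underlying idea, `java.util.logging` already supports runtime level changes (log4j 2.x exposes the analogous `Configurator.setLevel`):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Minimal stdlib-only illustration of dynamic log-level changes; the
// slf4j/log4j upgrade in the issue targets the same capability.
public class DynamicLogLevel {
    public static void main(String[] args) {
        Logger logger = Logger.getLogger("org.apache.flink.example");

        logger.setLevel(Level.WARNING);
        System.out.println(logger.isLoggable(Level.FINE)); // prints false

        // Flip to a more verbose level at runtime, e.g. triggered by an
        // operator while debugging a live job.
        logger.setLevel(Level.FINE);
        System.out.println(logger.isLoggable(Level.FINE)); // prints true
    }
}
```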
[jira] [Commented] (FLINK-3404) Extend Kafka consumers with interface StoppableFunction
[ https://issues.apache.org/jira/browse/FLINK-3404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690734#comment-15690734 ] static-max commented on FLINK-3404: --- I'm in need of a _stop()_ method too, I will propose a PR for this issue if there is interest. > Extend Kafka consumers with interface StoppableFunction > --- > > Key: FLINK-3404 > URL: https://issues.apache.org/jira/browse/FLINK-3404 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Matthias J. Sax > > Kafka consumers are not stoppable right now. To make them stoppable, they > must implement {{StoppableFunction}}. Implementing method {{stop()}} must > ensure that the consumer stops pulling new messages from Kafka and issues a > final checkpoint with the last offset. Afterwards, {{run()}} must return. > When implementing this, keep in mind that the gathered checkpoint might > later be used as a savepoint. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
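The contract described in the issue — stop pulling, issue a final checkpoint, then let {{run()}} return — can be illustrated with plain Java. The class below is a simplified stand-in (an offset counter replaces a real Kafka poll loop, and there is no actual checkpointing), not the Flink consumer implementation:

```java
// Simplified stand-in for a stoppable source. It mirrors the shape of the
// StoppableFunction contract from the issue but is not Flink code: the
// counter below replaces a real Kafka poll loop.
public class StoppableConsumer {

    private volatile boolean running = true;
    private long lastOffset = -1;

    public void run() {
        while (running) {
            lastOffset++; // stand-in for pulling one message from Kafka
        }
        // On a graceful stop, a final checkpoint carrying lastOffset would be
        // issued here before run() returns, so it can serve as a savepoint.
    }

    public void stop() {
        running = false; // stop pulling new messages; run() then returns
    }

    public long lastOffset() {
        return lastOffset;
    }

    public static void main(String[] args) throws InterruptedException {
        StoppableConsumer consumer = new StoppableConsumer();
        Thread worker = new Thread(consumer::run);
        worker.start();
        Thread.sleep(50);
        consumer.stop();
        worker.join();
        System.out.println("stopped cleanly after offset " + consumer.lastOffset());
    }
}
```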
[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load
[ https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690712#comment-15690712 ] ASF GitHub Bot commented on FLINK-5122: --- GitHub user static-max opened a pull request: https://github.com/apache/flink/pull/2861 [FLINK-5122] Index requests will be retried if the error is only temp… This PR will re-add index requests to the BulkProcessor if the error is temporary, like * General timeout errors * No master * UnavailableShardsException (Rebalancing, Node down) * Bulk queue on node full You can merge this pull request into a Git repository by running: $ git pull https://github.com/static-max/flink flink-connector-elasticsearch2-robust Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2861.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2861 commit 2ea8bd099100203d73af9b3a5e616e6d6d1cd50d Author: Max Kuklinski Date: 2016-11-23T16:54:11Z [FLINK-5122] Index requests will be retried if the error is only temporary on the Elasticsearch side. Covered are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full > Elasticsearch Sink loses documents when cluster has high load > - > > Key: FLINK-5122 > URL: https://issues.apache.org/jira/browse/FLINK-5122 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.2.0 >Reporter: static-max >Assignee: static-max > > My cluster had high load and documents did not get indexed. This violates the "at > least once" semantics in the ES connector. > I put pressure on my cluster to test Flink, causing new indices to be > created and balanced. On those errors the bulk should be tried again instead > of being discarded. 
> Primary shard not active because ES decided to rebalance the index: > 2016-11-15 15:35:16,123 ERROR > org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - > Failed to index document in Elasticsearch: > UnavailableShardsException[[index-name][3] primary shard is not active > Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] > requests]] > Bulk queue on node full (I set queue to a low value to reproduce error): > 22:37:57,702 ERROR > org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - > Failed to index document in Elasticsearch: > RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]]; > nested: EsRejectedExecutionException[rejected execution of > org.elasticsearch.transport.TransportService$4@727e677c on > EsThreadPoolExecutor[bulk, queue capacity = 1, > org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running, > pool size = 2, active threads = 2, queued tasks = 1, completed tasks = > 2939]]]; > I can try to propose a PR for this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
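The retry logic described in the PR hinges on telling temporary failures apart from permanent ones. A minimal sketch of such a classifier, matching on the exception names quoted in the error logs above — the helper class and the string matching are illustrative assumptions; the actual PR works against the Elasticsearch client's exception types, not message strings:

```java
import java.util.Arrays;
import java.util.List;

// Illustrative classifier for Elasticsearch bulk failures, matching on the
// exception names from the quoted logs. A real implementation would inspect
// the ES client's exception types rather than message strings.
public class TemporaryErrorClassifier {

    private static final List<String> TEMPORARY_MARKERS = Arrays.asList(
        "UnavailableShardsException",   // rebalancing / node down
        "EsRejectedExecutionException", // bulk queue on node full
        "NoNodeAvailableException",     // no master reachable
        "timed out"                     // general timeout errors
    );

    public static boolean isTemporary(String failureMessage) {
        if (failureMessage == null) {
            return false;
        }
        for (String marker : TEMPORARY_MARKERS) {
            if (failureMessage.contains(marker)) {
                return true; // re-add the request to the BulkProcessor
            }
        }
        return false; // permanent error: surface it (at-least-once semantics)
    }

    public static void main(String[] args) {
        String msg = "UnavailableShardsException[[index-name][3] primary shard is not active";
        System.out.println(isTemporary(msg)); // prints true
    }
}
```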