[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692468#comment-15692468
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  
Hi, Fabian.

I've made all the corrections. One thing I didn't understand: are we going 
to allow the case of `!joinCondition.isEqui` in `DataSetSingleRowCrossRule`?

I also added several tests to verify that CrossJoin works for both left and 
right single-row inputs.
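For context, a cross join where one input is known to hold exactly one row can take that row and pair it with every row of the other input. Below is a minimal plain-Java sketch of the idea that covers both the left and the right single-row case. `SingleRowCross` and its parameters are hypothetical; this is not the code of `DataSetSingleRowCrossRule` or of the runtime operator it produces. Every pair is materialized, so the join condition here is simply a predicate evaluated per pair. In this sketch, a non-equi condition therefore costs no more than an equi condition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

/** Hypothetical helper: cross join where exactly one input holds a single row. */
final class SingleRowCross {

    /**
     * Pairs the single row with every row of the other input, keeps the pairs that
     * satisfy the condition, and always calls join(left, right) in that order.
     */
    static <A, B, R> List<R> cross(
            List<A> left,
            List<B> right,
            boolean leftIsSingle,
            BiPredicate<A, B> condition,
            BiFunction<A, B, R> join) {
        List<R> out = new ArrayList<>();
        if (leftIsSingle) {
            if (left.size() != 1) {
                throw new IllegalArgumentException("left input must contain exactly one row");
            }
            A single = left.get(0);
            for (B b : right) {
                if (condition.test(single, b)) {
                    out.add(join.apply(single, b));
                }
            }
        } else {
            if (right.size() != 1) {
                throw new IllegalArgumentException("right input must contain exactly one row");
            }
            B single = right.get(0);
            for (A a : left) {
                if (condition.test(a, single)) {
                    out.add(join.apply(a, single));
                }
            }
        }
        return out;
    }
}
```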


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
>   // set up execution environment
>   val env = ExecutionEnvironment.getExecutionEnvironment
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>   val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
>   // register the DataSet as table "WordCount"
>   tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
>   tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 'frequency).select('word).filter('word !== "hello"))
>   // run a SQL query on the Table and retrieve the result as a new Table
>   val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
>   table.toDataSet[WC].print()
> }
> {code}
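For reference, standard SQL gives `NOT IN` three-valued semantics, and any translation of the query above has to preserve them:

* `x NOT IN (subquery)` is TRUE only when no subquery value equals `x` and none of them is NULL.
* It is UNKNOWN when `x` is NULL, or when the subquery contains a NULL but no match.
* It is TRUE for an empty subquery.

The sketch below models these rules in plain Java, using `null` to stand in for SQL NULL. `NotInSemantics` is a hypothetical helper and is not part of Flink or Calcite.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical illustration of SQL's three-valued NOT IN (null models SQL NULL). */
final class NotInSemantics {

    enum Truth { TRUE, FALSE, UNKNOWN }

    /** Evaluates "value NOT IN (subquery)". */
    static <T> Truth notIn(T value, Collection<T> subquery) {
        if (subquery.isEmpty()) {
            return Truth.TRUE; // NOT IN over an empty set is TRUE, even for a NULL value
        }
        if (value == null) {
            return Truth.UNKNOWN; // comparing NULL with anything is UNKNOWN
        }
        boolean sawNull = false;
        for (T candidate : subquery) {
            if (candidate == null) {
                sawNull = true;
            } else if (candidate.equals(value)) {
                return Truth.FALSE; // a match makes NOT IN false
            }
        }
        // No match: a NULL in the subquery still makes the result UNKNOWN.
        return sawNull ? Truth.UNKNOWN : Truth.TRUE;
    }

    /** A WHERE clause keeps only the rows whose predicate is TRUE. */
    static <T> List<T> filterNotIn(List<T> rows, Collection<T> subquery) {
        List<T> kept = new ArrayList<>();
        for (T row : rows) {
            if (notIn(row, subquery) == Truth.TRUE) {
                kept.add(row);
            }
        }
        return kept;
    }
}
```

The rows `hello, hello, ciao` filtered against a subquery holding only `ciao` keep both `hello` rows. The same filter against `ciao, NULL` keeps nothing.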



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-23 Thread AlexanderShoshin
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #:

2016-11-23 Thread aljoscha
Github user aljoscha commented on the pull request:


https://github.com/apache/flink/commit/da53953e5366185567515d209fcc85d4006f7e8a#commitcomment-19948322
  
In flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java on line 40:
I think this won't work because now we don't have all the special 
serialisation logic that `SimpleStateDescriptor` has. What you could do is 
override `getDefaultValue()` and return `null` there. Then, in 
`getInitialValue()` you return the default value and you don't need the extra 
field.

What do you think?
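The suggestion can be sketched with two hypothetical classes. `DescriptorBase` stands in for `SimpleStateDescriptor`, without modeling its special serialization logic, and `FoldingDescriptorSketch` stands in for `FoldingStateDescriptor`. The initial accumulator is passed to the base class as its default value. `getDefaultValue()` is then overridden to return `null`, and `getInitialValue()` reads the stored value back, so no extra field is needed. This illustrates the idea under those assumptions; it is not the actual Flink code.

```java
/** Hypothetical stand-in for a descriptor base class that owns (and would serialize) a default value. */
abstract class DescriptorBase<T> {
    // In the real class, dedicated serialization logic covers this field.
    private final T defaultValue;

    protected DescriptorBase(T defaultValue) {
        this.defaultValue = defaultValue;
    }

    public T getDefaultValue() {
        return defaultValue;
    }
}

/** Hypothetical folding descriptor that reuses the base field for its initial accumulator. */
final class FoldingDescriptorSketch<ACC> extends DescriptorBase<ACC> {

    FoldingDescriptorSketch(ACC initialValue) {
        // Store the initial value in the base class, so no extra field is needed here.
        super(initialValue);
    }

    /** Folding state reports no default value of its own. */
    @Override
    public ACC getDefaultValue() {
        return null;
    }

    /** The initial accumulator is whatever the base class stored as its default. */
    public ACC getInitialValue() {
        return super.getDefaultValue();
    }
}
```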




[jira] [Closed] (FLINK-5000) Rename Methods in ManagedInitializationContext

2016-11-23 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-5000.
---
   Resolution: Fixed
Fix Version/s: 1.2.0

Implemented in 4656350fc33d42ff96ad6d5e836e62172b4b0de6

> Rename Methods in ManagedInitializationContext
> --
>
> Key: FLINK-5000
> URL: https://issues.apache.org/jira/browse/FLINK-5000
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.2.0
>
>
> We should rename {{getManagedOperatorStateStore()}} to 
> {{getOperatorStateStore()}} and  {{getManagedKeyedStateStore()}} to 
> {{getKeyedStateStore()}}. There are no unmanaged stores and having that extra 
> word there seems a bit confusing, plus it makes the names longer.





[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-23 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r89431529
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer {
+
+   /**
+    * Max number of {@link AsyncCollector} in the buffer.
+    */
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator operator;
+
+   /**
+    * Keep all {@code AsyncCollector} and their input {@link StreamElement}
+    */
+   private final Map, StreamElement> queue = new LinkedHashMap<>();
+   /**
+    * For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the
+    * {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue}
+    * is full since main thread waits on this lock. The StreamElement in
+    * {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements
+    * in its queue. It will be kept in the operator state while snapshotting.
+    */
+   private StreamElement extraStreamElement;
+
+   /**
+    * {@link TimestampedCollector} and {@link Output} to collect results and watermarks.
+    */
+   private final Output output;
+   private final TimestampedCollector timestampedCollector;
+
+   /**
+    * Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+    */
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output output,
+   TimestampedCollector collector,
+   Object lock,
+   AsyncWaitOperator operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL.");
+   Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL.");
+
+   this.bufferSize = bufferSize;
+   this.mode = mode;
+   this.output = output;
+   this.timestampedCollector = collector;
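According to the Javadoc above, the buffer holds up to `bufferSize` pending collectors and passes their results downstream. The `mode` field presumably decides the order in which results are emitted. The following is a minimal single-threaded model of that idea. `PendingBufferSketch`, its `ORDERED`/`UNORDERED` constants, and `tryAdd()` are all hypothetical. The model deliberately leaves out the checkpoint lock, the emitter thread, and the watermark handling that the real class deals with.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/** Hypothetical single-threaded model of a bounded buffer of pending async results. */
final class PendingBufferSketch<OUT> {

    enum Mode { ORDERED, UNORDERED }

    /** One in-flight request; it becomes emittable once complete() is called. */
    static final class Entry<T> {
        private T result;
        private boolean done;

        void complete(T value) {
            this.result = value;
            this.done = true;
        }
    }

    private final int capacity;
    private final Mode mode;
    // Insertion order is kept, which ORDERED mode relies on.
    private final LinkedList<Entry<OUT>> entries = new LinkedList<>();

    PendingBufferSketch(int capacity, Mode mode) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity should be greater than 0");
        }
        this.capacity = capacity;
        this.mode = mode;
    }

    /** Registers a new request, or returns null when full (a real operator would block instead). */
    Entry<OUT> tryAdd() {
        if (entries.size() >= capacity) {
            return null;
        }
        Entry<OUT> entry = new Entry<>();
        entries.add(entry);
        return entry;
    }

    /** Removes and returns every result that may be emitted now. */
    List<OUT> drain() {
        List<OUT> emitted = new ArrayList<>();
        Iterator<Entry<OUT>> it = entries.iterator();
        while (it.hasNext()) {
            Entry<OUT> entry = it.next();
            if (entry.done) {
                emitted.add(entry.result);
                it.remove();
            } else if (mode == Mode.ORDERED) {
                break; // an unfinished entry blocks everything queued behind it
            }
        }
        return emitted;
    }
}
```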
  

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692081#comment-15692081
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r89431529
  

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691892#comment-15691892
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r89424283
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link AsyncCollector} collects data / error in user codes while processing async i/o.
+ *
+ * @param  Input type
+ * @param  Output type
+ */
+@Internal
+public class AsyncCollector {
+   private List result;
+   private Throwable error;
+
+   private boolean isDone = false;
+
+   private final AsyncCollectorBuffer buffer;
+
+   public AsyncCollector(AsyncCollectorBuffer buffer) {
+   Preconditions.checkNotNull(buffer, "Reference to AsyncCollectorBuffer should not be null");
+
+   this.buffer = buffer;
+   }
+
+   public AsyncCollector(AsyncCollectorBuffer buffer, boolean isDone) {
+   this(buffer);
+   this.isDone = isDone;
--- End diff --

Actually, they work as a stub here. I think we can optimize those 
`AsyncCollector`s for `Watermark` and `LatencyMarker` out while optimizing the 
code in `nothingToDo()`.
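One way to read the stub comment is as follows. A collector can be created already completed, so that a `Watermark` or `LatencyMarker` occupies a slot in the queue and keeps its position relative to the records without waiting on any async call. The sketch below assumes that reading. `CollectorSketch` is a hypothetical, single-threaded stand-in for `AsyncCollector` that keeps only the result, error, and done state.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for AsyncCollector: holds either a result list or an error. */
final class CollectorSketch<OUT> {
    private List<OUT> result;
    private Throwable error;
    private boolean isDone;

    CollectorSketch() {
        this(false);
    }

    /** isDone = true creates a stub that is complete from the start, e.g. a watermark placeholder. */
    CollectorSketch(boolean isDone) {
        this.isDone = isDone;
        if (isDone) {
            this.result = Collections.emptyList();
        }
    }

    /** Called by user code when the async request succeeds. */
    void collect(List<OUT> values) {
        this.result = values;
        this.isDone = true;
    }

    /** Called by user code when the async request fails. */
    void collect(Throwable t) {
        this.error = t;
        this.isDone = true;
    }

    boolean isDone() {
        return isDone;
    }

    /** Returns the collected values, rethrowing a recorded failure as an IOException. */
    List<OUT> get() throws IOException {
        if (error != null) {
            throw new IOException("Async request failed", error);
        }
        return result;
    }
}
```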


> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps so many of them in flight, and then emits 
> results to downstream operators as the async operations complete.
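The core idea in the description is to keep a bounded number of async requests in flight and to emit each result as soon as it completes. This can be sketched in plain Java with `CompletableFuture` and a `Semaphore`. `AsyncEnricher.enrichAll` and its `lookup` function are hypothetical. Results appear in completion order, not input order. This is not the Flink operator proposed in this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Hypothetical sketch: enrich inputs asynchronously with at most maxInFlight pending requests. */
final class AsyncEnricher {

    /**
     * Starts lookup(input) for every input, never letting more than maxInFlight
     * requests be outstanding, and collects results in completion order.
     */
    static <IN, OUT> List<OUT> enrichAll(
            List<IN> inputs, int maxInFlight, Function<IN, CompletableFuture<OUT>> lookup)
            throws InterruptedException {
        Semaphore permits = new Semaphore(maxInFlight);
        // Completion callbacks may run on other threads, so use a thread-safe list.
        List<OUT> emitted = new CopyOnWriteArrayList<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (IN input : inputs) {
            permits.acquire(); // blocks while maxInFlight requests are outstanding
            CompletableFuture<Void> done = lookup.apply(input)
                    .thenAccept(emitted::add) // "emit" as soon as this result is ready
                    .whenComplete((ignored, error) -> permits.release());
            pending.add(done);
        }
        // Wait for the remaining requests; a failed lookup surfaces here as an exception.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        return emitted;
    }
}
```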





[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-23 Thread bjlovegithub
Github user bjlovegithub commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r89424283
  


[jira] [Commented] (FLINK-2646) Rich functions should provide a method "closeAfterFailure()"

2016-11-23 Thread Liang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691858#comment-15691858
 ] 

Liang Chen commented on FLINK-2646:
---

Please go ahead, thanks.
Sorry, I have been a little busy recently.

> Rich functions should provide a method "closeAfterFailure()"
> 
>
> Key: FLINK-2646
> URL: https://issues.apache.org/jira/browse/FLINK-2646
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Liang Chen
> Fix For: 1.0.0
>
>
> Right now, the {{close()}} method of rich functions is invoked in case of 
> proper completion, and also in case of cancellation or error (to allow for 
> cleanup).
> In certain cases, the user function needs to know why it is closed: whether 
> the task completed in a regular fashion, or was canceled/failed.
> I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. By 
> default, this method calls {{close()}}. The runtime is then changed to call 
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in 
> case of an irregular exit.
> Because by default all cases call {{close()}}, the change would not be 
> API-breaking.
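The proposal can be sketched as follows. The sketch assumes a simplified function interface and a driver that only distinguishes normal completion from an exception; cancellation is not modeled. `RichFunctionSketch` and `TaskDriverSketch` are hypothetical names and do not correspond to Flink's `RichFunction` or task code. The default method is what keeps the change non-breaking: functions that do not override `closeAfterFailure()` still see `close()` on every exit path.

```java
/** Hypothetical stand-in for a rich function with the proposed hook. */
interface RichFunctionSketch {
    void close() throws Exception;

    /** Default keeps today's behaviour: every exit path ends up in close(). */
    default void closeAfterFailure() throws Exception {
        close();
    }
}

/** Hypothetical task driver showing which hook the runtime would call. */
final class TaskDriverSketch {

    static void run(RichFunctionSketch function, Runnable body) throws Exception {
        try {
            body.run();
        } catch (RuntimeException | Error failure) {
            try {
                function.closeAfterFailure(); // irregular exit
            } catch (Exception cleanupError) {
                failure.addSuppressed(cleanupError); // keep the original failure visible
            }
            throw failure;
        }
        function.close(); // regular completion
    }
}
```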





[jira] [Comment Edited] (FLINK-1536) Graph partitioning operators for Gelly

2016-11-23 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691610#comment-15691610
 ] 

Ivan Mushketyk edited comment on FLINK-1536 at 11/23/16 11:11 PM:
--

Oh, sorry, I didn't mean to type the word "bipartiate" there :) Somehow this word 
got ingrained in my muscle memory after working on the bipartite graph task :)

Anyway, I'll do some background research and come up with a design document.
Regarding a FLIP: are there any guidelines about how (or even in what case) to 
create one? 


was (Author: ivan.mushketyk):
Oh, sorry, I didn't mean to type the word "bipartiate" there :) Somehow this word 
got ingrained in my muscle memory after working on bipartite graph task :)

Anyway, I'll do some background research and come up with a design document.
Regarding a FLIP: are there any guidelines about how (or even when) to create 
one? 

> Graph partitioning operators for Gelly
> --
>
> Key: FLINK-1536
> URL: https://issues.apache.org/jira/browse/FLINK-1536
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Priority: Minor
>
> Smart graph partitioning can significantly improve the performance and 
> scalability of graph analysis applications. Depending on the computation 
> pattern, a graph partitioning algorithm divides the graph into (possibly 
> overlapping) subgraphs, optimizing some objective. For example, if 
> communication is performed across graph edges, one might want to minimize the 
> number of edges that cross from one partition to another.
> Graph partitioning is a well-studied problem, and several 
> algorithms have been proposed in the literature. The goal of this project 
> would be to choose a few existing partitioning techniques and implement the 
> corresponding graph partitioning operators for Gelly.
> Some related literature can be found 
> [here|http://www.citeulike.org/user/vasiakalavri/tag/graph-partitioning].
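Here is a concrete illustration of the objective. Take the path 1-2-3-4. Putting {1, 2} and {3, 4} in separate partitions cuts one edge, while hashing the vertex ids into two partitions can cut all three. The sketch below computes the edge cut of a given assignment and also provides a hash-partitioning baseline. `EdgeCutSketch` is hypothetical plain Java, not a Gelly API, and it implements none of the algorithms from the literature.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helpers for measuring the edge cut of a vertex-to-partition assignment. */
final class EdgeCutSketch {

    /** Counts edges {source, target} whose endpoints are assigned to different partitions. */
    static int edgeCut(List<long[]> edges, Map<Long, Integer> partitionOf) {
        int cut = 0;
        for (long[] edge : edges) {
            if (!partitionOf.get(edge[0]).equals(partitionOf.get(edge[1]))) {
                cut++;
            }
        }
        return cut;
    }

    /** Baseline: hash each vertex id to a partition, ignoring the graph structure entirely. */
    static Map<Long, Integer> hashPartition(Iterable<Long> vertices, int numPartitions) {
        Map<Long, Integer> assignment = new HashMap<>();
        for (long v : vertices) {
            assignment.put(v, Math.floorMod(Long.hashCode(v), numPartitions));
        }
        return assignment;
    }
}
```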





[jira] [Commented] (FLINK-1536) Graph partitioning operators for Gelly

2016-11-23 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691610#comment-15691610
 ] 

Ivan Mushketyk commented on FLINK-1536:
---



[jira] [Updated] (FLINK-5146) Improved resource cleanup in RocksDB keyed state backend

2016-11-23 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-5146:
--
Priority: Blocker  (was: Major)

> Improved resource cleanup in RocksDB keyed state backend
> 
>
> Key: FLINK-5146
> URL: https://issues.apache.org/jira/browse/FLINK-5146
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Blocker
>
> Currently, resources such as taken snapshots or iterators are not always 
> released in the RocksDB state backend. In particular, if the runnable future 
> is never started, the taken snapshots remain unreleased.
> We should improve the release of all resources allocated through the RocksDB 
> JNI bridge.
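One pattern for the "never started" case is to let whichever side reaches the snapshot first take ownership of releasing it: either the task body or `cancel()`. The sketch below assumes that pattern. `NativeSnapshot` is a hypothetical stand-in for a snapshot obtained through the RocksDB JNI bridge, and its `close()` only flips a flag. `SnapshotTaskSketch` is not Flink's actual snapshot code. A task that is neither run nor cancelled would still leak in this sketch, so the caller must guarantee that one of the two happens.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical native resource standing in for a snapshot taken through JNI. */
final class NativeSnapshot implements AutoCloseable {
    private final AtomicBoolean released = new AtomicBoolean(false);

    boolean isReleased() {
        return released.get();
    }

    @Override
    public void close() {
        released.set(true); // a real implementation would call the native release here
    }
}

/** Hypothetical snapshot task that releases its snapshot whether it runs or is cancelled first. */
final class SnapshotTaskSketch {

    static FutureTask<String> create(NativeSnapshot snapshot) {
        // Whoever flips this flag first (the task body or cancel()) owns the release.
        AtomicBoolean claimed = new AtomicBoolean(false);
        return new FutureTask<String>(() -> {
            if (!claimed.compareAndSet(false, true)) {
                throw new CancellationException("cancelled before start");
            }
            try {
                return "snapshot written"; // placeholder for writing the snapshot out
            } finally {
                snapshot.close();
            }
        }) {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                boolean cancelled = super.cancel(mayInterruptIfRunning);
                if (cancelled && claimed.compareAndSet(false, true)) {
                    snapshot.close(); // never started: release here, or it would leak
                }
                return cancelled;
            }
        };
    }
}
```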





[jira] [Commented] (FLINK-4997) Extending Window Function Metadata

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691503#comment-15691503
 ] 

ASF GitHub Bot commented on FLINK-4997:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2756
  
The changes look very good! 👍

I'll rebase on top of the recent changes that introduced the new 
`fold()`/`reduce()` and will also reshuffle/squash the commits a bit to make 
the history clearer. I hope that's alright.

I'll also change the name of the `process()` method in 
`InternalWindowFunction` back to `apply()` so that we have fewer changes and the 
core of the changes is more obvious. You could change that name in a future PR 
if you'd like, but it's purely internal. I hope this is also alright.

A word on commit titles: we normally use the imperative mood, i.e. "Extend 
WindowFunction Metadata" instead of "Extending ..." or "Extends ...". 
Initially, this might seem strange, but you can think of a commit as being the 
command to do something. This is a good read on commit messages: 
http://chris.beams.io/posts/git-commit/
I'm not writing all this to discourage you; I just like good commit messages. 
And I hope you'll keep up the good work in the code. 😃

Btw, do you also want to make the same changes for `AllWindowedStream`?


> Extending Window Function Metadata
> --
>
> Key: FLINK-4997
> URL: https://issues.apache.org/jira/browse/FLINK-4997
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, Streaming, Windowing Operators
>Reporter: Ventura Del Monte
>Assignee: Ventura Del Monte
> Fix For: 1.2.0
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata





[GitHub] flink issue #2756: [FLINK-4997] Extending Window Function Metadata

2016-11-23 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2756
  


[jira] [Assigned] (FLINK-4676) Merge flink-batch-connectors and flink-streaming-connectors modules

2016-11-23 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-4676:


Assignee: Fabian Hueske

> Merge flink-batch-connectors and flink-streaming-connectors modules
> ---
>
> Key: FLINK-4676
> URL: https://issues.apache.org/jira/browse/FLINK-4676
> Project: Flink
>  Issue Type: Task
>  Components: Batch Connectors and Input/Output Formats, Build System, 
> Streaming Connectors
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Minor
> Fix For: 1.2.0
>
>
> We have two separate Maven modules for batch and streaming connectors 
> (flink-batch-connectors and flink-streaming-connectors) that contain modules 
> for the individual external systems and storage formats such as HBase, 
> Cassandra, Avro, Elasticsearch, etc.
> Some of these systems, for instance HBase, Cassandra, and Elasticsearch, can 
> be used in streaming as well as in batch jobs. 
> However, due to the separate main modules for streaming and batch connectors, 
> we currently need to decide where to put a connector. 
> For example, the flink-connector-cassandra module is located in 
> flink-streaming-connectors but includes a CassandraInputFormat and 
> CassandraOutputFormat (i.e., a batch source and sink).
> This issue is about merging flink-batch-connectors and 
> flink-streaming-connectors into a joint flink-connectors module.
> Names of moved modules should not be changed (although this leads to an 
> inconsistent naming scheme: flink-connector-cassandra vs. flink-hbase) to 
> keep the change of code structure transparent to users. 





[jira] [Closed] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2016-11-23 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-2662.

   Resolution: Fixed
Fix Version/s: (was: 1.1.3)
   1.1.4

Fixed for 1.1.4 with efbd293afe4348b0f199e2c66a990ae6880edcef
Fixed for 1.2.0 with 7d91b9ec71c9b711e04a91f847f5c85d3f561da6

> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> --
>
> Key: FLINK-2662
> URL: https://issues.apache.org/jira/browse/FLINK-2662
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.9.1, 0.10.0
>Reporter: Gabor Gevay
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.2.0, 1.1.4, 1.0.0
>
> Attachments: Bug.java, FlinkBug.scala
>
>
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
>   at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>   at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>   at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>   at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>   at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>   at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>   at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>   at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>   at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>   at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>   at malom.Solver.main(Solver.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command-line argument specifying the 
> directory to write output to. (On the first run, it spends a few minutes 
> generating lookup tables, which are then placed in /tmp/movegen.)





[jira] [Closed] (FLINK-5143) Add EXISTS to list of supported operators

2016-11-23 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-5143.

   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed with 06d252e89b58c5947b331cf24a552d11ff8767e8

> Add EXISTS to list of supported operators
> -
>
> Key: FLINK-5143
> URL: https://issues.apache.org/jira/browse/FLINK-5143
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 1.2.0
>
>
> EXISTS is already supported in certain cases. We should add it to the list of 
> supported operators so that, e.g., TPC-H query 4 runs properly.
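A correlated EXISTS subquery has semi-join semantics. An outer row is kept if at least one inner row matches, and it is kept only once no matter how many inner rows match. The plain-Java sketch below illustrates this on an order/line-item shape loosely modeled on TPC-H query 4, which keeps orders that have at least one line item received after its commit date.

The `Order` and `LineItem` classes and their fields are hypothetical simplifications. They are not the TPC-H schema or any Flink type, and the sketch says nothing about how Flink's planner actually translates EXISTS.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of EXISTS (semi-join) semantics; not Flink code. */
class ExistsSemiJoin {

    /** Simplified order with a key and a priority (hypothetical fields). */
    static final class Order {
        final long key;
        final String priority;
        Order(long key, String priority) { this.key = key; this.priority = priority; }
    }

    /** Simplified line item: order key plus commit/receipt dates as day numbers. */
    static final class LineItem {
        final long orderKey;
        final int commitDate;
        final int receiptDate;
        LineItem(long orderKey, int commitDate, int receiptDate) {
            this.orderKey = orderKey;
            this.commitDate = commitDate;
            this.receiptDate = receiptDate;
        }
    }

    /**
     * Equivalent of: WHERE EXISTS (SELECT * FROM lineitem
     *   WHERE l_orderkey = o_orderkey AND l_commitdate < l_receiptdate)
     */
    static List<Order> ordersWithLateItem(List<Order> orders, List<LineItem> items) {
        // Collect the order keys for which the subquery predicate holds at least once.
        Set<Long> lateKeys = new HashSet<>();
        for (LineItem li : items) {
            if (li.commitDate < li.receiptDate) {
                lateKeys.add(li.orderKey);
            }
        }
        // Semi-join: each qualifying order is emitted exactly once.
        List<Order> result = new ArrayList<>();
        for (Order o : orders) {
            if (lateKeys.contains(o.key)) {
                result.add(o);
            }
        }
        return result;
    }
}
```

Using a key set rather than a regular join is what prevents an order with several late line items from being duplicated in the result.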





[jira] [Closed] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-23 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-4937.

   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed with 74e0971a5511c511feb94bee7a0ce39eb9951b62

> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
> Fix For: 1.2.0
>
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API, which is used under the hood of the streaming Table API, 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
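The storage difference described above can be shown with a plain-Java sketch that uses no Flink classes. `BufferingWindow` keeps every record until the window closes. `IncrementalWindow` folds each record into a single partial aggregate on arrival, which is the idea behind incremental aggregation with a reduce function.

Both classes are hypothetical illustrations. They are not Flink's window operator or its `ReduceFunction` interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

class WindowAggregationSketch {

    /** Non-incremental: state grows with the number of records in the window. */
    static final class BufferingWindow<T> {
        private final List<T> buffer = new ArrayList<>();
        private final BinaryOperator<T> reduce;

        BufferingWindow(BinaryOperator<T> reduce) { this.reduce = reduce; }

        void add(T record) { buffer.add(record); }

        int storedRecords() { return buffer.size(); }

        /** Aggregates all buffered records when the window fires; assumes at least one record. */
        T close() {
            T acc = buffer.get(0);
            for (int i = 1; i < buffer.size(); i++) {
                acc = reduce.apply(acc, buffer.get(i));
            }
            return acc;
        }
    }

    /** Incremental: only one partial aggregate is kept, however many records arrive. */
    static final class IncrementalWindow<T> {
        private final BinaryOperator<T> reduce;
        private T partial; // null until the first record arrives

        IncrementalWindow(BinaryOperator<T> reduce) { this.reduce = reduce; }

        void add(T record) {
            partial = (partial == null) ? record : reduce.apply(partial, record);
        }

        int storedRecords() { return partial == null ? 0 : 1; }

        T close() { return partial; }
    }
}
```

For a sum over 100 records, both windows return the same result. The buffering window holds 100 records until it fires, while the incremental window holds one value throughout.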





[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2792


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691243#comment-15691243
 ] 

ASF GitHub Bot commented on FLINK-2662:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2848


> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> --
>
> Key: FLINK-2662
> URL: https://issues.apache.org/jira/browse/FLINK-2662
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.9.1, 0.10.0
>Reporter: Gabor Gevay
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.0.0, 1.2.0, 1.1.3
>
> Attachments: Bug.java, FlinkBug.scala
>
>
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
>   at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>   at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>   at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>   at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>   at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>   at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>   at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>   at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>   at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>   at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>   at malom.Solver.main(Solver.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command-line argument specifying the 
> directory to write output to. (On the first run, it spends a few minutes 
> generating lookup tables, which are then placed in /tmp/movegen.)





[GitHub] flink pull request #2848: [FLINK-2662] [optimizer] Fix computation of global...

2016-11-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2848




[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691244#comment-15691244
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2792


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API, which is used under the hood of the streaming Table API, 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.





[GitHub] flink pull request #2853: [FLINK-5143] [table] Add EXISTS to list of support...

2016-11-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2853




[jira] [Commented] (FLINK-5143) Add EXISTS to list of supported operators

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691245#comment-15691245
 ] 

ASF GitHub Bot commented on FLINK-5143:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2853


> Add EXISTS to list of supported operators
> -
>
> Key: FLINK-5143
> URL: https://issues.apache.org/jira/browse/FLINK-5143
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> EXISTS is already supported in certain cases. We should add it to the list of 
> supported operators so that, e.g., TPC-H query 4 runs properly.





[GitHub] flink pull request #2755: [hotfix][docs] Stream joins don't support tuple po...

2016-11-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2755




[GitHub] flink issue #2819: [FLINK-4961] [ml] SGD for Matrix Factorization (WIP)

2016-11-23 Thread gaborhermann
Github user gaborhermann commented on the issue:

https://github.com/apache/flink/pull/2819
  
Hello Theodore,

Thanks for taking a look at this PR!

1. No problem. Some performance evaluations might help us in this case too. 
I don't believe it should have much effect on the performance anyway as a 3-way 
join is only used outside the iteration.
2. I really like the idea of doing a performance evaluation! I'm not 
exactly sure how to do this with a `join` instead of a `coGroup`, so let me 
sketch an implementation before elaborating on the pros/cons. (It was more 
straightforward to use `coGroup`.)
3. The benefit of using more blocks is lower memory usage, just as in the 
ALS algorithm, at the cost of more network traffic. However, more blocks here 
means much more network traffic and bookkeeping than in ALS, because there are 
more Flink iterations. I've investigated this a bit more and found that the main 
"bottleneck" is the maximum size of the sort-buffer, which was around 100 MB with 
around 40 GB TaskManager memory, and large matrix blocks do not fit in it. So even 
given enough memory, we cannot use large blocks. Unfortunately, we cannot configure 
the maximum size of the sort-buffer, and I would rather not change the underlying 
code in the core or runtime, but I tried a workaround which might work (splitting 
the factor blocks into sub-blocks). I'll just need to run some measurements. If 
this workaround turns out fine, then it should be okay to go on with this and give 
instructions in the docs.
4. Okay, I agree.

I hope we could generalize this non-overlapping blocking to other 
optimization problems :) I'll take a look at the paper you linked!
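The sub-block workaround from point 3 can be sketched as follows. A large factor block, modeled here as an array of factor vectors, is cut into consecutive chunks whose estimated size stays below a cap, so that no single record exceeds the sort-buffer limit.

This is not the code in the PR. `FactorBlockSplitter`, the `maxBytes` cap and the size estimate of 8 bytes per double (ignoring serialization headers) are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: split a factor block into size-capped sub-blocks. */
class FactorBlockSplitter {

    /**
     * Splits the rows of a factor block into consecutive sub-blocks whose estimated
     * size (8 bytes per double) does not exceed maxBytes.
     * Assumes all rows have the same length.
     */
    static List<double[][]> split(double[][] block, long maxBytes) {
        List<double[][]> subBlocks = new ArrayList<>();
        if (block.length == 0) {
            return subBlocks;
        }
        long rowBytes = 8L * block[0].length;
        if (rowBytes > maxBytes) {
            throw new IllegalArgumentException("a single row exceeds maxBytes");
        }
        // Rows per sub-block: as many as fit under the cap (all rows if rows are empty).
        int rowsPerSubBlock = rowBytes == 0
            ? block.length
            : (int) Math.min((long) block.length, maxBytes / rowBytes);
        for (int start = 0; start < block.length; start += rowsPerSubBlock) {
            int end = Math.min(block.length, start + rowsPerSubBlock);
            subBlocks.add(Arrays.copyOfRange(block, start, end));
        }
        return subBlocks;
    }
}
```

With 10 rows of 4 doubles (32 bytes each) and a 100-byte cap, this yields sub-blocks of 3, 3, 3 and 1 rows. Each sub-block would then need to carry its parent block ID so that the update step can reassemble or process the pieces.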




[jira] [Commented] (FLINK-4961) SGD for Matrix Factorization

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691024#comment-15691024
 ] 

ASF GitHub Bot commented on FLINK-4961:
---

Github user gaborhermann commented on the issue:

https://github.com/apache/flink/pull/2819
  
Hello Theodore,

Thanks for taking a look at this PR!

1. No problem. Some performance evaluations might help us in this case too. 
I don't believe it should have much effect on the performance anyway as a 3-way 
join is only used outside the iteration.
2. I really like the idea of doing a performance evaluation! I'm not 
exactly sure how to do this with a `join` instead of a `coGroup`, so let me 
sketch an implementation before elaborating on the pros/cons. (It was more 
straightforward to use `coGroup`.)
3. The benefit of using more blocks is lower memory usage, just as in the 
ALS algorithm, at the cost of more network traffic. However, more blocks here 
means much more network traffic and bookkeeping than in ALS, because there are 
more Flink iterations. I've investigated this a bit more and found that the main 
"bottleneck" is the maximum size of the sort-buffer, which was around 100 MB with 
around 40 GB TaskManager memory, and large matrix blocks do not fit in it. So even 
given enough memory, we cannot use large blocks. Unfortunately, we cannot configure 
the maximum size of the sort-buffer, and I would rather not change the underlying 
code in the core or runtime, but I tried a workaround which might work (splitting 
the factor blocks into sub-blocks). I'll just need to run some measurements. If 
this workaround turns out fine, then it should be okay to go on with this and give 
instructions in the docs.
4. Okay, I agree.

I hope we could generalize this non-overlapping blocking to other 
optimization problems :) I'll take a look at the paper you linked!


> SGD for Matrix Factorization
> 
>
> Key: FLINK-4961
> URL: https://issues.apache.org/jira/browse/FLINK-4961
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Gábor Hermann
>Assignee: Gábor Hermann
>
> We have started an implementation of distributed stochastic gradient descent 
> for matrix factorization based on Gemulla et al. [1].
> The main problem with distributed SGD in general is the conflicting updates 
> of the model variable. In case of matrix factorization we can avoid 
> conflicting updates by carefully deciding in each iteration step which blocks 
> of the rating matrix we should use to update the corresponding blocks of the 
> user and item matrices (see Figure 1. in paper).
> Although a general SGD solver might seem relevant for this issue, we can do 
> much better in the special case of matrix factorization. E.g., in the case of a 
> linear regression model, the model is broadcast in every iteration. As the 
> model is typically small in that case, we can only avoid conflicts by having 
> a "global" model. For this reason, the general SGD solver is a separate issue.
> To give more details, the algorithm works as follows.
> We randomly create user and item vectors, then randomly partition them into 
> {{k}} user and {{k}} item blocks. Based on these factor blocks we partition 
> the rating matrix to {{k * k}} blocks correspondingly.
> In one iteration step we choose {{k}} non-conflicting rating blocks, i.e. we 
> should not choose two rating blocks simultaneously with the same user or item 
> block. This is done by assigning a rating block ID to every user and item 
> block. We match the user, item, and rating blocks by the current rating block 
> ID, and update the user and item factors by the ratings locally. We also 
> update the rating block ID for the factor blocks, thus in the next iteration 
> we use other rating blocks to update the factors.
> In {{k}} iterations we sweep through the whole rating matrix of {{k * k}} 
> blocks (so instead of {{numberOfIterationSteps}} iterations we should do 
> {{k * numberOfIterationSteps}} iterations).
> [1] [http://people.mpi-inf.mpg.de/~rgemulla/publications/gemulla11dsgd.pdf]
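One way to pick the {{k}} non-conflicting rating blocks per step is a diagonal shift. In step s, user block i is paired with item block (i + s) mod k, so no two chosen rating blocks share a user or item block. After k steps, every one of the k * k rating blocks has been visited exactly once.

The sketch below shows that schedule in plain Java. `DsgdSchedule` and the row-major rating block ID are hypothetical, and the PR may assign and update rating block IDs differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a DSGD stratum schedule; not the PR's implementation. */
class DsgdSchedule {

    /**
     * Returns the k (userBlock, itemBlock) pairs to update in iteration step `step`.
     * Each user block and each item block occurs in exactly one pair, so the
     * local SGD updates within a step never touch the same factor block twice.
     */
    static List<int[]> stratum(int k, int step) {
        List<int[]> pairs = new ArrayList<>(k);
        for (int userBlock = 0; userBlock < k; userBlock++) {
            int itemBlock = Math.floorMod(userBlock + step, k);
            pairs.add(new int[] {userBlock, itemBlock});
        }
        return pairs;
    }

    /** Row-major ID of the rating block at (userBlock, itemBlock) in a k x k grid. */
    static int ratingBlockId(int k, int userBlock, int itemBlock) {
        return userBlock * k + itemBlock;
    }
}
```

Advancing `step` by one per iteration corresponds to updating the rating block ID attached to each factor block. Running steps 0 to k - 1 is one full sweep, which is why k times as many iterations are needed.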





[jira] [Commented] (FLINK-4895) Drop support for Hadoop 1

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691019#comment-15691019
 ] 

ASF GitHub Bot commented on FLINK-4895:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2850
  
Good catch. I removed the reflection magic from the class.


> Drop support for Hadoop 1
> -
>
> Key: FLINK-4895
> URL: https://issues.apache.org/jira/browse/FLINK-4895
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>
> As per this mailing list discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Drop-Hadoop-1-support-with-Flink-1-2-td9530.html
>  the community agreed to drop support for Hadoop 1.
> The task includes
> - removing the hadoop-1 / hadoop-2 build profiles, 
> - removing the scripts for generating hadoop-x poms
> - updating the release script
> - updating the nightly build script
> - updating the travis configuration file
> - updating the documentation
> - updating the website





[GitHub] flink issue #2850: [FLINK-4895] Drop Hadoop1 support

2016-11-23 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2850
  
Good catch. I removed the reflection magic from the class.




[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690966#comment-15690966
 ] 

ASF GitHub Bot commented on FLINK-4712:
---

Github user gaborhermann commented on the issue:

https://github.com/apache/flink/pull/2838
  
Hello Theodore,

Thank you for checking out our solution! I would not like to answer now, as 
we did most of the work together with @proto-n, and I might be wrong in some 
aspects. I'll discuss these issues with him tomorrow personally, and answer 
ASAP!


> Implementing ranking predictions for ALS
> 
>
> Key: FLINK-4712
> URL: https://issues.apache.org/jira/browse/FLINK-4712
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Domokos Miklós Kelen
>Assignee: Gábor Hermann
>
> We started working on implementing ranking predictions for recommender 
> systems. Ranking prediction means that besides predicting scores for user-item 
> pairs, the recommender system is able to recommend a top K list for each user.
> Details:
> In practice, this would mean finding the K items for a particular user with 
> the highest predicted rating. It should also be possible to specify whether 
> to exclude the already seen items from a particular user's toplist. (See for 
> example the 'exclude_known' setting of [Graphlab Create's ranking 
> factorization 
> recommender|https://turi.com/products/create/docs/generated/graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend.html#graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend]
>  ).
> The output of the topK recommendation function could be in the form of 
> {{DataSet[(Int,Int,Int)]}}, meaning (user, item, rank), similar to Graphlab 
> Create's output. However, this is debatable: follow-up work includes 
> implementing ranking recommendation evaluation metrics (such as precision@k, 
> recall@k, ndcg@k), similar to [Spark's 
> implementations|https://spark.apache.org/docs/1.5.0/mllib-evaluation-metrics.html#ranking-systems].
>  It would be beneficial to design the API such that it could 
> be included in the proposed evaluation framework (see 
> [2157|https://issues.apache.org/jira/browse/FLINK-2157]), which makes it 
> necessary to consider the possible output type {{DataSet[(Int, 
> Array[Int])]}} or {{DataSet[(Int, Array[(Int,Double)])]}} meaning (user, 
> array of items), possibly including the predicted scores as well. See 
> [4713|https://issues.apache.org/jira/browse/FLINK-4713] for details.
> Another question arising is whether to provide this function as a member of 
> the ALS class, as a switch-kind of parameter to the ALS implementation 
> (meaning the model is either a rating or a ranking recommender model) or in 
> some other way.
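A single-user version of the proposed top-K function with an exclude-known switch can be sketched in plain Java. It scores every candidate item with the dot product of the user and item factor vectors (the ALS prediction) and skips items the user has already rated when `excludeKnown` is set. It keeps the K best in a bounded min-heap and emits (user, item, rank) triples with rank 1 as the best.

`TopKRecommender` and its parameters are hypothetical and are not the API proposed in the PR. A distributed version would have to apply this per user over DataSets, and ties are broken arbitrarily.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical single-user top-K sketch; not Flink ML code. */
class TopKRecommender {

    /** Predicted rating as the dot product of user and item factor vectors. */
    static double predict(double[] userFactors, double[] itemFactors) {
        double s = 0.0;
        for (int f = 0; f < userFactors.length; f++) {
            s += userFactors[f] * itemFactors[f];
        }
        return s;
    }

    /** Returns up to k (user, item, rank) triples, rank 1 being the highest score. */
    static List<int[]> topK(int user, double[] userFactors, Map<Integer, double[]> itemFactors,
                            Set<Integer> knownItems, int k, boolean excludeKnown) {
        // Min-heap on score: the root is the weakest of the current top-k candidates.
        PriorityQueue<double[]> heap =
            new PriorityQueue<double[]>((a, b) -> Double.compare(a[1], b[1]));
        for (Map.Entry<Integer, double[]> e : itemFactors.entrySet()) {
            int item = e.getKey();
            if (excludeKnown && knownItems.contains(item)) {
                continue; // skip items the user has already seen
            }
            heap.add(new double[] {item, predict(userFactors, e.getValue())});
            if (heap.size() > k) {
                heap.poll(); // drop the lowest-scoring candidate
            }
        }
        // Drain in ascending score order, then reverse so the best item comes first.
        List<double[]> best = new ArrayList<>();
        while (!heap.isEmpty()) {
            best.add(heap.poll());
        }
        Collections.reverse(best);
        List<int[]> result = new ArrayList<>();
        for (int r = 0; r < best.size(); r++) {
            result.add(new int[] {user, (int) best.get(r)[0], r + 1});
        }
        return result;
    }
}
```

The flat (user, item, rank) triples can later be grouped per user into the (user, array of items) shape discussed above, if the evaluation framework needs that form.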





[GitHub] flink issue #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendation & e...

2016-11-23 Thread gaborhermann
Github user gaborhermann commented on the issue:

https://github.com/apache/flink/pull/2838
  
Hello Theodore,

Thank you for checking out our solution! I would not like to answer now, as 
we did most of the work together with @proto-n, and I might be wrong in some 
aspects. I'll discuss these issues with him tomorrow personally, and answer 
ASAP!




[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690946#comment-15690946
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2060
  
I am working on merging this.


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be ASCII, similar to `StringValueParser`. That makes sense, 
> but when constructing the actual instance, no encoding is specified; for 
> example, on line 66:
>    this.result = new String(bytes, startPos+1, i - startPos - 2);
> which leads to using whatever the default platform encoding is. If contents 
> really are always ASCII (I would not count on that, as the parser is used by the 
> CSV reader), this is not a big deal, but it can lead to the usual 
> Latin-1-vs-UTF-8 issues.
> So I think that the encoding should be explicitly specified, whatever is to be 
> used: the javadocs claim ASCII, so it could be "us-ascii", but it could well be 
> UTF-8 or even ISO-8859-1.
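The fix requested here amounts to passing an explicit charset to the `String` constructor. The plain-Java sketch below contrasts the platform-default call quoted above with an explicit one.

The helper names are hypothetical, and this is not the change that was merged for FLINK-3921. Which charset to use (US-ASCII, UTF-8 or ISO-8859-1) is left open by the issue, so the caller passes it in.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical helpers contrasting implicit and explicit decoding. */
class ExplicitCharsetDecoding {

    /** Uses the JVM's default charset, so the result can differ between machines. */
    static String decodePlatformDefault(byte[] bytes, int offset, int length) {
        return new String(bytes, offset, length);
    }

    /** Always decodes with the given charset, independent of the platform default. */
    static String decode(byte[] bytes, int offset, int length, Charset charset) {
        return new String(bytes, offset, length, charset);
    }

    /** Convenience variant fixing UTF-8 (an assumed choice, not the issue's decision). */
    static String decodeUtf8(byte[] bytes, int offset, int length) {
        return decode(bytes, offset, length, StandardCharsets.UTF_8);
    }
}
```

Decoding the UTF-8 bytes of "h\u00e9llo" as ISO-8859-1 yields "h\u00c3\u00a9llo". This is the classic Latin-1-vs-UTF-8 symptom the issue describes, and it cannot occur once the charset is fixed explicitly at both ends.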





[GitHub] flink issue #2060: [FLINK-3921] StringParser encoding

2016-11-23 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2060
  
I am working on merging this.




[jira] [Commented] (FLINK-4861) Package optional project artifacts

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690897#comment-15690897
 ] 

ASF GitHub Bot commented on FLINK-4861:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2664
  
@rmetzger, I have not been able to improve on the current configuration. 
`useTransitiveFiltering` seems to work as the inverse of what the plugin 
documentation describes (true prevents transitive dependencies from being 
filtered), but project artifacts are still ignored as transitive dependencies.

This implementation uses only includes, so there shouldn't be the same risk 
of failing to exclude an unneeded dependency.

How would creating separate assembly descriptors be beneficial?


> Package optional project artifacts
> --
>
> Key: FLINK-4861
> URL: https://issues.apache.org/jira/browse/FLINK-4861
> Project: Flink
>  Issue Type: New Feature
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Per the mailing list 
> [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html],
>  package the Flink libraries and connectors into subdirectories of a new 
> {{opt}} directory in the release/snapshot tarballs.





[GitHub] flink issue #2664: [FLINK-4861] [build] Package optional project artifacts

2016-11-23 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2664
  
@rmetzger, I have not been able to improve on the current configuration. 
`useTransitiveFiltering` seems to work as the inverse of what the plugin 
documentation describes (true prevents transitive dependencies from being 
filtered), but project artifacts are still ignored as transitive dependencies.

This implementation uses only includes, so there shouldn't be the same risk 
of failing to exclude an unneeded dependency.

How would creating separate assembly descriptors be beneficial?




[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690834#comment-15690834
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r88945180
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements 

[GitHub] flink issue #2767: [FLINK-4988] Elasticsearch 5.x support

2016-11-23 Thread mikedias
Github user mikedias commented on the issue:

https://github.com/apache/flink/pull/2767
  
Not sure if I can exclude the netty4 dependency, but I'll take a look.




[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690852#comment-15690852
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89297180
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements 

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690843#comment-15690843
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89291899
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements 

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690838#comment-15690838
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89292596
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements 

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690846#comment-15690846
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89360108
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements 

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89292596
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
+
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+   /** The process environment variables 

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690839#comment-15690839
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r88945488
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements 

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89293704
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMasters handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
+
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+   /** The process environment variables 

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690849#comment-15690849
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89293375
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
--- End diff --

`YarnResourceManager` does not exist.
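A dangling `{@link}` target like this still compiles, because `javac` does not check doc comments unless doclint is enabled (for example with `-Xdoclint:reference`); the `javadoc` tool reports it as a missing reference. As a quick check outside the doc tooling, a minimal sketch could try to load each link target reflectively. `LinkTargetCheck` and `resolves` are hypothetical helpers, not part of Flink, and the result depends entirely on the classpath the check runs with.

```java
public final class LinkTargetCheck {

    private LinkTargetCheck() {}

    /** Returns true if the fully qualified class name can be loaded from the current classpath. */
    static boolean resolves(String fullyQualifiedName) {
        try {
            // initialize = false: only check that the class exists, without running static initializers
            Class.forName(fullyQualifiedName, false, LinkTargetCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // the two {@link} targets named in the class Javadoc under review
        String[] targets = {
            "org.apache.flink.runtime.jobmaster.JobMaster",
            "org.apache.flink.yarn.YarnResourceManager",
        };
        for (String target : targets) {
            System.out.println(target + (resolves(target) ? " found" : " missing"));
        }
    }
}
```

Run with the `flink-yarn` module's dependencies on the classpath, the check should report the second target as missing, matching the review comment.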


> Implement FLIP-6 YARN Application Master Runner
> ---
>
> Key: FLINK-4928
> URL: 

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89296224
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r88944228
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89358052
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690848#comment-15690848
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89292790
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r88946050
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
+
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+   /** The process environment variables 

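The class Javadoc and declaration quoted above say that the runner starts the JobMaster and the YarnResourceManager, and that the runner itself implements `LeaderContender`, `OnCompletionActions` and `FatalErrorHandler`. The following is a simplified, hypothetical sketch of how one object can play those three callback roles. The nested interfaces, the `Runner` class and the `FinalStatus` enum are stand-ins invented for illustration; they are not Flink's actual interfaces, whose method signatures may differ. The lock mirrors the `@GuardedBy` import in the diff, on the assumption that callbacks can arrive from different threads.

```java
import java.util.UUID;

/** Hypothetical sketch: one runner object acting as leader contender, completion listener and fatal-error handler. */
public class AppMasterRunnerSketch {

    /** Stand-in for a leader-election callback (not Flink's LeaderContender). */
    interface LeaderCallback {
        void grantLeadership(UUID leaderSessionId);
        void revokeLeadership();
    }

    /** Stand-in for job-completion callbacks (not Flink's OnCompletionActions). */
    interface JobCompletionCallback {
        void jobFinished(String result);
        void jobFailed(Throwable cause);
    }

    /** Stand-in for a handler of unrecoverable errors (not Flink's FatalErrorHandler). */
    interface FatalErrorCallback {
        void onFatalError(Throwable error);
    }

    /** Hypothetical final status reported when the runner shuts down. */
    enum FinalStatus { RUNNING, SUCCEEDED, FAILED }

    static final class Runner implements LeaderCallback, JobCompletionCallback, FatalErrorCallback {
        private final Object lock = new Object();
        private UUID leaderSessionId; // null while this runner is not the leader
        private FinalStatus status = FinalStatus.RUNNING;

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            synchronized (lock) {
                // A runner that has already shut down ignores late leadership grants.
                if (status == FinalStatus.RUNNING) {
                    this.leaderSessionId = leaderSessionId;
                }
            }
        }

        @Override
        public void revokeLeadership() {
            synchronized (lock) {
                leaderSessionId = null;
            }
        }

        @Override
        public void jobFinished(String result) {
            shutDown(FinalStatus.SUCCEEDED);
        }

        @Override
        public void jobFailed(Throwable cause) {
            shutDown(FinalStatus.FAILED);
        }

        @Override
        public void onFatalError(Throwable error) {
            shutDown(FinalStatus.FAILED);
        }

        // The first terminal status wins; later callbacks do not overwrite it.
        private void shutDown(FinalStatus finalStatus) {
            synchronized (lock) {
                if (status == FinalStatus.RUNNING) {
                    status = finalStatus;
                    leaderSessionId = null;
                }
            }
        }

        boolean isLeader() {
            synchronized (lock) {
                return leaderSessionId != null;
            }
        }

        FinalStatus status() {
            synchronized (lock) {
                return status;
            }
        }
    }

    public static void main(String[] args) {
        Runner runner = new Runner();
        runner.grantLeadership(UUID.randomUUID());
        System.out.println("leader=" + runner.isLeader()); // leader=true
        runner.jobFinished("done");
        runner.onFatalError(new RuntimeException("late error"));
        System.out.println("status=" + runner.status() + ", leader=" + runner.isLeader()); // status=SUCCEEDED, leader=false
    }
}
```

Funnelling every terminal callback through a single guarded `shutDown` method keeps the outcome deterministic when a job-finished notification and a fatal error race each other: whichever arrives first decides the final status.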
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690837#comment-15690837
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r88944228
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements 

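The quoted import list pulls in `java.io.File`, `FileInputStream` and `ObjectInputStream`, but the code that uses them is cut off in this archive. Assuming (this is not visible in the quoted diff) that the runner reads a Java-serialized job description from a local file, a defensive version of that step might look like the sketch below. `JobSpec`, `readSerializedObject` and `writeSerializedObject` are hypothetical names; `JobSpec` stands in for whatever class is actually deserialized.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical sketch of loading a Java-serialized object from a file. */
public class SerializedJobLoaderSketch {

    /** Hypothetical stand-in for a serialized job description (not Flink's JobGraph). */
    static final class JobSpec implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final int parallelism;

        JobSpec(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    /** Reads one serialized object from {@code file} and verifies that it has the expected type. */
    static <T> T readSerializedObject(File file, Class<T> type)
            throws IOException, ClassNotFoundException {
        if (!file.isFile()) {
            throw new IOException("Not a regular file: " + file);
        }
        // Both streams are resources, so the file handle is closed even if
        // the ObjectInputStream constructor or readObject() throws.
        try (FileInputStream fileIn = new FileInputStream(file);
             ObjectInputStream in = new ObjectInputStream(fileIn)) {
            Object obj = in.readObject();
            if (!type.isInstance(obj)) {
                String found = (obj == null) ? "null" : obj.getClass().getName();
                throw new IOException("Expected " + type.getName() + " but found " + found);
            }
            return type.cast(obj);
        }
    }

    /** Helper used only to produce an input file for the demo. */
    static void writeSerializedObject(File file, Serializable obj) throws IOException {
        try (FileOutputStream fileOut = new FileOutputStream(file);
             ObjectOutputStream out = new ObjectOutputStream(fileOut)) {
            out.writeObject(obj);
        }
    }

    public static void main(String[] args) throws Exception {
        File file = File.createTempFile("job", ".ser");
        file.deleteOnExit();
        writeSerializedObject(file, new JobSpec("word-count", 4));
        JobSpec spec = readSerializedObject(file, JobSpec.class);
        System.out.println(spec.name + " @ parallelism " + spec.parallelism); // word-count @ parallelism 4
    }
}
```

The explicit type check turns a wrong payload into an `IOException` with a readable message instead of a `ClassCastException` at the call site. Java deserialization should only be applied to files from a trusted source, such as one shipped by the submitting client.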
[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690844#comment-15690844
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89291817
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -0,0 +1,612 @@
+public class YarnFlinkApplicationMasterRunner implements 

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89291817
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -0,0 +1,612 @@
+public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
+
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+   /** The process environment variables 

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r88945488
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -0,0 +1,612 @@
+public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
+
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+   /** The process environment variables 

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690836#comment-15690836
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89352452
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -0,0 +1,612 @@
+public class YarnFlinkApplicationMasterRunner implements 

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89291899
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -0,0 +1,612 @@
+public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
+
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+   /** The process environment variables 

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89360108
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
--- End diff --

The current implementation tries to solve the YARN per-job cluster scenario, right? Maybe we could refactor the code such that we have 

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690840#comment-15690840
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89291958
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690842#comment-15690842
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89293704
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690831#comment-15690831
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user mikedias commented on the issue:

https://github.com/apache/flink/pull/2767
  
Not sure if I can exclude the netty4 dependency, but I'll take a look.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690850#comment-15690850
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r88946050
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690832#comment-15690832
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r88944244
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690833#comment-15690833
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89292656
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690845#comment-15690845
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89294779
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89297418
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
+
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+   /** The process environment variables 
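
The class declaration above has the runner take on three roles at once: leader contender, receiver of job-completion callbacks, and fatal-error handler. The Java below is a minimal sketch of how such a runner could coordinate those roles, under stated assumptions. The interfaces LeaderRole, JobCompletionCallbacks and FatalErrorCallback, the RunnerStatus enum and the AppMasterRunnerSketch class are hypothetical stand-ins. They are not Flink's actual LeaderContender, OnCompletionActions or FatalErrorHandler signatures. The real runner also starts the actor system, the JobMaster and the YarnResourceManager, which the sketch only marks with comments. Shared state is guarded by a single lock. The original imports javax.annotation.concurrent.GuardedBy to document such fields; the sketch leaves it out to stay dependency-free.

```java
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-ins for the three roles named in the class declaration.
// They are NOT Flink's LeaderContender / OnCompletionActions / FatalErrorHandler;
// the method names here are illustrative only.
interface LeaderRole {
    void grantLeadership(UUID leaderSessionId);

    void revokeLeadership();
}

interface JobCompletionCallbacks {
    void jobFinished(String result);

    void jobFailed(Throwable cause);
}

interface FatalErrorCallback {
    void onFatalError(Throwable error);
}

enum RunnerStatus { RUNNING, SUCCEEDED, FAILED }

/** Sketch of a runner that reacts to leadership changes, job completion and fatal errors. */
class AppMasterRunnerSketch implements LeaderRole, JobCompletionCallbacks, FatalErrorCallback {
    private final Object lock = new Object();
    private final CountDownLatch terminated = new CountDownLatch(1);

    // Both fields are only read or written while holding 'lock'.
    private RunnerStatus status = RunnerStatus.RUNNING;
    private UUID leaderSessionId;

    @Override
    public void grantLeadership(UUID id) {
        synchronized (lock) {
            if (status == RunnerStatus.RUNNING) {
                leaderSessionId = id; // a real runner would start its JobMaster here
            }
        }
    }

    @Override
    public void revokeLeadership() {
        synchronized (lock) {
            leaderSessionId = null; // a real runner would suspend its JobMaster here
        }
    }

    @Override
    public void jobFinished(String result) {
        shutDown(RunnerStatus.SUCCEEDED);
    }

    @Override
    public void jobFailed(Throwable cause) {
        shutDown(RunnerStatus.FAILED);
    }

    @Override
    public void onFatalError(Throwable error) {
        shutDown(RunnerStatus.FAILED);
    }

    // Only the first terminal event wins; later events are ignored.
    private void shutDown(RunnerStatus finalStatus) {
        synchronized (lock) {
            if (status != RunnerStatus.RUNNING) {
                return;
            }
            status = finalStatus;
            leaderSessionId = null;
        }
        terminated.countDown();
    }

    /** Blocks until a terminal event arrives and returns the final status. */
    RunnerStatus awaitTermination() throws InterruptedException {
        terminated.await();
        return currentStatus();
    }

    RunnerStatus currentStatus() {
        synchronized (lock) {
            return status;
        }
    }

    UUID currentLeaderSessionId() {
        synchronized (lock) {
            return leaderSessionId;
        }
    }
}
```

All three terminal callbacks go through one idempotent shutDown method. As a result, a late jobFinished arriving after a fatal error cannot overwrite the failure status, and a thread blocked in awaitTermination is released exactly once.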

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690851#comment-15690851
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89358052
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89292790
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690835#comment-15690835
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89296224
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r88945180
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@

[jira] [Commented] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690841#comment-15690841
 ] 

ASF GitHub Bot commented on FLINK-4928:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89293898
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r88944244
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
+
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+   /** The process environment variables 
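
The imports of `File`, `FileInputStream`, `ObjectInputStream` and `JobGraph` suggest that the runner reads a serialized object from a local file. The quoted diff is cut off before any such code, so that is an assumption. Below is a minimal sketch of the general Java serialization round trip. The hypothetical `Payload` class stands in for the real object; none of these names come from the PR:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public final class SerializedFileSketch {

    /** Hypothetical payload standing in for a serializable object such as a job graph. */
    static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;

        Payload(String name) {
            this.name = name;
        }
    }

    /** Writes one object to the file using Java serialization. */
    static void write(File file, Serializable obj) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(file))) {
            out.writeObject(obj);
        }
    }

    /** Reads one object back; Class.cast checks its runtime type instead of an unchecked cast. */
    static <T> T read(File file, Class<T> type) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
            return type.cast(in.readObject());
        }
    }

    public static void main(String[] args) throws Exception {
        File tmp = File.createTempFile("payload", ".bin");
        tmp.deleteOnExit();
        write(tmp, new Payload("example-job"));
        System.out.println(read(tmp, Payload.class).name);
    }
}
```

Using `Class.cast` on the deserialized value means a type mismatch fails with a `ClassCastException` at the read site, not later.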

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89291958
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
+
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+   /** The process environment variables 
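
The class declaration has the runner play three roles at once:

- leader contender
- receiver of job-completion callbacks
- fatal-error handler

The sketch below illustrates that shape with simplified, hypothetical stand-in interfaces. They are not Flink's `LeaderContender`, `OnCompletionActions` or `FatalErrorHandler`, whose exact methods the quoted diff does not show. The `@GuardedBy` import hints at lock-protected state, which the sketch models with plain `synchronized` blocks:

```java
import java.util.UUID;

public class AppMasterRolesSketch {

    /** Hypothetical stand-in: told when leadership is granted or revoked. */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
        void revokeLeadership();
    }

    /** Hypothetical stand-in: told when the job terminates. */
    interface CompletionCallbacks {
        void jobFinished(String result);
        void jobFailed(Throwable cause);
    }

    /** Hypothetical stand-in: receives errors the process cannot recover from. */
    interface FatalHandler {
        void onFatalError(Throwable error);
    }

    /** One object implementing all three roles, like the quoted class declaration. */
    static final class Runner implements Contender, CompletionCallbacks, FatalHandler {
        private final Object lock = new Object();
        private UUID leaderSessionId;            // guarded by lock; null when not leader
        private String finalStatus = "RUNNING";  // guarded by lock

        @Override
        public void grantLeadership(UUID id) {
            synchronized (lock) { leaderSessionId = id; }
        }

        @Override
        public void revokeLeadership() {
            synchronized (lock) { leaderSessionId = null; }
        }

        @Override
        public void jobFinished(String result) {
            synchronized (lock) { finalStatus = "SUCCEEDED"; }
        }

        @Override
        public void jobFailed(Throwable cause) {
            synchronized (lock) { finalStatus = "FAILED"; }
        }

        @Override
        public void onFatalError(Throwable error) {
            // A real application master would shut down here; the sketch only records it.
            synchronized (lock) { finalStatus = "FAILED"; }
        }

        boolean isLeader() {
            synchronized (lock) { return leaderSessionId != null; }
        }

        String status() {
            synchronized (lock) { return finalStatus; }
        }
    }

    public static void main(String[] args) {
        Runner runner = new Runner();
        runner.grantLeadership(UUID.randomUUID());
        System.out.println(runner.isLeader());
        runner.jobFinished("ok");
        System.out.println(runner.status());
    }
}
```

Running `main` prints `true` and then `SUCCEEDED`. Keeping all three roles in one object lets every callback update the same guarded state.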

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89352452
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89294779
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89292656
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89293375
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
--- End diff --

`YarnResourceManager` does not exist.
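
One way to confirm a comment like this is to check whether the `{@link}` target resolves on the classpath. The javadoc tool also typically reports an unresolved `{@link}` target as "reference not found". Below is a small, dependency-free sketch; the class name `ClassRefCheck` is hypothetical:

```java
public final class ClassRefCheck {

    private ClassRefCheck() {}

    /** Returns true if the fully qualified class name resolves on the current classpath. */
    static boolean exists(String fqcn) {
        try {
            // initialize = false: only resolve the class, do not run its static initializers
            Class.forName(fqcn, false, ClassRefCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] names = args.length > 0
                ? args
                : new String[] {"java.util.UUID", "org.apache.flink.yarn.YarnResourceManager"};
        for (String name : names) {
            System.out.println(name + " -> " + (exists(name) ? "found" : "missing"));
        }
    }
}
```

Until a YARN-specific resource manager class exists, one option is to link to `org.apache.flink.runtime.resourcemanager.ResourceManager` instead, since the file already imports it.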


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89297180
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---

[GitHub] flink pull request #2744: [FLINK-4928] [yarn] Implement FLIP-6 YARN Applicat...

2016-11-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2744#discussion_r89293898
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
+
+	/** Logger */
+	protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+   /** The process environment variables 

[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89365136
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
 ---
@@ -139,24 +143,65 @@ public void runFetchLoop() throws Exception {
 
 		PeriodicOffsetCommitter periodicCommitter = null;
 		try {
-			// read offsets from ZooKeeper for partitions that did not restore offsets
-			{
-				List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
-				for (KafkaTopicPartitionState partition : subscribedPartitions()) {
-					if (!partition.isOffsetDefined()) {
-						partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
-					}
+			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
+			for (KafkaTopicPartitionState partition : subscribedPartitions()) {
+				if (!partition.isOffsetDefined()) {
+					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
 				}
+			}
+
+			if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
+				// if all partitions have no initial offsets, that means we're starting fresh without any restored state
+				switch (startupMode) {
+					case EARLIEST:
+						LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
+
+						for (KafkaTopicPartitionState partition : subscribedPartitions()) {
+							partition.setOffset(OffsetRequest.EarliestTime());
+						}
+						break;
+					case LATEST:
+						LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
+
+						for (KafkaTopicPartitionState partition : subscribedPartitions()) {
+							partition.setOffset(OffsetRequest.LatestTime());
+						}
+						break;
+					default:
+					case GROUP_OFFSETS:
+						LOG.info("Using group offsets in Zookeeper of group.id {} as starting point for partitions {}",
+							kafkaConfig.getProperty("group.id"), partitionsWithNoOffset);
+
+						Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
+						for (KafkaTopicPartitionState partition : subscribedPartitions()) {
+							Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
+							if (offset != null) {
+								// the committed offset in ZK represents the next record to process,
+								// so we subtract 1 from it to correctly represent internal state
+								partition.setOffset(offset - 1);
+							} else {
+								// if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
+								// we default to "auto.offset.reset" like the Kafka high-level consumer
+								LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
+									" resetting starting offset to 'auto.offset.reset'", partition);
+
+								partition.setOffset(invalidOffsetBehavior);
+							}
+						}
+				}
+			} else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
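To make the startup-mode switch above easier to follow, here is a minimal, self-contained sketch of the same decision logic. It rests on stated assumptions. Partitions are modelled as plain strings instead of `KafkaTopicPartitionState`. `StartOffsetAssignment` and `assign` are hypothetical names. The sentinels -2 and -1 mirror the values of Kafka 0.8's `OffsetRequest.EarliestTime()` and `OffsetRequest.LatestTime()`. The sketch highlights the subtle part of the GROUP_OFFSETS branch: ZooKeeper stores the next offset to read, while the fetcher's internal state stores the last offset processed, so 1 is subtracted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins: the real fetcher mutates KafkaTopicPartitionState objects;
// here partitions are plain names and the chosen offsets are returned in a map.
enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

final class StartOffsetAssignment {

	// Same values as Kafka 0.8's OffsetRequest.EarliestTime() / LatestTime(); these are
	// placeholders that get replaced by concrete broker offsets before fetching.
	static final long EARLIEST_TIME = -2L;
	static final long LATEST_TIME = -1L;

	/**
	 * Computes the internal starting offset ("last record already processed") for each
	 * partition when no state was restored.
	 *
	 * @param committed     group offsets from ZooKeeper; each one is the NEXT record to read
	 * @param resetSentinel value used when no group offset exists (models "auto.offset.reset")
	 */
	static Map<String, Long> assign(
			List<String> partitions, StartupMode mode, Map<String, Long> committed, long resetSentinel) {
		Map<String, Long> result = new HashMap<>();
		for (String partition : partitions) {
			switch (mode) {
				case EARLIEST:
					// committed group offsets are ignored entirely
					result.put(partition, EARLIEST_TIME);
					break;
				case LATEST:
					result.put(partition, LATEST_TIME);
					break;
				case GROUP_OFFSETS:
				default:
					Long next = committed.get(partition);
					// next - 1 turns "next record to read" into "last record processed"
					result.put(partition, next != null ? next - 1 : resetSentinel);
					break;
			}
		}
		return result;
	}
}
```

With a committed group offset of 23, the sketch stores 22 internally, so the first record fetched would be offset 23. A partition without a group offset falls back to the reset sentinel, analogous to `invalidOffsetBehavior` in the diff.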

[jira] [Commented] (FLINK-4988) Elasticsearch 5.x support

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690828#comment-15690828
 ] 

ASF GitHub Bot commented on FLINK-4988:
---

Github user mikedias commented on the issue:

https://github.com/apache/flink/pull/2767
  
No, ES is not backward compatible... But we can reuse some classes or 
interfaces between versions. I plan to do this in another PR, just to 
isolate possible issues.


> Elasticsearch 5.x support
> -
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
>  Issue Type: New Feature
>Reporter: Mike Dias
>
> Elasticsearch 5.x was released: 
> https://www.elastic.co/blog/elasticsearch-5-0-0-released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690827#comment-15690827
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89365136
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
 ---
@@ -139,24 +143,65 @@ public void runFetchLoop() throws Exception {
 
 		PeriodicOffsetCommitter periodicCommitter = null;
 		try {
-			// read offsets from ZooKeeper for partitions that did not restore offsets
-			{
-				List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
-				for (KafkaTopicPartitionState partition : subscribedPartitions()) {
-					if (!partition.isOffsetDefined()) {
-						partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
-					}
+			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
+			for (KafkaTopicPartitionState partition : subscribedPartitions()) {
+				if (!partition.isOffsetDefined()) {
+					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
 				}
+			}
+
+			if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
+				// if all partitions have no initial offsets, that means we're starting fresh without any restored state
+				switch (startupMode) {
+					case EARLIEST:
+						LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
+
+						for (KafkaTopicPartitionState partition : subscribedPartitions()) {
+							partition.setOffset(OffsetRequest.EarliestTime());
+						}
+						break;
+					case LATEST:
+						LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
+
+						for (KafkaTopicPartitionState partition : subscribedPartitions()) {
+							partition.setOffset(OffsetRequest.LatestTime());
+						}
+						break;
+					default:
+					case GROUP_OFFSETS:
+						LOG.info("Using group offsets in Zookeeper of group.id {} as starting point for partitions {}",
+							kafkaConfig.getProperty("group.id"), partitionsWithNoOffset);
+
+						Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
+						for (KafkaTopicPartitionState partition : subscribedPartitions()) {
+							Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
+							if (offset != null) {
+								// the committed offset in ZK represents the next record to process,
+								// so we subtract 1 from it to correctly represent internal state
+								partition.setOffset(offset - 1);
+							} else {
+								// if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
+								// we default to "auto.offset.reset" like the Kafka high-level consumer
+								LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
+									" resetting starting offset to 'auto.offset.reset'", partition);
+
+								partition.setOffset(invalidOffsetBehavior);
+

[GitHub] flink issue #2767: [FLINK-4988] Elasticsearch 5.x support

2016-11-23 Thread mikedias
Github user mikedias commented on the issue:

https://github.com/apache/flink/pull/2767
  
No, ES is not backward compatible... But we can reuse some classes or 
interfaces between versions. I plan to do this in another PR, just to 
isolate possible issues.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690817#comment-15690817
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2509
  
Thanks for the review @rmetzger :)
I'll aim to address your comments and rebase by the end of this week (will 
tag you once it's ready).


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> ---
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" or "latest" position in 
> topics with the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be a bit misleading if 
> users are trying to find a way to "read topics from a starting position". 
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer 
> resembles Kafka's original intent for the setting: first, existing external 
> offsets committed to ZK / the brokers are checked; only if none exist is 
> {{auto.offset.reset}} respected.
> I propose to add Flink-specific ways to define the starting position that do 
> not take the external offsets into account. The original behaviour (referencing 
> external offsets first) can become a user option, so that it can be 
> retained for frequent Kafka users who may need to coordinate with 
> existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, 
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a warning)
> props.setProperty("group.id", "..."); // this won't affect the starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be latest
> props.setProperty("group.id", "..."); // will be used to look up external offsets in ZK / broker on startup
> ...
> {code}
> One thing we would need to decide on is what the default value for 
> {{flink.starting-position}} should be.
> Two merits I see in adding this:
> 1. This matches the way users generally interpret "read from a starting 
> position". As the Flink Kafka connector is essentially a "high-level" 
> Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, even though it 
> wasn't supported in Kafka's original consumer design.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not used to manipulate 
> how Kafka topics are read in Flink (unless users opt to do so)" becomes even 
> more definite and solid. There was some discussion on this aspect in this PR 
> (https://github.com/apache/flink/pull/1690, FLINK-3398). I think adding this 
> further "decouples" Flink's internal offset checkpointing from Kafka's 
> external offset store.
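As an illustration of how the proposed {{flink.starting-position}} key could be interpreted, here is a minimal sketch under assumptions. Only the key and the values "earliest", "latest" and "external-offsets" come from the proposal. The enum, the class and method names, and the choice of "external-offsets" as the default (which the proposal leaves open) are hypothetical.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical names; only the property key and its three values come from the proposal.
enum StartingPosition { EARLIEST, LATEST, EXTERNAL_OFFSETS }

final class StartingPositionConfig {

	static final String KEY = "flink.starting-position";

	static StartingPosition parse(Properties props) {
		// The default is still an open question; "external-offsets" is assumed here
		// because it keeps the current behaviour.
		String raw = props.getProperty(KEY, "external-offsets").trim().toLowerCase(Locale.ROOT);
		switch (raw) {
			case "earliest":
				return StartingPosition.EARLIEST;
			case "latest":
				return StartingPosition.LATEST;
			case "external-offsets":
				return StartingPosition.EXTERNAL_OFFSETS;
			default:
				throw new IllegalArgumentException("Unsupported value for " + KEY + ": " + raw);
		}
	}

	/** Reset behaviour when no external offset is found; the proposal defaults it to "latest". */
	static String resetWhenNoExternalOffset(Properties props) {
		return props.getProperty("auto.offset.reset", "latest");
	}

	/** True when "auto.offset.reset" is set but ignored, i.e. when a warning should be logged. */
	static boolean resetIsIgnored(Properties props) {
		return parse(props) != StartingPosition.EXTERNAL_OFFSETS
				&& props.getProperty("auto.offset.reset") != null;
	}
}
```

Failing fast on an unknown value, rather than silently falling back, is a design choice of this sketch. It keeps a typo in the new key from quietly reverting to external-offset behaviour.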





[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

2016-11-23 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2509
  
Thanks for the review @rmetzger :)
I'll aim to address your comments and rebase by the end of this week (will 
tag you once it's ready).




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89361344
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
 ---
@@ -263,6 +265,7 @@ public Void answer(InvocationOnMock invocation) {
 schema,
 new Properties(),
 0L,
+   StartupMode.GROUP_OFFSETS,
--- End diff --

Will do!




[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690809#comment-15690809
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89364029
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -444,6 +445,134 @@ public void run() {
 		kafkaOffsetHandler.close();
 		deleteTestTopic(topicName);
 	}
+
+	/**
+	 * This test ensures that when explicitly set to start from earliest record, the consumer
+	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+	 */
+	public void runStartFromEarliestOffsets() throws Exception {
+		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		final int parallelism = 3;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.setParallelism(parallelism);
+
+		Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
+
+		// the committed offsets should be ignored
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
+
+		kafkaOffsetHandler.close();
+		deleteTestTopic(topicName);
+	}
+
+	/**
+	 * This test ensures that when explicitly set to start from latest record, the consumer
+	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+	 */
+	public void runStartFromLatestOffsets() throws Exception {
+		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		final int parallelism = 3;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.setParallelism(parallelism);
+
+		final Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
+
+		// the committed offsets should be ignored
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+		Thread consumeThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+			}
+		});
+		consumeThread.start();
+
+		Thread.sleep(5000);
--- End diff --

Will probably need to write a different / custom read method or topology 
for this then.
The problem is that I wanted to reuse `readSequence()` for the test, but it 
expects an exact number of read elements for the test to succeed.
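One way to avoid depending on an exact element count is sketched below. It is only a sketch under assumptions: the class name and record shape are hypothetical, and it assumes each consumed element carries its partition and sequence value. It is not the existing `readSequence()` API. Since values 0-49 are pre-written to every partition, a LATEST start is correct as long as nothing below 50 is ever read, regardless of how many records arrive.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical validation helper for the LATEST test. Values 0..49 are assumed to have been
// written to every partition before the job starts, so any value below that bound means the
// consumer did not start from the latest offset.
final class LatestStartCheck {

	/** One consumed element: the partition it came from and its sequence value (assumed shape). */
	static final class Consumed {
		final int partition;
		final int value;

		Consumed(int partition, int value) {
			this.partition = partition;
			this.value = value;
		}
	}

	/**
	 * Fails if any pre-existing record was read; otherwise returns how many records were
	 * consumed per partition, without requiring an exact total.
	 */
	static Map<Integer, Integer> verify(List<Consumed> consumed, int preWrittenPerPartition) {
		Map<Integer, Integer> counts = new HashMap<>();
		for (Consumed c : consumed) {
			if (c.value < preWrittenPerPartition) {
				throw new IllegalStateException(
						"Read pre-existing value " + c.value + " from partition " + c.partition);
			}
			counts.merge(c.partition, 1, Integer::sum);
		}
		return counts;
	}
}
```

The per-partition counts could then be checked against a lower bound, rather than an exact number, after the extra records are written.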


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> 

[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89364029
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -444,6 +445,134 @@ public void run() {
 		kafkaOffsetHandler.close();
 		deleteTestTopic(topicName);
 	}
+
+	/**
+	 * This test ensures that when explicitly set to start from earliest record, the consumer
+	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+	 */
+	public void runStartFromEarliestOffsets() throws Exception {
+		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		final int parallelism = 3;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.setParallelism(parallelism);
+
+		Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
+
+		// the committed offsets should be ignored
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
+
+		kafkaOffsetHandler.close();
+		deleteTestTopic(topicName);
+	}
+
+	/**
+	 * This test ensures that when explicitly set to start from latest record, the consumer
+	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+	 */
+	public void runStartFromLatestOffsets() throws Exception {
+		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		final int parallelism = 3;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.setParallelism(parallelism);
+
+		final Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
+
+		// the committed offsets should be ignored
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+		Thread consumeThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+			}
+		});
+		consumeThread.start();
+
+		Thread.sleep(5000);
--- End diff --

Will probably need to write a different / custom read method or topology 
for this then.
The problem is that I wanted to reuse `readSequence()` for the test, but it 
expects an exact number of read elements for the test to succeed.




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89362689
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -408,23 +412,32 @@ else if (partitionsRemoved) {
 			}
 		}
 
-	private void getMissingOffsetsFromKafka(
+	private void replaceEarliestOrLatestOffsetsWithActualValuesFromKafka(
 			List partitions) throws IOException
 	{
 		// collect which partitions we should fetch offsets for
-		List partitionsToGetOffsetsFor = new ArrayList<>();
+		List partitionsWithEarliestOffsetSetting = new ArrayList<>();
--- End diff --

Good point, didn't think of that. I'll call `getLastOffsetFromKafka` if 
getOffset() returns `OffsetRequest.EarliestTime()` or 
`OffsetRequest.LatestTime()`.
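The idea in this reply is to resolve any partition whose offset is still one of the two sentinels into a concrete value before fetching. A minimal sketch of that idea follows, under assumptions. The class name is hypothetical. The `lookup` function stands in for the broker offset request (it is not the real `getLastOffsetFromKafka` signature). The constants mirror Kafka 0.8's `OffsetRequest.EarliestTime()` (-2) and `OffsetRequest.LatestTime()` (-1).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch: replace sentinel offsets with concrete values from the broker.
final class SentinelOffsetResolver {

	// Same values as Kafka 0.8's OffsetRequest.EarliestTime() and OffsetRequest.LatestTime().
	static final long EARLIEST_TIME = -2L;
	static final long LATEST_TIME = -1L;

	/**
	 * @param offsets partition -> current offset (possibly a sentinel); updated in place
	 * @param lookup  stand-in for a broker offset request, called as (partition, sentinel)
	 * @return the partitions whose sentinel offsets were replaced
	 */
	static List<String> resolve(Map<String, Long> offsets, BiFunction<String, Long, Long> lookup) {
		List<String> resolved = new ArrayList<>();
		for (Map.Entry<String, Long> entry : offsets.entrySet()) {
			long current = entry.getValue();
			if (current == EARLIEST_TIME || current == LATEST_TIME) {
				// only sentinels are resolved; real (restored or group) offsets are left untouched
				entry.setValue(lookup.apply(entry.getKey(), current));
				resolved.add(entry.getKey());
			}
		}
		return resolved;
	}
}
```

Checking the value itself, rather than tracking "missing" partitions separately, means one code path handles both the EARLIEST and LATEST startup modes.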




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89363213
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -444,6 +445,134 @@ public void run() {
 		kafkaOffsetHandler.close();
 		deleteTestTopic(topicName);
 	}
+
+	/**
+	 * This test ensures that when explicitly set to start from earliest record, the consumer
+	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+	 */
+	public void runStartFromEarliestOffsets() throws Exception {
+		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		final int parallelism = 3;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.setParallelism(parallelism);
+
+		Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
+
+		// the committed offsets should be ignored
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+		readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
+
+		kafkaOffsetHandler.close();
+		deleteTestTopic(topicName);
+	}
+
+	/**
+	 * This test ensures that when explicitly set to start from latest record, the consumer
+	 * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
+	 */
+	public void runStartFromLatestOffsets() throws Exception {
+		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		final int parallelism = 3;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.setParallelism(parallelism);
+
+		final Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
+
+		// the committed offsets should be ignored
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+		kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+		Thread consumeThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
+				} catch (Exception e) {
+					throw new RuntimeException(e);
--- End diff --

Ah, right! Will fix.




[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690798#comment-15690798
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89363213
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -444,6 +445,134 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+   /**
+* This test ensures that when explicitly set to start from earliest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromEarliestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "latest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   readSequence(env, StartupMode.EARLIEST, readProps, parallelism, 
topicName, recordsInEachPartition, 0);
+
+   kafkaOffsetHandler.close();
+   deleteTestTopic(topicName);
+   }
+
+   /**
+* This test ensures that when explicitly set to start from latest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromLatestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   final Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "earliest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   Thread consumeThread = new Thread(new Runnable() {
+   @Override
+   public void run() {
+   try {
+   readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
--- End diff --

Ah, right! Will fix.
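Side note on the test above: a `RuntimeException` thrown inside `consumeThread`'s `run()` only terminates that thread; by itself it does not fail the test running on the main thread. Below is a minimal sketch of one common way to surface such failures, assuming a plain `Thread` and a hypothetical `Body` interface standing in for the `readSequence(...)` call. It is not necessarily the fix discussed in this review.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ThreadErrorPropagation {

    /** Hypothetical stand-in for the readSequence(...) call in the test. */
    interface Body {
        void run() throws Exception;
    }

    /**
     * Runs the body on a separate thread, waits for it to finish, and rethrows
     * any failure on the calling thread so the surrounding test actually fails.
     */
    static void runAndPropagate(Body body) throws Exception {
        final AtomicReference<Throwable> error = new AtomicReference<>();
        Thread consumeThread = new Thread(() -> {
            try {
                body.run();
            } catch (Throwable t) {
                error.set(t); // keep the failure instead of losing it with the thread
            }
        });
        consumeThread.start();
        consumeThread.join();

        Throwable failure = error.get();
        if (failure != null) {
            throw new Exception("consumer thread failed", failure);
        }
    }

    public static void main(String[] args) throws Exception {
        runAndPropagate(() -> { /* completes normally */ });
        try {
            runAndPropagate(() -> { throw new IllegalStateException("boom"); });
        } catch (Exception e) {
            System.out.println("caught: " + e.getCause().getMessage()); // caught: boom
        }
    }
}
```

Joining the thread before checking the reference also gives the test a well-defined end point. A `Thread.UncaughtExceptionHandler` would be an alternative way to capture the error.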


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> ---
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  

[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690788#comment-15690788
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89362689
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -408,23 +412,32 @@ else if (partitionsRemoved) {
}
}
 
-   private void getMissingOffsetsFromKafka(
+   private void replaceEarliestOrLatestOffsetsWithActualValuesFromKafka(
List 
partitions) throws IOException
{
// collect which partitions we should fetch offsets for
-   List 
partitionsToGetOffsetsFor = new ArrayList<>();
+   List 
partitionsWithEarliestOffsetSetting = new ArrayList<>();
--- End diff --

Good point, didn't think of that. I'll call `getLastOffsetFromKafka` if 
getOffset() returns `OffsetRequest.EarliestTime()` or 
`OffsetRequest.LatestTime()`.
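Below is a minimal sketch of the sentinel replacement described above, with no Kafka dependency. It assumes the sentinels match `kafka.api.OffsetRequest.EarliestTime()` (-2L) and `LatestTime()` (-1L). The `OffsetLookup` interface and the `resolve` method are hypothetical stand-ins for the actual offset request (e.g. `getLastOffsetFromKafka`) made in `SimpleConsumerThread`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SentinelOffsets {

    // Assumed to match kafka.api.OffsetRequest.EarliestTime() and LatestTime().
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    /** Hypothetical stand-in for the Kafka offset request (e.g. getLastOffsetFromKafka). */
    interface OffsetLookup {
        long fetch(String partition, long whichTime);
    }

    /** Replaces sentinel offsets with actual values; concrete offsets are kept as-is. */
    static Map<String, Long> resolve(Map<String, Long> offsets, OffsetLookup lookup) {
        Map<String, Long> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            long offset = entry.getValue();
            if (offset == EARLIEST_TIME || offset == LATEST_TIME) {
                resolved.put(entry.getKey(), lookup.fetch(entry.getKey(), offset));
            } else {
                resolved.put(entry.getKey(), offset);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("topic-0", EARLIEST_TIME);
        offsets.put("topic-1", LATEST_TIME);
        offsets.put("topic-2", 42L);

        // Fake broker: every partition spans offsets 0 (earliest) to 50 (latest).
        Map<String, Long> resolved =
                resolve(offsets, (partition, whichTime) -> whichTime == EARLIEST_TIME ? 0L : 50L);
        System.out.println(resolved); // {topic-0=0, topic-1=50, topic-2=42}
    }
}
```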


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> ---
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" or "latest" position in 
> topics with the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works can be misleading if 
> users are trying to find a way to "read topics from a starting position". 
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer 
> follows Kafka's original intent for the setting: first, existing external 
> offsets committed to ZK / the brokers are checked; only if none exist is 
> {{auto.offset.reset}} respected.
> I propose adding Flink-specific ways to define the starting position that do 
> not take the external offsets into account. The original behaviour (reference 
> external offsets first) can become a user option, so that it can be retained 
> for frequent Kafka users who may need to cooperate with existing 
> non-Flink Kafka consumer applications.
> This is how users would interact with the Flink Kafka consumer after this is 
> added, with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a warning)
> props.setProperty("group.id", "..."); // this won't have an effect on the starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, to reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be latest
> props.setProperty("group.id", "..."); // will be used to look up external offsets in ZK / broker on startup
> ...
> {code}
> One thing we would need to decide is what the default value for 
> {{flink.starting-position}} should be.
> I see two merits in adding this:
> 1. It accommodates the way users generally interpret "read from a starting 
> position". Since the Flink Kafka connector is essentially a "high-level" 
> Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, even though it 
> wasn't supported in Kafka's original consumer design.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not to manipulate 
> how Kafka topics are read in Flink (unless users opt to do so)" becomes even 
> more definite and solid. There was some discussion of this aspect in 
> https://github.com/apache/flink/pull/1690 (FLINK-3398). I think adding this 
> further "decouples" Flink's internal offset checkpointing from Kafka's 
> external offset store.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690768#comment-15690768
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89361344
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
 ---
@@ -263,6 +265,7 @@ public Void answer(InvocationOnMock invocation) {
 schema,
 new Properties(),
 0L,
+   StartupMode.GROUP_OFFSETS,
--- End diff --

Will do!




[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690775#comment-15690775
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89361839
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -45,10 +45,7 @@
 
 import java.io.File;
 import java.net.BindException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.*;
--- End diff --

Ah, this was an IDE auto-complete. The style checks don't cover the test 
code, right? I'll revert this.




[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690773#comment-15690773
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89361688
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 ---
@@ -131,6 +131,25 @@ public void testMetricsAndEndOfStream() throws 
Exception {
runEndOfStreamTest();
}
 
+   // --- startup mode ---
+
+   // TODO not passing due to Kafka Consumer config error
--- End diff --

Hmm, if I recall correctly, that's what I did in the first place, but that 
caused some other issues. I'll definitely give this another look and make sure 
the test is runnable.




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89361839
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -45,10 +45,7 @@
 
 import java.io.File;
 import java.net.BindException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.*;
--- End diff --

Ah, this was an IDE auto-complete. The style checks don't cover the test 
code, right? I'll revert this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-4820) Slf4j / log4j version upgrade to support dynamic change of log levels.

2016-11-23 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen reassigned FLINK-4820:
---

Assignee: Stephan Ewen

> Slf4j / log4j version upgrade to support dynamic change of log levels.
> --
>
> Key: FLINK-4820
> URL: https://issues.apache.org/jira/browse/FLINK-4820
> Project: Flink
>  Issue Type: Task
>Reporter: Zhenzhong Xu
>Assignee: Stephan Ewen
>






[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89361688
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 ---
@@ -131,6 +131,25 @@ public void testMetricsAndEndOfStream() throws 
Exception {
runEndOfStreamTest();
}
 
+   // --- startup mode ---
+
+   // TODO not passing due to Kafka Consumer config error
--- End diff --

Hmm, if I recall correctly, that's what I did in the first place, but that 
caused some other issues. I'll definitely give this another look and make sure 
the test is runnable.




[jira] [Commented] (FLINK-3404) Extend Kafka consumers with interface StoppableFunction

2016-11-23 Thread static-max (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690734#comment-15690734
 ] 

static-max commented on FLINK-3404:
---

I need a _stop()_ method too; I will propose a PR for this issue if 
there is interest.

> Extend Kafka consumers with interface StoppableFunction
> ---
>
> Key: FLINK-3404
> URL: https://issues.apache.org/jira/browse/FLINK-3404
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Matthias J. Sax
>
> Kafka consumers are not stoppable right now. To make them stoppable, they 
> must implement {{StoppableFunction}}. The implementation of {{stop()}} must 
> ensure that the consumer stops pulling new messages from Kafka and issues a 
> final checkpoint with the last offset. Afterwards, {{run()}} must return.
> When implementing this, keep in mind that the gathered checkpoint might 
> later be used as a savepoint.
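Below is a minimal sketch of the stop contract described above, written without Flink dependencies. `Stoppable` is a hypothetical stand-in for `StoppableFunction`. The field that records the last offset stands in for the final checkpoint. The sketch does not model how Flink would actually trigger and persist that checkpoint.

```java
public class StoppableConsumerSketch {

    /** Hypothetical stand-in for Flink's StoppableFunction. */
    interface Stoppable {
        void stop();
    }

    /** Fake consumer: "pulls" records by advancing an offset until stopped. */
    static class FakeKafkaConsumer implements Stoppable {
        private volatile boolean running = true;
        private long currentOffset = 0;           // only touched by the run() thread
        volatile long finalCheckpointOffset = -1; // -1 means "no final checkpoint yet"

        void run() throws InterruptedException {
            while (running) {
                currentOffset++;                  // stand-in for fetching one record
                Thread.sleep(1);
            }
            // Loop exited because of stop(): record the last offset, then return from run().
            finalCheckpointOffset = currentOffset;
        }

        @Override
        public void stop() {
            running = false;                      // stop pulling new records
        }
    }

    public static void main(String[] args) throws Exception {
        FakeKafkaConsumer consumer = new FakeKafkaConsumer();
        Thread sourceThread = new Thread(() -> {
            try {
                consumer.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sourceThread.start();
        Thread.sleep(20);
        consumer.stop();
        sourceThread.join();                      // run() has returned
        System.out.println("final offset: " + consumer.finalCheckpointOffset);
    }
}
```

Unlike a cancel, stop() lets the loop finish its current iteration and leaves a consistent last offset behind. That last offset is what a later savepoint would resume from.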





[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690712#comment-15690712
 ] 

ASF GitHub Bot commented on FLINK-5122:
---

GitHub user static-max opened a pull request:

https://github.com/apache/flink/pull/2861

[FLINK-5122] Index requests will be retried if the error is only temp…

This PR will re-add index requests to the BulkProcessor if the error is 
temporary, such as:
* General timeout errors
* No master
* UnavailableShardsException (rebalancing, node down)
* Bulk queue on node full
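Below is a minimal sketch of the retry idea. It assumes each failure has already been classified into a category. The `Failure` enum, the `Result` type and `handleBulkResponse` are hypothetical and do not use the Elasticsearch client API. Transient failures are re-queued for the next bulk, and everything else is reported as permanent.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class TransientRetrySketch {

    /** Hypothetical failure categories; the transient ones mirror the cases listed in the PR. */
    enum Failure { NONE, TIMEOUT, NO_MASTER, UNAVAILABLE_SHARDS, QUEUE_FULL, MAPPING_ERROR }

    static final Set<Failure> TRANSIENT = EnumSet.of(
            Failure.TIMEOUT, Failure.NO_MASTER, Failure.UNAVAILABLE_SHARDS, Failure.QUEUE_FULL);

    /** Outcome of one index request inside a bulk response (hypothetical type). */
    static final class Result {
        final String docId;
        final Failure failure;

        Result(String docId, Failure failure) {
            this.docId = docId;
            this.failure = failure;
        }
    }

    /**
     * Re-queues requests that failed transiently so they go into the next bulk,
     * and returns the ids of requests that failed permanently.
     */
    static List<String> handleBulkResponse(List<Result> results, Deque<String> retryQueue) {
        List<String> permanent = new ArrayList<>();
        for (Result r : results) {
            if (r.failure == Failure.NONE) {
                continue;                    // indexed successfully
            }
            if (TRANSIENT.contains(r.failure)) {
                retryQueue.addLast(r.docId); // retry instead of discarding
            } else {
                permanent.add(r.docId);      // e.g. a bad document: retrying will not help
            }
        }
        return permanent;
    }

    public static void main(String[] args) {
        Deque<String> retryQueue = new ArrayDeque<>();
        List<String> permanent = handleBulkResponse(Arrays.asList(
                new Result("doc-1", Failure.NONE),
                new Result("doc-2", Failure.UNAVAILABLE_SHARDS),
                new Result("doc-3", Failure.QUEUE_FULL),
                new Result("doc-4", Failure.MAPPING_ERROR)), retryQueue);
        System.out.println("retry: " + retryQueue + ", permanent: " + permanent);
        // retry: [doc-2, doc-3], permanent: [doc-4]
    }
}
```

Re-queuing without a limit could loop indefinitely against a persistently overloaded cluster, so a retry cap or backoff is one possible refinement.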

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/static-max/flink 
flink-connector-elasticsearch2-robust

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2861.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2861


commit 2ea8bd099100203d73af9b3a5e616e6d6d1cd50d
Author: Max Kuklinski 
Date:   2016-11-23T16:54:11Z

[FLINK-5122] Index requests will be retried if the error is only temporary 
on Elasticsearch side. Covered are: Timeouts, No Master, 
UnavailableShardsException, bulk queue on node full




> Elasticsearch Sink loses documents when cluster has high load
> -
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>Reporter: static-max
>Assignee: static-max
>
> My cluster was under high load and documents were not indexed. This violates 
> the "at least once" semantics of the ES connector.
> I put pressure on my cluster to test Flink, causing new indices to be 
> created and rebalanced. On those errors, the bulk request should be retried 
> instead of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> UnavailableShardsException[[index-name][3] primary shard is not active 
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] 
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
>  nested: EsRejectedExecutionException[rejected execution of 
> org.elasticsearch.transport.TransportService$4@727e677c on 
> EsThreadPoolExecutor[bulk, queue capacity = 1, 
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
>  pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 
> 2939]]];
> I can try to propose a PR for this.





[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-11-23 Thread static-max
GitHub user static-max opened a pull request:

https://github.com/apache/flink/pull/2861

[FLINK-5122] Index requests will be retried if the error is only temp…

This PR will re-add index requests to the BulkProcessor if the error is 
temporary, such as:
* General timeout errors
* No master
* UnavailableShardsException (rebalancing, node down)
* Bulk queue on node full

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/static-max/flink 
flink-connector-elasticsearch2-robust

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2861.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2861


commit 2ea8bd099100203d73af9b3a5e616e6d6d1cd50d
Author: Max Kuklinski 
Date:   2016-11-23T16:54:11Z

[FLINK-5122] Index requests will be retried if the error is only temporary 
on Elasticsearch side. Covered are: Timeouts, No Master, 
UnavailableShardsException, bulk queue on node full





