[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...

2017-11-03 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4935
  
Updated the code according to the review comment. Ping @tzulitai


---


[jira] [Commented] (FLINK-7977) bump version of compatibility check for Flink 1.4

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238759#comment-16238759
 ] 

ASF GitHub Bot commented on FLINK-7977:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4945

[FLINK-7977][build] bump version of compatibility check for Flink 1.4

## What is the purpose of the change

Since Flink maintains backward compatibility checks for 2 versions, Flink 
1.4 should check compatibility with 1.2.

## Brief change log

bump compatible version from 1.1 to 1.2

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7977

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4945.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4945


commit 4a0cbaac403d7df895ec04fffaa2ecf3b7f9d309
Author: Bowen Li 
Date:   2017-11-04T04:08:24Z

[FLINK-7977] bump version of compatibility check for Flink 1.4




> bump version of compatibility check for Flink 1.4
> -
>
> Key: FLINK-7977
> URL: https://issues.apache.org/jira/browse/FLINK-7977
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0
>
>
> Since Flink maintains backward compatibility checks for 2 versions, Flink 1.4 
> should check compatibility with 1.2.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4945: [FLINK-7977][build] bump version of compatibility ...

2017-11-03 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4945

[FLINK-7977][build] bump version of compatibility check for Flink 1.4

## What is the purpose of the change

Since Flink maintains backward compatibility checks for 2 versions, Flink 
1.4 should check compatibility with 1.2.

## Brief change log

bump compatible version from 1.1 to 1.2

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7977

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4945.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4945


commit 4a0cbaac403d7df895ec04fffaa2ecf3b7f9d309
Author: Bowen Li 
Date:   2017-11-04T04:08:24Z

[FLINK-7977] bump version of compatibility check for Flink 1.4




---


[jira] [Updated] (FLINK-7977) bump version of compatibility check for Flink 1.4

2017-11-03 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7977:

Component/s: Build System

> bump version of compatibility check for Flink 1.4
> -
>
> Key: FLINK-7977
> URL: https://issues.apache.org/jira/browse/FLINK-7977
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0
>
>
> Since Flink maintains backward compatibility checks for 2 versions, Flink 1.4 
> should check compatibility with 1.2.





[jira] [Created] (FLINK-7977) bump version of compatibility check for Flink 1.4

2017-11-03 Thread Bowen Li (JIRA)
Bowen Li created FLINK-7977:
---

 Summary: bump version of compatibility check for Flink 1.4
 Key: FLINK-7977
 URL: https://issues.apache.org/jira/browse/FLINK-7977
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.4.0
Reporter: Bowen Li
Assignee: Bowen Li
Priority: Minor
 Fix For: 1.4.0


Since Flink maintains backward compatibility checks for 2 versions, Flink 1.4 
should check compatibility with 1.2.





[jira] [Updated] (FLINK-7976) bump japicmp-maven-plugin version in Flink

2017-11-03 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7976:

Fix Version/s: 1.5.0

> bump japicmp-maven-plugin version in Flink
> --
>
> Key: FLINK-7976
> URL: https://issues.apache.org/jira/browse/FLINK-7976
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.5.0
>
>
> bump japicmp-maven-plugin version from 0.7.0 to 0.11.0





[jira] [Updated] (FLINK-7976) bump japicmp-maven-plugin version in Flink

2017-11-03 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7976:

Affects Version/s: 1.4.0

> bump japicmp-maven-plugin version in Flink
> --
>
> Key: FLINK-7976
> URL: https://issues.apache.org/jira/browse/FLINK-7976
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
>
> bump japicmp-maven-plugin version from 0.7.0 to 0.11.0





[jira] [Created] (FLINK-7976) bump japicmp-maven-plugin version in Flink

2017-11-03 Thread Bowen Li (JIRA)
Bowen Li created FLINK-7976:
---

 Summary: bump japicmp-maven-plugin version in Flink
 Key: FLINK-7976
 URL: https://issues.apache.org/jira/browse/FLINK-7976
 Project: Flink
  Issue Type: Improvement
Reporter: Bowen Li
Assignee: Bowen Li
Priority: Minor


bump japicmp-maven-plugin version from 0.7.0 to 0.11.0





[GitHub] flink pull request #4944: [hotfix] add space bewteen error message lines

2017-11-03 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4944

[hotfix] add space bewteen error message lines

## What is the purpose of the change

add space between error message lines

## Brief change log

add space between error message lines

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4944.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4944


commit be9b95a1b528a02aa46e0a4c90b42718b6bf6035
Author: Bowen Li 
Date:   2017-11-03T22:46:02Z

[hotfix] add space bewteen error message lines




---


[jira] [Assigned] (FLINK-7717) Port TaskManagerMetricsHandler to new REST endpoint

2017-11-03 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-7717:
---

Assignee: (was: Bowen Li)

> Port TaskManagerMetricsHandler to new REST endpoint
> ---
>
> Key: FLINK-7717
> URL: https://issues.apache.org/jira/browse/FLINK-7717
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{TaskManagerMetricsHandler}} to new REST endpoint.





[jira] [Assigned] (FLINK-7716) Port JobManagerMetricsHandler to new REST endpoint

2017-11-03 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-7716:
---

Assignee: (was: Bowen Li)

> Port JobManagerMetricsHandler to new REST endpoint
> --
>
> Key: FLINK-7716
> URL: https://issues.apache.org/jira/browse/FLINK-7716
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JobManagerMetricsHandler}} to new REST endpoint.





[jira] [Assigned] (FLINK-7718) Port JobVertixMetricsHandler to new REST endpoint

2017-11-03 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-7718:
---

Assignee: (was: Bowen Li)

> Port JobVertixMetricsHandler to new REST endpoint
> -
>
> Key: FLINK-7718
> URL: https://issues.apache.org/jira/browse/FLINK-7718
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JobVertixMetricsHandler}} to new REST endpoint.





[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238298#comment-16238298
 ] 

ASF GitHub Bot commented on FLINK-7949:
---

Github user bartektartanus commented on the issue:

https://github.com/apache/flink/pull/4924
  
Yes, it fixes our issue: our test in 
[nussknacker](https://github.com/TouK/nussknacker) now passes. The process restarts 
seamlessly and works fine even after further restarts, but I haven't managed to 
reproduce this error in a simple Flink unit test (yet). Maybe next week :)


> AsyncWaitOperator is not restarting when queue is full
> --
>
> Key: FLINK-7949
> URL: https://issues.apache.org/jira/browse/FLINK-7949
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Bartłomiej Tartanus
>Priority: Major
>   Original Estimate: 0.25h
>  Remaining Estimate: 0.25h
>
> The issue was described here:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
> Issue - AsyncWaitOperator can't restart properly after a failure (the thread 
> waits forever).
> Scenario to reproduce this issue:
> 1. The queue is full (let's assume that its capacity is N elements). 
> 2. There is a pending element waiting, so the 
> pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and the 
> while-loop in the addAsyncBufferEntry method keeps trying to add this element to 
> the queue (but the element is not added because the queue is full). 
> 3. Now the snapshot is taken: the whole queue of N elements is 
> written into the ListState in the snapshotState method and, more 
> importantly, the pendingStreamElementQueueEntry is written to this list too. 
> 4. The process is restarted, so it tries to recover all the elements 
> and put them into the queue again, but the list of recovered elements holds 
> N+1 elements and the queue capacity is only N. The process has not started yet, so 
> it cannot process any element, and the extra element waits endlessly. 
> It is never added and the process never processes anything. Deadlock. 
> 5. The trigger is fired and discarded because the process is not running 
> yet. 
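
The scenario above can be illustrated with a small standalone sketch. It does not use any Flink classes: `RestoreOverflowSketch`, `restore`, and the element names are hypothetical, and a plain `ArrayBlockingQueue` stands in for the operator's bounded queue. It only shows why N+1 snapshotted elements cannot be put back into a queue of capacity N while no consumer is running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical stand-in for the operator's bounded element queue; not Flink code. */
final class RestoreOverflowSketch {

    /**
     * Tries to re-insert the restored elements into a queue of the given capacity
     * and returns how many of them did not fit. The non-blocking offer() is used
     * so the sketch terminates; a blocking put() would wait forever at this point.
     */
    static int restore(List<String> restored, int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int stuck = 0;
        for (String element : restored) {
            // No consumer runs during recovery, so nothing ever drains the queue.
            if (!queue.offer(element)) {
                stuck++;
            }
        }
        return stuck;
    }

    public static void main(String[] args) {
        int capacity = 3; // N
        List<String> snapshot = new ArrayList<>();
        for (int i = 0; i < capacity; i++) {
            snapshot.add("queued-" + i); // the N elements already in the queue (step 1)
        }
        snapshot.add("pending"); // the extra pending entry also written in step 3
        System.out.println("elements that cannot be restored: " + restore(snapshot, capacity));
    }
}
```

With N = 3 the snapshot holds four elements, so exactly one element is left over during recovery. That leftover element corresponds to the one that waits endlessly in step 4.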





[GitHub] flink issue #4924: [FLINK-7949] AsyncWaitOperator is not restarting when que...

2017-11-03 Thread bartektartanus
Github user bartektartanus commented on the issue:

https://github.com/apache/flink/pull/4924
  
Yes, it fixes our issue: our test in 
[nussknacker](https://github.com/TouK/nussknacker) now passes. The process restarts 
seamlessly and works fine even after further restarts, but I haven't managed to 
reproduce this error in a simple Flink unit test (yet). Maybe next week :)


---


[jira] [Closed] (FLINK-7973) Fix service shading relocation for S3 file systems

2017-11-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-7973.
---

> Fix service shading relocation for S3 file systems
> --
>
> Key: FLINK-7973
> URL: https://issues.apache.org/jira/browse/FLINK-7973
> Project: Flink
>  Issue Type: Bug
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The shade plugin currently relocates services incorrectly, applying 
> relocation patterns multiple times.





[jira] [Resolved] (FLINK-7973) Fix service shading relocation for S3 file systems

2017-11-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-7973.
-

Fixed via 84d0677f98f81d50411957103be82993e40f587a

> Fix service shading relocation for S3 file systems
> --
>
> Key: FLINK-7973
> URL: https://issues.apache.org/jira/browse/FLINK-7973
> Project: Flink
>  Issue Type: Bug
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The shade plugin currently relocates services incorrectly, applying 
> relocation patterns multiple times.





[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...

2017-11-03 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/4943

[FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State 

## What is the purpose of the change

This changes Avro types to be serialized with a proper Avro serializer. The 
Avro serializer efficiently handles both Specific Records (generated by Avro) 
and Avro-reflection-based serialization.

To maintain backwards compatibility, the Avro type info actually generates 
a wrapping serializer that falls back to a Pojo (or Kryo) serializer 
when reconfigured from an old snapshot.
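
A minimal sketch of the wrapping-with-fallback idea, for readers unfamiliar with the pattern. Everything here is hypothetical (`Codec`, `TaggedCodec`, `WrappingCodec`, and the `reconfigure` flag are illustration names, not Flink's serializer API), and the "avro" and "pojo" tags merely stand in for the two real formats.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative only: none of these types are Flink classes. */
final class FallbackSerializerSketch {

    /** Minimal hypothetical serializer contract. */
    interface Codec {
        byte[] write(String value);

        String read(byte[] bytes);
    }

    /** Prefix-tagging codec used as a stand-in for a real wire format. */
    static final class TaggedCodec implements Codec {
        private final String tag;

        TaggedCodec(String tag) {
            this.tag = tag;
        }

        @Override
        public byte[] write(String value) {
            return (tag + ":" + value).getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String read(byte[] bytes) {
            String text = new String(bytes, StandardCharsets.UTF_8);
            if (!text.startsWith(tag + ":")) {
                throw new IllegalArgumentException("not written by the " + tag + " codec");
            }
            return text.substring(tag.length() + 1);
        }
    }

    /** Uses the new codec by default and switches to the legacy one on demand. */
    static final class WrappingCodec implements Codec {
        private final Codec legacy;
        private Codec active;

        WrappingCodec(Codec primary, Codec legacy) {
            this.active = primary;
            this.legacy = legacy;
        }

        /** Called on restore; an old snapshot forces the legacy format. */
        void reconfigure(boolean snapshotUsesLegacyFormat) {
            if (snapshotUsesLegacyFormat) {
                active = legacy;
            }
        }

        @Override
        public byte[] write(String value) {
            return active.write(value);
        }

        @Override
        public String read(byte[] bytes) {
            return active.read(bytes);
        }
    }

    public static void main(String[] args) {
        Codec legacy = new TaggedCodec("pojo");
        byte[] oldState = legacy.write("state");

        WrappingCodec codec = new WrappingCodec(new TaggedCodec("avro"), legacy);
        codec.reconfigure(true); // restoring from an old snapshot
        System.out.println(codec.read(oldState));
    }
}
```

The point of the wrapper in this sketch is that callers hold a single serializer object, and only the restore path decides which concrete format is active.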

## Brief change log

  - Adds proper Avro type serializers
  - Adds a backwards-compatible Avro serializer that falls back to 
Pojo/Kryo on old snapshots
  - Adds a bunch of tests

## Verifying this change

  -  Using Avro specific record types in the program and enjoying nice 
performant execution ;-)
  - Using Avro for Flink state and getting it serialized via Avro, allowing 
a schema upgrade of state
  - Running the added unit tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (**yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink use_proper_avro

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4943.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4943


commit dd05a3bf3471702ac8c9129d2d80f2feeca0f949
Author: Stephan Ewen 
Date:   2017-11-03T13:47:33Z

[FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State

This falls back to the original serializer (Pojo / Kryo) in cases where
an old snapshot is resumed.




---


[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238287#comment-16238287
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/4943

[FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State 

## What is the purpose of the change

This changes Avro types to be serialized with a proper Avro serializer. The 
Avro serializer efficiently handles both Specific Records (generated by Avro) 
and Avro-reflection-based serialization.

To maintain backwards compatibility, the Avro type info actually generates 
a wrapping serializer that falls back to a Pojo (or Kryo) serializer 
when reconfigured from an old snapshot.

## Brief change log

  - Adds proper Avro type serializers
  - Adds a backwards-compatible Avro serializer that falls back to 
Pojo/Kryo on old snapshots
  - Adds a bunch of tests

## Verifying this change

  -  Using Avro specific record types in the program and enjoying nice 
performant execution ;-)
  - Using Avro for Flink state and getting it serialized via Avro, allowing 
a schema upgrade of state
  - Running the added unit tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (**yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink use_proper_avro

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4943.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4943


commit dd05a3bf3471702ac8c9129d2d80f2feeca0f949
Author: Stephan Ewen 
Date:   2017-11-03T13:47:33Z

[FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State

This falls back to the original serializer (Pojo / Kryo) in cases where
an old snapshot is resumed.




> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Priority: Major
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.





[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238114#comment-16238114
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r148857309
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the job vertex back pressure.
+ */
+public class JobVertexBackPressureHandler extends AbstractExecutionGraphHandler {
+   /** Back pressure stats tracker. */
+   private final BackPressureStatsTracker backPressureStatsTracker;
+
+   /** Time after which stats are considered outdated. */
+   private final int refreshInterval;
+
+   public JobVertexBackPressureHandler(
+         CompletableFuture localRestAddress,
+         GatewayRetriever leaderRetriever,
+         Time timeout,
+         MessageHeaders messageHeaders,
+         ExecutionGraphCache executionGraphCache,
+         Executor executor,
+         Configuration clusterConfiguration) {
+      super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
+
+      // Back pressure stats tracker config
+      this.refreshInterval = clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL);
+      this.backPressureStatsTracker = new BackPressureStatsTracker(
+         new StackTraceSampleCoordinator(executor, 6),
+         clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
+         clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
+         Time.milliseconds(clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_DELAY)));
+   }
+
+   @Override
+   protected 

[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r148857309
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the job vertex back pressure.
+ */
+public class JobVertexBackPressureHandler extends AbstractExecutionGraphHandler {
+   /** Back pressure stats tracker. */
+   private final BackPressureStatsTracker backPressureStatsTracker;
+
+   /** Time after which stats are considered outdated. */
+   private final int refreshInterval;
+
+   public JobVertexBackPressureHandler(
+         CompletableFuture localRestAddress,
+         GatewayRetriever leaderRetriever,
+         Time timeout,
+         MessageHeaders messageHeaders,
+         ExecutionGraphCache executionGraphCache,
+         Executor executor,
+         Configuration clusterConfiguration) {
+      super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
+
+      // Back pressure stats tracker config
+      this.refreshInterval = clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL);
+      this.backPressureStatsTracker = new BackPressureStatsTracker(
+         new StackTraceSampleCoordinator(executor, 6),
+         clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
+         clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
+         Time.milliseconds(clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_DELAY)));
+   }
+
+   @Override
+   protected JobVertexBackPressureInfo handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+   JobVertexID jobVertexID = 

[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r148857541
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureHeaders.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureHeaders implements MessageHeaders {
+
+   private static final JobVertexBackPressureHeaders INSTANCE = new JobVertexBackPressureHeaders();
+
+   private static final String URL = "/jobs/:jobid/vertices/:vertexid/backpressure";
--- End diff --

Instead of writing `/jobs/:jobid`, we could write `"/jobs/:" + 
JobIDParameter.KEY`.
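
As a rough illustration of this suggestion, the URL can be assembled from the parameter key constants so that the path and the parameter definitions cannot drift apart. The constants below are hypothetical stand-ins whose values are assumed from the literal URL in the diff; they are not the actual Flink parameter classes.

```java
/** Sketch only: the key constants are assumptions, not Flink's parameter classes. */
final class UrlFromKeysSketch {

    // Hypothetical stand-ins for the KEY constants of the path parameter classes.
    static final String JOB_ID_KEY = "jobid";
    static final String VERTEX_ID_KEY = "vertexid";

    // The URL is derived from the keys instead of repeating them as literals.
    static final String URL =
        "/jobs/:" + JOB_ID_KEY + "/vertices/:" + VERTEX_ID_KEY + "/backpressure";

    public static void main(String[] args) {
        System.out.println(URL); // prints "/jobs/:jobid/vertices/:vertexid/backpressure"
    }
}
```

Because both operands are compile-time constants, the concatenation is itself a constant expression, so nothing changes at runtime compared with the hard-coded literal.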


---


[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238115#comment-16238115
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r148857541
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureHeaders.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureHeaders implements 
MessageHeaders {
+
+   private static final JobVertexBackPressureHeaders INSTANCE = new 
JobVertexBackPressureHeaders();
+
+   private static final String URL = 
"/jobs/:jobid/vertices/:vertexid/backpressure";
--- End diff --

Instead of writing `/jobs/:jobid` we could write `"/jobs/:" + 
JobIDParameter.KEY`.
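
A minimal sketch of what this suggestion could look like, assuming each path parameter class exposes its name as a `KEY` constant. Only `JobIDParameter.KEY` is named in the comment; the constant values and the `JobVertexIdParameter` and `BackPressureUrlSketch` classes are hypothetical stand-ins for illustration, not the actual Flink classes.

```java
// Hypothetical stand-in: the value "jobid" is assumed from the URL in the diff.
final class JobIDParameter {
    static final String KEY = "jobid";

    private JobIDParameter() {}
}

// Hypothetical counterpart for the vertex id segment of the path.
final class JobVertexIdParameter {
    static final String KEY = "vertexid";

    private JobVertexIdParameter() {}
}

final class BackPressureUrlSketch {
    // Building the URL from the KEY constants keeps the path and the
    // parameter definitions from drifting apart.
    static final String URL =
        "/jobs/:" + JobIDParameter.KEY
            + "/vertices/:" + JobVertexIdParameter.KEY
            + "/backpressure";

    private BackPressureUrlSketch() {}
}
```

With these assumed values the constant evaluates to the same string as the hard-coded URL in the diff.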


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Port JobVertexBackPressureHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238103#comment-16238103
 ] 

ASF GitHub Bot commented on FLINK-7949:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4924
  
Thanks for the fix @bartektartanus. LGTM. 

Did you verify that this actually fixes the problem? Would be great if we 
could also add a unit test to guard against future regressions.


> AsyncWaitOperator is not restarting when queue is full
> --
>
> Key: FLINK-7949
> URL: https://issues.apache.org/jira/browse/FLINK-7949
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Bartłomiej Tartanus
>Priority: Major
>   Original Estimate: 0.25h
>  Remaining Estimate: 0.25h
>
> Issue was described here:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
> Issue - AsyncWaitOperator can't restart properly after failure (thread is 
> waiting forever)
> Scenario to reproduce this issue:
> 1. The queue is full (let's assume that its capacity is N elements) 
> 2. There is some pending element waiting, so the 
> pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and 
> while-loop in addAsyncBufferEntry method is trying to add this element to 
> the queue (but element is not added because queue is full) 
> 3. Now the snapshot is taken - the whole queue of N elements is being 
> written into the ListState in snapshotState method and also (what is more 
> important) this pendingStreamElementQueueEntry is written to this list too. 
> 4. The process is being restarted, so it tries to recover all the elements 
> and put them again into the queue, but the list of recovered elements holds 
> N+1 elements and our queue capacity is only N. Process is not started yet, so 
> it can not process any element and this one element is waiting endlessly. 
> But it's never added and the process will never process anything. Deadlock. 
> 5. Trigger is fired and indeed discarded because the process is not running 
> yet. 
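
The scenario above can be illustrated with a small, self-contained sketch. This is not the `AsyncWaitOperator` code: the class and method names are hypothetical, and a plain `ArrayBlockingQueue` stands in for the operator's queue. It only shows why restoring N+1 snapshotted elements into a queue of capacity N leaves one element that can never be added while nothing is draining the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class RestoreOverflowSketch {

    /** Builds a snapshot as in step 3: the N queued elements plus the pending one. */
    static List<String> snapshot(int capacity) {
        List<String> state = new ArrayList<>();
        for (int i = 0; i < capacity; i++) {
            state.add("queued-" + i);
        }
        state.add("pending");
        return state;
    }

    /** Restores into a fresh queue and returns how many elements did not fit. */
    static int restore(int capacity, List<String> recovered) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int rejected = 0;
        for (String element : recovered) {
            // offer() returns false when the queue is full. A blocking put()
            // would wait forever here, because no consumer runs during restore.
            if (!queue.offer(element)) {
                rejected++;
            }
        }
        return rejected;
    }

    private RestoreOverflowSketch() {}
}
```

Calling `RestoreOverflowSketch.restore(3, RestoreOverflowSketch.snapshot(3))` reports one rejected element, which corresponds to the entry that waits endlessly in step 4.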





[GitHub] flink issue #4924: [FLINK-7949] AsyncWaitOperator is not restarting when que...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4924
  
Thanks for the fix @bartektartanus. LGTM. 

Did you verify that this actually fixes the problem? Would be great if we 
could also add a unit test to guard against future regressions.


---


[jira] [Resolved] (FLINK-7847) Fix typo in flink-avro shading pattern

2017-11-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-7847.
-
Resolution: Fixed

Fixed via b4dead96a16c8772ccf86b533f5de6feb0f3d1f6

> Fix typo in flink-avro shading pattern
> --
>
> Key: FLINK-7847
> URL: https://issues.apache.org/jira/browse/FLINK-7847
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.4.0
>
>
> {code}
> <relocation>
>   <pattern>org.codehaus.jackson</pattern>
>   <shadedPattern>org.apache.flink.avro.shaded.org.codehouse.jackson</shadedPattern>
> </relocation>
> {code}
> The shaded pattern should be 
> "org.apache.flink.avro.shaded.org.codehaus.jackson".





[jira] [Closed] (FLINK-7847) Fix typo in flink-avro shading pattern

2017-11-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-7847.
---

> Fix typo in flink-avro shading pattern
> --
>
> Key: FLINK-7847
> URL: https://issues.apache.org/jira/browse/FLINK-7847
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.4.0
>
>
> {code}
> <relocation>
>   <pattern>org.codehaus.jackson</pattern>
>   <shadedPattern>org.apache.flink.avro.shaded.org.codehouse.jackson</shadedPattern>
> </relocation>
> {code}
> The shaded pattern should be 
> "org.apache.flink.avro.shaded.org.codehaus.jackson".





[jira] [Closed] (FLINK-7420) Move all Avro code to flink-avro

2017-11-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-7420.
---

> Move all Avro code to flink-avro
> 
>
> Key: FLINK-7420
> URL: https://issues.apache.org/jira/browse/FLINK-7420
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem*
> Currently, the {{flink-avro}} project is a shell with some tests and mostly 
> duplicate and dead code. The classes that use Avro are distributed quite 
> wildly through the code base, and introduce multiple direct dependencies on 
> Avro in a messy way.
> That way, we cannot create a proper fat Avro dependency in which we shade 
> Jackson away.
> Also, we expose Avro as a direct and hard dependency on many Flink modules, 
> while it should be a dependency that users that use Avro types selectively 
> add.
> *Suggested Changes*
> We should move all Avro related classes to {{flink-avro}}, and give 
> {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}.
>   - {{AvroTypeInfo}}
>   - {{AvroSerializer}}
>   - {{AvroRowSerializationSchema}}
>   - {{AvroRowDeserializationSchema}}
> To be able to move the Avro serialization code from {{flink-core}} to 
> {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, 
> similar to how we load the {{WritableTypeInfo}} for Hadoop.
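
A rough sketch of the reflective loading idea, assuming the optional class has a no-argument constructor. The helper class and method names are hypothetical, and this is not how Flink's type extraction is actually written; it only shows how a core module can use a class by name without a compile-time dependency on the module that provides it.

```java
import java.util.Optional;

final class ReflectiveLoadSketch {

    /**
     * Tries to instantiate a class that may or may not be on the classpath.
     * Returns an empty Optional when the class is missing or cannot be created.
     */
    static Optional<Object> tryInstantiate(String className, ClassLoader loader) {
        try {
            // Resolve by name only, so the calling module needs no import
            // of (and no build dependency on) the optional class.
            Class<?> clazz = Class.forName(className, false, loader);
            return Optional.of(clazz.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException | LinkageError e) {
            // Covers a missing class, a missing constructor, or a class
            // whose own dependencies are absent.
            return Optional.empty();
        }
    }

    private ReflectiveLoadSketch() {}
}
```

The caller can then fall back to a generic code path when the result is empty, so users who do not need Avro types never have to add the dependency.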





[jira] [Resolved] (FLINK-7420) Move all Avro code to flink-avro

2017-11-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-7420.
-
Resolution: Fixed

Fixed in
  - 537a10ea2ff6a2d8507483c66f413f77884e77c4
  - db7c70faa0b5aed652b525e8091cdc0e265362bc
  - 29249b2eeb9cb9910a5a55ae6c3a0b648d67d2b5
  - 65e87045c48ce3200ea6690d945ed87b061808af
  - eb99181ddd4851d2f4a64377ebd4fe0ac11e2581

> Move all Avro code to flink-avro
> 
>
> Key: FLINK-7420
> URL: https://issues.apache.org/jira/browse/FLINK-7420
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem*
> Currently, the {{flink-avro}} project is a shell with some tests and mostly 
> duplicate and dead code. The classes that use Avro are distributed quite 
> wildly through the code base, and introduce multiple direct dependencies on 
> Avro in a messy way.
> That way, we cannot create a proper fat Avro dependency in which we shade 
> Jackson away.
> Also, we expose Avro as a direct and hard dependency on many Flink modules, 
> while it should be a dependency that users that use Avro types selectively 
> add.
> *Suggested Changes*
> We should move all Avro related classes to {{flink-avro}}, and give 
> {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}.
>   - {{AvroTypeInfo}}
>   - {{AvroSerializer}}
>   - {{AvroRowSerializationSchema}}
>   - {{AvroRowDeserializationSchema}}
> To be able to move the Avro serialization code from {{flink-core}} to 
> {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, 
> similar to how we load the {{WritableTypeInfo}} for Hadoop.





[jira] [Closed] (FLINK-7972) Move SerializationSchema to flink-core

2017-11-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-7972.
---

> Move SerializationSchema to flink-core
> --
>
> Key: FLINK-7972
> URL: https://issues.apache.org/jira/browse/FLINK-7972
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> The {{SerializationSchema}} and its related classes are currently in 
> {{flink-streaming-java}}.
> API level projects that depend on those classes hence pull in a dependency on 
> runtime classes.
> For example, this would be required in order to make {{flink-avro}} 
> independent of runtime dependencies and Scala versions, same for the future 
> for thrift format support, for Hbase connectors, etc.
> This should not be API breaking since we can keep the classes in the same 
> namespace and only move them "upstream" in the dependency structure, or we 
> can keep classes in the original namespace that extend the moved classes in 
> {{flink-core}}.





[jira] [Resolved] (FLINK-7972) Move SerializationSchema to flink-core

2017-11-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-7972.
-
Resolution: Fixed

Fixed in fe931d075f031e9494fd26dbeed4bb1024bd52cf

> Move SerializationSchema to flink-core
> --
>
> Key: FLINK-7972
> URL: https://issues.apache.org/jira/browse/FLINK-7972
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> The {{SerializationSchema}} and its related classes are currently in 
> {{flink-streaming-java}}.
> API level projects that depend on those classes hence pull in a dependency on 
> runtime classes.
> For example, this would be required in order to make {{flink-avro}} 
> independent of runtime dependencies and Scala versions, same for the future 
> for thrift format support, for Hbase connectors, etc.
> This should not be API breaking since we can keep the classes in the same 
> namespace and only move them "upstream" in the dependency structure, or we 
> can keep classes in the original namespace that extend the moved classes in 
> {{flink-core}}.





[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4734
  
I think @zentol is right and we can register the 
`CurrentJobsOverviewHandler` under `jobs/overview`. Thus, we should add this 
handler as well. Could you rebase this handler onto the latest master such that 
we can merge it?


---


[jira] [Commented] (FLINK-7652) Port CurrentJobIdsHandler to new REST endpoint

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238027#comment-16238027
 ] 

ASF GitHub Bot commented on FLINK-7652:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4734
  
I think @zentol is right and we can register the 
`CurrentJobsOverviewHandler` under `jobs/overview`. Thus, we should add this 
handler as well. Could you rebase this handler onto the latest master such that 
we can merge it?


> Port CurrentJobIdsHandler to new REST endpoint
> --
>
> Key: FLINK-7652
> URL: https://issues.apache.org/jira/browse/FLINK-7652
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CurrentJobIdsHandler}} to new REST endpoint





[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238018#comment-16238018
 ] 

ASF GitHub Bot commented on FLINK-7420:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4942


> Move all Avro code to flink-avro
> 
>
> Key: FLINK-7420
> URL: https://issues.apache.org/jira/browse/FLINK-7420
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem*
> Currently, the {{flink-avro}} project is a shell with some tests and mostly 
> duplicate and dead code. The classes that use Avro are distributed quite 
> wildly through the code base, and introduce multiple direct dependencies on 
> Avro in a messy way.
> That way, we cannot create a proper fat Avro dependency in which we shade 
> Jackson away.
> Also, we expose Avro as a direct and hard dependency on many Flink modules, 
> while it should be a dependency that users that use Avro types selectively 
> add.
> *Suggested Changes*
> We should move all Avro related classes to {{flink-avro}}, and give 
> {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}.
>   - {{AvroTypeInfo}}
>   - {{AvroSerializer}}
>   - {{AvroRowSerializationSchema}}
>   - {{AvroRowDeserializationSchema}}
> To be able to move the Avro serialization code from {{flink-core}} to 
> {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, 
> similar to how we load the {{WritableTypeInfo}} for Hadoop.





[GitHub] flink pull request #4942: [FLINK-7420] [avro] Move all Avro code to flink-av...

2017-11-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4942


---


[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238017#comment-16238017
 ] 

ASF GitHub Bot commented on FLINK-7420:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4931


> Move all Avro code to flink-avro
> 
>
> Key: FLINK-7420
> URL: https://issues.apache.org/jira/browse/FLINK-7420
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem*
> Currently, the {{flink-avro}} project is a shell with some tests and mostly 
> duplicate and dead code. The classes that use Avro are distributed quite 
> wildly through the code base, and introduce multiple direct dependencies on 
> Avro in a messy way.
> That way, we cannot create a proper fat Avro dependency in which we shade 
> Jackson away.
> Also, we expose Avro as a direct and hard dependency on many Flink modules, 
> while it should be a dependency that users that use Avro types selectively 
> add.
> *Suggested Changes*
> We should move all Avro related classes to {{flink-avro}}, and give 
> {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}.
>   - {{AvroTypeInfo}}
>   - {{AvroSerializer}}
>   - {{AvroRowSerializationSchema}}
>   - {{AvroRowDeserializationSchema}}
> To be able to move the Avro serialization code from {{flink-core}} to 
> {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, 
> similar to how we load the {{WritableTypeInfo}} for Hadoop.





[GitHub] flink pull request #4931: [FLINK-7420] Move all Avro code to flink-avro

2017-11-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4931


---


[jira] [Assigned] (FLINK-7975) Queryable state client does not wait for shutdown completion

2017-11-03 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-7975:
-

Assignee: Kostas Kloudas

> Queryable state client does not wait for shutdown completion
> 
>
> Key: FLINK-7975
> URL: https://issues.apache.org/jira/browse/FLINK-7975
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>
> The queryable state client neither waits for its shutdown completion when 
> calling {{Client#shutdown}} nor does it return a termination future on which 
> we could wait for the termination. I think this would be good to provide.





[jira] [Assigned] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-03 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-7974:
-

Assignee: Kostas Kloudas

> AbstractServerBase#shutdown does not wait for shutdown completion
> -
>
> Key: FLINK-7974
> URL: https://issues.apache.org/jira/browse/FLINK-7974
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>
> The {{AbstractServerBase}} does not wait for the completion of its shutdown 
> when calling {{AbstractServerBase#shutdown}}. This is problematic since it 
> leads to resource leaks and unstable tests such as the 
> {{AbstractServerTest}}. I propose to let the {{AbstractServerBase#shutdown}} 
> return a termination future which is completed upon shutdown completion.





[jira] [Created] (FLINK-7975) Queryable state client does not wait for shutdown completion

2017-11-03 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7975:


 Summary: Queryable state client does not wait for shutdown 
completion
 Key: FLINK-7975
 URL: https://issues.apache.org/jira/browse/FLINK-7975
 Project: Flink
  Issue Type: Bug
  Components: Queryable State
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Priority: Critical


The queryable state client neither waits for its shutdown completion when 
calling {{Client#shutdown}} nor does it return a termination future on which we 
could wait for the termination. I think this would be good to provide.





[jira] [Created] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-03 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7974:


 Summary: AbstractServerBase#shutdown does not wait for shutdown 
completion
 Key: FLINK-7974
 URL: https://issues.apache.org/jira/browse/FLINK-7974
 Project: Flink
  Issue Type: Bug
  Components: Queryable State
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Priority: Critical


The {{AbstractServerBase}} does not wait for the completion of its shutdown 
when calling {{AbstractServerBase#shutdown}}. This is problematic since it 
leads to resource leaks and unstable tests such as the {{AbstractServerTest}}. 
I propose to let the {{AbstractServerBase#shutdown}} return a termination 
future which is completed upon shutdown completion.
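
One possible shape for the proposed change, sketched with plain JDK classes. The class name, the single-thread executor, and the 10 second timeout are assumptions for illustration; this is not the `AbstractServerBase` code. The point is that `shutdown()` hands back a future that callers, such as tests, can wait on before they check for leaked resources.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ShutdownFutureSketch {

    // Stand-in for the resources a server would own (threads, event loops).
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /** Initiates shutdown and returns a future completed once it has finished. */
    CompletableFuture<Void> shutdown() {
        CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        worker.shutdown();

        // Wait on a separate daemon thread so that shutdown() itself does not block.
        Thread waiter = new Thread(() -> {
            try {
                if (worker.awaitTermination(10, TimeUnit.SECONDS)) {
                    terminationFuture.complete(null);
                } else {
                    terminationFuture.completeExceptionally(
                        new IllegalStateException("Shutdown timed out"));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                terminationFuture.completeExceptionally(e);
            }
        }, "shutdown-waiter");
        waiter.setDaemon(true);
        waiter.start();

        return terminationFuture;
    }

    boolean isTerminated() {
        return worker.isTerminated();
    }
}
```

A test could then call `server.shutdown().join()` and only afterwards assert that no threads or ports are still held.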





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237763#comment-16237763
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148812132
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import 

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237749#comment-16237749
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148808721
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -63,11 +64,14 @@
  * The yarn implementation of the resource manager. Used when the system 
is started
  * via the resource framework YARN.
  */
-public class YarnResourceManager extends ResourceManager 
implements AMRMClientAsync.CallbackHandler {
+public class YarnResourceManager extends ResourceManager 
implements AMRMClientAsync.CallbackHandler {
 
/** The process environment variables. */
private final Map env;
 
+   /** YARN container map. Package private for unit test purposes. */
+   final Map workerNodeMap;
--- End diff --

Let's make the key `ResourceID`


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237761#comment-16237761
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148810490
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable 
{
+
+   private Container yarnContainer;
+
+   public YarnWorkerNode(Container container) {
+   this.yarnContainer = container;
--- End diff --

`Preconditions.checkNotNull`.
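
A sketch of the requested null check in the constructor. To stay self-contained it uses the JDK's `Objects.requireNonNull` as a stand-in for Flink's `Preconditions.checkNotNull`, and `Object` as a stand-in for the YARN `Container` type; the class name is hypothetical.

```java
import java.util.Objects;

final class WorkerNodeSketch {

    // Final, since the container is never reassigned after construction.
    private final Object yarnContainer;

    WorkerNodeSketch(Object container) {
        // Fail fast at construction time rather than with a later NullPointerException.
        this.yarnContainer = Objects.requireNonNull(container, "container must not be null");
    }

    Object getYarnContainer() {
        return yarnContainer;
    }
}
```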


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237750#comment-16237750
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148808362
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile 
resourceProfile) {
}
 
@Override
-   public boolean stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
-   return false;
+   public boolean stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
--- End diff --

Let's remove the workerNode after we have stopped it.
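
The ordering suggested here could be sketched as follows. Everything in this block is hypothetical: `String` ids stand in for `ResourceID` and container ids, and `ContainerReleaser` stands in for whatever call actually releases the YARN container. The sketch only shows that removing the map entry after a successful stop keeps the worker tracked if the stop fails.

```java
import java.util.HashMap;
import java.util.Map;

final class StopWorkerSketch {

    /** Hypothetical hook for the call that releases a container. */
    interface ContainerReleaser {
        void release(String containerId);
    }

    private final Map<String, String> workerNodeMap = new HashMap<>();
    private final ContainerReleaser releaser;

    StopWorkerSketch(ContainerReleaser releaser) {
        this.releaser = releaser;
    }

    void register(String resourceId, String containerId) {
        workerNodeMap.put(resourceId, containerId);
    }

    boolean isTracked(String resourceId) {
        return workerNodeMap.containsKey(resourceId);
    }

    boolean stopWorker(String resourceId) {
        String containerId = workerNodeMap.get(resourceId);
        if (containerId == null) {
            return false; // unknown worker, nothing to stop
        }
        // Stop first: if release() throws, the entry is still in the map
        // and the worker can be retried or cleaned up later.
        releaser.release(containerId);
        workerNodeMap.remove(resourceId);
        return true;
    }
}
```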


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237748#comment-16237748
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148809690
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -260,6 +287,7 @@ public void onContainersAllocated(List<Container> containers) {
numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
log.info("Received new container: {} - Remaining pending container requests: {}",
container.getId(), numPendingContainerRequests);
+   workerNodeMap.put(container.getId().toString(), new YarnWorkerNode(container));
--- End diff --

Here we should create a `ResourceID` and use it as the key. This 
`ResourceID` must then be sent to the launched TaskManager so that it uses it 
upon registration.
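
A rough sketch of how this could look, under stated assumptions: the resource manager generates an ID when a container is allocated, keys its map by that ID, and hands the same ID to the launched process, which presents it again on registration. `ResourceIdKeySketch`, the `SKETCH_RESOURCE_ID` variable name, and the use of a random UUID and an environment map are all hypothetical choices made for illustration; nothing here is taken from the pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical stand-in for the resource manager; not the Flink class. */
final class ResourceIdKeySketch {

    // Hypothetical name of the variable that carries the ID to the worker.
    static final String ENV_RESOURCE_ID = "SKETCH_RESOURCE_ID";

    // resource ID -> container ID (both simplified to strings here)
    private final Map<String, String> workerNodeMap = new HashMap<>();

    /** Called for each allocated container; returns the launch environment. */
    Map<String, String> onContainerAllocated(String containerId) {
        // Assumption: a fresh random ID; the real code may derive it differently.
        String resourceId = UUID.randomUUID().toString();
        workerNodeMap.put(resourceId, containerId);

        Map<String, String> launchEnv = new HashMap<>();
        launchEnv.put(ENV_RESOURCE_ID, resourceId);
        return launchEnv;
    }

    /** The worker registers with the ID it was launched with. */
    boolean registerTaskManager(String resourceId) {
        return workerNodeMap.containsKey(resourceId);
    }
}
```

Because the key is the ID that the worker itself reports, a later stop request can be resolved with a single map lookup.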




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237747#comment-16237747
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148808279
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public boolean stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
-   return false;
+   public boolean stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
+   if (workerNode != null) {
--- End diff --

I think this should be checked first.
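
In the hunk above, `workerNode.getResourceID()` is dereferenced before the `workerNode != null` test, so a null argument would throw before the check is ever reached. A minimal sketch of the reordering, with `NullCheckFirstSketch` and its nested `WorkerNode` as hypothetical stand-ins for the Flink types:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the resource manager; not the Flink class. */
final class NullCheckFirstSketch {

    /** Hypothetical, simplified worker node that only carries its ID. */
    static final class WorkerNode {
        final String resourceId;

        WorkerNode(String resourceId) {
            this.resourceId = resourceId;
        }
    }

    private final Map<String, WorkerNode> workerNodeMap = new HashMap<>();

    void track(WorkerNode node) {
        workerNodeMap.put(node.resourceId, node);
    }

    boolean stopWorker(WorkerNode workerNode) {
        // Check the argument before touching any of its members.
        if (workerNode == null) {
            return false;
        }
        // Only now is it safe to read the resource ID.
        return workerNodeMap.remove(workerNode.resourceId) != null;
    }
}
```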




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237755#comment-16237755
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148814129
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import 

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237760#comment-16237760
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148814354
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237746#comment-16237746
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148808482
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public boolean stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
-   return false;
+   public boolean stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
+   if (workerNode != null) {
+   Container container = workerNode.getYarnContainer();
+   log.info("Stopping container {}.", container.getId().toString());
+   // release the container on the node manager
+   try {
+   nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+   } catch (Throwable t) {
+   log.error("Error while calling YARN Node Manager to stop container", t);
--- End diff --

This should be logged as a warning.
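
A minimal sketch of the requested change, assuming that a failed stop request is recoverable and therefore does not warrant the error level. To stay self-contained it uses `java.util.logging` in place of the SLF4J logger from the diff; with SLF4J the equivalent call would be `log.warn(message, t)`. `WarnOnStopFailureSketch` and `ContainerStopper` are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper; not part of Flink. */
final class WarnOnStopFailureSketch {

    private static final Logger LOG = Logger.getLogger("WarnOnStopFailureSketch");

    /** Hypothetical stand-in for the YARN node manager client. */
    interface ContainerStopper {
        void stop(String containerId) throws Exception;
    }

    /** Returns true if the stop request went through, false if it failed. */
    static boolean stopQuietly(ContainerStopper stopper, String containerId) {
        try {
            stopper.stop(containerId);
            return true;
        } catch (Exception e) {
            // Warning rather than error: the caller can carry on.
            LOG.log(Level.WARNING,
                    "Error while calling YARN Node Manager to stop container " + containerId, e);
            return false;
        }
    }
}
```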




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237757#comment-16237757
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148812650
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237762#comment-16237762
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148814381
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237759#comment-16237759
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148811956
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237751#comment-16237751
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148811288
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237764#comment-16237764
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148814557
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237758#comment-16237758
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148813642
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237745#comment-16237745
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148807573
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -63,11 +64,14 @@
  * The yarn implementation of the resource manager. Used when the system is started
  * via the resource framework YARN.
  */
-public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {
+public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {
 
/** The process environment variables. */
private final Map env;
 
+   /** YARN container map. Package private for unit test purposes. */
+   final Map workerNodeMap;
--- End diff --

Let's make it more visible that this map is a concurrent map by setting the
type to `ConcurrentHashMap`.
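
A minimal sketch of what this suggestion amounts to, assuming the map is keyed by a resource ID and holds one entry per container. `WorkerRegistrySketch`, `register`, `release` and the `String` key/value types are hypothetical stand-ins for illustration, not the actual Flink classes or the code in this pull request:

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the resource manager; not the real Flink class. */
class WorkerRegistrySketch {

    // Declaring the field as ConcurrentHashMap (instead of plain Map) makes the
    // thread-safety guarantee visible at the declaration site.
    // Package private, mirroring the "for unit test purposes" note in the diff.
    final ConcurrentHashMap<String, String> workerNodeMap = new ConcurrentHashMap<>();

    /** Registers a worker if none is stored under this ID; returns true if it was added. */
    boolean register(String resourceId, String containerId) {
        return workerNodeMap.putIfAbsent(resourceId, containerId) == null;
    }

    /** Removes the worker and returns its container ID, or null if it was unknown. */
    String release(String resourceId) {
        return workerNodeMap.remove(resourceId);
    }
}
```

With the concrete type on the field, a reader does not have to look up the constructor to learn that callbacks from other threads may touch the map safely.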


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237754#comment-16237754
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148811085
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237756#comment-16237756
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148810594
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
+
+   private Container yarnContainer;
+
+   public YarnWorkerNode(Container container) {
+   this.yarnContainer = container;
+   }
+
+   @Override
+   public ResourceID getResourceID() {
+   return new ResourceID(yarnContainer.getId().toString());
--- End diff --

Let's store the `ResourceID` explicitly.
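
One way to read this suggestion, sketched with stand-in types so it compiles on its own: build the ID once in the constructor and return the stored instance, instead of creating a new object on every `getResourceID()` call as the quoted diff does. `SketchResourceId` and `WorkerNodeSketch` are hypothetical names, and a plain `String` stands in for the YARN `Container`:

```java
import java.util.Objects;

/** Hypothetical stand-in for Flink's ResourceID. */
final class SketchResourceId {
    private final String id;

    SketchResourceId(String id) {
        this.id = Objects.requireNonNull(id);
    }

    String getId() {
        return id;
    }
}

/** Hypothetical stand-in for YarnWorkerNode. */
final class WorkerNodeSketch {
    // A String stands in for the YARN Container held by the real class.
    private final String containerId;

    // The ID is derived once and stored, so every caller sees the same instance.
    private final SketchResourceId resourceId;

    WorkerNodeSketch(String containerId) {
        this.containerId = Objects.requireNonNull(containerId);
        this.resourceId = new SketchResourceId(containerId);
    }

    SketchResourceId getResourceId() {
        return resourceId;
    }

    String getContainerId() {
        return containerId;
    }
}
```

Storing the field also allows both members to be `final`, which the sketch assumes is acceptable for the worker node.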


[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237752#comment-16237752
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148811663
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237753#comment-16237753
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148810078
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
--- End diff --

`serialVersionUID` is missing.
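
For illustration, a self-contained sketch of a `Serializable` class that declares the field explicitly. `SerializableWorkerSketch` and its `roundTrip` helper are hypothetical, the value `1L` is an arbitrary choice, and a `String` replaces the YARN `Container` so that the example has no external dependencies:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a serializable worker node. */
final class SerializableWorkerSketch implements Serializable {

    // Explicit version ID: without it the JVM derives one from the class
    // structure, so unrelated edits can break deserialization of old data.
    private static final long serialVersionUID = 1L;

    private final String containerId;

    SerializableWorkerSketch(String containerId) {
        this.containerId = containerId;
    }

    String getContainerId() {
        return containerId;
    }

    /** Serializes the node to bytes and reads it back, for demonstration only. */
    static SerializableWorkerSketch roundTrip(SerializableWorkerSketch node) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(node);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializableWorkerSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```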


[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148814381
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static 

[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148811288
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static 

[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148811085
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148814354
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148810594
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
+
+   private Container yarnContainer;
+
+   public YarnWorkerNode(Container container) {
+       this.yarnContainer = container;
+   }
+
+   @Override
+   public ResourceID getResourceID() {
+       return new ResourceID(yarnContainer.getId().toString());
--- End diff --

Let's store the `ResourceID` explicitly instead of creating a new one on every `getResourceID()` call.
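
A minimal sketch of what storing the ID could look like. `ResourceID` and `Container` below are simplified stand-ins written only for this example (the real YARN `Container#getId()` returns a `ContainerId`, not a `String`), so treat the shapes as assumptions rather than the actual Flink or Hadoop types.

```java
// Hypothetical stand-in for Flink's ResourceID; only what the example needs.
final class ResourceID {
    private final String id;

    ResourceID(String id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return id;
    }
}

// Hypothetical stand-in for the YARN container; the real type is richer.
final class Container {
    private final String id;

    Container(String id) {
        this.id = id;
    }

    String getId() {
        return id;
    }
}

final class YarnWorkerNode {
    // Derived once in the constructor and then reused.
    private final ResourceID resourceID;
    private final Container yarnContainer;

    YarnWorkerNode(Container container) {
        this.yarnContainer = container;
        this.resourceID = new ResourceID(container.getId());
    }

    // Returns the stored instance instead of allocating a new one per call.
    ResourceID getResourceID() {
        return resourceID;
    }

    Container getContainer() {
        return yarnContainer;
    }
}
```

With the field in place, repeated calls to `getResourceID()` hand back the same object, and both fields can be `final`.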


---


[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148814557
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148812650
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148807573
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -63,11 +64,14 @@
  * The yarn implementation of the resource manager. Used when the system is started
  * via the resource framework YARN.
  */
-public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {
+public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {
 
/** The process environment variables. */
private final Map env;
 
+   /** YARN container map. Package private for unit test purposes. */
+   final Map workerNodeMap;
--- End diff --

Let's make it more visible that this map is a concurrent map by setting the 
type to `ConcurrentHashMap`.
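
A small sketch of the idea, under assumptions: `WorkerRegistry`, its methods, and the `String` key and value types are invented for this example and do not come from the pull request. The point is only that the declared field type, not just the constructor call, tells the reader the map is safe for concurrent access.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical holder class; names and the String key/value types are
// placeholders for illustration only.
final class WorkerRegistry {

    // Declaring the concrete concurrent type makes thread-safety visible
    // at the declaration site.
    final ConcurrentHashMap<String, String> workerNodeMap = new ConcurrentHashMap<>();

    // Atomically registers a worker; returns false if the id was already present.
    boolean register(String resourceId, String container) {
        return workerNodeMap.putIfAbsent(resourceId, container) == null;
    }

    // Atomically removes a worker; returns the removed value or null if absent.
    String release(String resourceId) {
        return workerNodeMap.remove(resourceId);
    }
}
```

Declaring the field as the `ConcurrentMap` interface would signal the same guarantee while leaving the implementation open; which of the two reads better is a matter of taste.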


---


[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148810490
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
+
+   private Container yarnContainer;
+
+   public YarnWorkerNode(Container container) {
+       this.yarnContainer = container;
--- End diff --

Please guard this assignment with `Preconditions.checkNotNull`.
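
A hedged sketch of the null check. To keep the example runnable without Flink on the classpath it uses the JDK's `Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull`; the `Container` class and the error message are placeholders invented for the example.

```java
import java.util.Objects;

// Hypothetical stand-in for the YARN container type.
final class Container {
}

final class YarnWorkerNode {

    private final Container yarnContainer;

    YarnWorkerNode(Container container) {
        // Fails fast at construction time instead of at first use.
        // In Flink code this would be Preconditions.checkNotNull(container).
        this.yarnContainer = Objects.requireNonNull(container, "container must not be null");
    }

    Container getContainer() {
        return yarnContainer;
    }
}
```

Failing in the constructor keeps the `NullPointerException` close to the caller that passed the bad argument.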


---


[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237744#comment-16237744 ]

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148810409
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
--- End diff --

Actually this class should not be `Serializable` because `Container` is 
also not serializable.
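
To illustrate the concern with a self-contained sketch: a class that declares `Serializable` but holds a non-transient field of a non-serializable type compiles fine and only fails when it is actually written to an object stream. `Container`, `SerializableWorkerNode`, and `SerializationCheck` are hypothetical names for this example, not the Hadoop or Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a container type that is not Serializable.
final class Container {
}

// Claims to be Serializable although its only field cannot be serialized.
final class SerializableWorkerNode implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Container container = new Container();
}

final class SerializationCheck {

    // Returns true if Java serialization can write the value, false if it
    // hits a non-serializable object somewhere in the object graph.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Dropping the `Serializable` marker turns this late runtime failure into an honest type signature.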


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148812132
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static 

[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148810078
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
--- End diff --

`serialVersionUID` is missing.


---


[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148810409
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
--- End diff --

Actually, this class should not be `Serializable` because `Container` is 
also not serializable.
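
The concern can be reproduced in isolation. The following is a minimal sketch, not code from the pull request: `FakeContainer` is a hypothetical stand-in for YARN's `Container`, and `SerializableWorkerNode` only mirrors the shape of the reviewed class. It shows that declaring `Serializable` on a class with a non-serializable, non-transient field compiles but fails once an instance is actually written.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for YARN's Container; deliberately NOT Serializable. */
class FakeContainer {
    final String id;

    FakeContainer(String id) {
        this.id = id;
    }
}

/** Mirrors the reviewed shape: Serializable, but holding a non-serializable field. */
class SerializableWorkerNode implements Serializable {
    private static final long serialVersionUID = 1L;

    private final FakeContainer container;

    SerializableWorkerNode(FakeContainer container) {
        this.container = container;
    }
}

class SerializationCheck {
    /** Returns true if Java serialization of the given object succeeds. */
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when the object graph contains a non-serializable instance.
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Under these assumptions, `SerializationCheck.canSerialize(new SerializableWorkerNode(new FakeContainer("c1")))` reports failure, which is why dropping the `Serializable` marker is the more honest declaration.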


---


[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148811663
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static 

[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148811956
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static 

[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148814129
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static 

[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148808279
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public boolean stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
-   return false;
+   public boolean stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
+   if (workerNode != null) {
--- End diff --

I think the `null` check should come first, before `workerNode` is dereferenced.
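
A minimal sketch of the ordering being asked for, under the assumption that a `null` argument should simply yield `false`. `WorkerNodeStub` and `GuardFirstExample` are hypothetical stand-ins, not the classes from the pull request. The point is only that the guard runs before any method is called on the argument.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for YarnWorkerNode; only the resource id is modelled. */
class WorkerNodeStub {
    private final String resourceId;

    WorkerNodeStub(String resourceId) {
        this.resourceId = resourceId;
    }

    String getResourceId() {
        return resourceId;
    }
}

class GuardFirstExample {
    final Map<String, WorkerNodeStub> workerNodeMap = new HashMap<>();

    /** Checks for null before touching the argument, so a null worker yields false instead of an NPE. */
    boolean stopWorker(WorkerNodeStub workerNode) {
        if (workerNode == null) {
            return false;
        }
        // Safe to dereference from here on.
        return workerNodeMap.remove(workerNode.getResourceId()) != null;
    }
}
```

With the check placed after the `remove(...)` call, as in the reviewed diff, a `null` argument would already have thrown before the guard was reached.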


---


[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148813642
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static 

[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148808362
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public boolean stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
-   return false;
+   public boolean stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
--- End diff --

Let's remove the workerNode after we have stopped it.


---


[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148809690
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -260,6 +287,7 @@ public void onContainersAllocated(List containers) {
numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
log.info("Received new container: {} - Remaining pending container requests: {}",
container.getId(), numPendingContainerRequests);
+   workerNodeMap.put(container.getId().toString(), new YarnWorkerNode(container));
--- End diff --

Here we should create a `ResourceID` and use it as the key. This 
`ResourceID` must then be sent to the launched TaskManager so that it uses it 
upon registration.
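
A rough sketch of that idea, with every name below being an assumption for illustration: `ResourceKey` stands in for Flink's `ResourceID`, and `WorkerRegistry` for the bookkeeping inside the resource manager. The key is created once when the container is allocated, handed back so it can be passed to the launched process, and later matched against the id the worker registers with.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for Flink's ResourceID: a value type wrapping a string id. */
final class ResourceKey {
    private final String id;

    ResourceKey(String id) {
        this.id = Objects.requireNonNull(id);
    }

    String asString() {
        return id;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ResourceKey && ((ResourceKey) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

class WorkerRegistry {
    // Keyed by the value type rather than by a raw String.
    private final Map<ResourceKey, String> workers = new HashMap<>();

    /** On allocation: derive the key once and return it so it can be sent to the launched worker. */
    ResourceKey onContainerAllocated(String containerId) {
        ResourceKey key = new ResourceKey(containerId);
        workers.put(key, containerId);
        return key;
    }

    /** On registration: the worker presents the id it was launched with. */
    boolean isKnownWorker(ResourceKey registeredId) {
        return workers.containsKey(registeredId);
    }
}
```

Because the key type defines `equals` and `hashCode` on the wrapped id, a worker that reconstructs the key from the string it received still resolves to the same map entry.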


---


[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148808721
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -63,11 +64,14 @@
  * The yarn implementation of the resource manager. Used when the system is started
  * via the resource framework YARN.
  */
-public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {
+public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {
 
/** The process environment variables. */
private final Map env;
 
+   /** YARN container map. Package private for unit test purposes. */
+   final Map workerNodeMap;
--- End diff --

Let's make the key `ResourceID`


---


[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148808482
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public boolean stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
-   return false;
+   public boolean stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
+   if (workerNode != null) {
+   Container container = workerNode.getYarnContainer();
+   log.info("Stopping container {}.", container.getId().toString());
+   // release the container on the node manager
+   try {
+   nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+   } catch (Throwable t) {
+   log.error("Error while calling YARN Node Manager to stop container", t);
--- End diff --

This should be logged as a warning rather than an error.
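
Read together with the other comments on this method, the intended flow might look roughly like the sketch below. It is not the pull request's code: `ContainerStopper` is a hypothetical stand-in for the node manager client, `java.util.logging` replaces the project's logger to keep the example self-contained, and removing the entry even when the stop call fails is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the YARN node manager client. */
interface ContainerStopper {
    void stopContainer(String containerId) throws Exception;
}

class StopThenRemoveExample {
    private static final Logger LOG = Logger.getLogger(StopThenRemoveExample.class.getName());

    /** Maps resource id to container id (simplified bookkeeping). */
    final Map<String, String> workerNodeMap = new HashMap<>();

    private final ContainerStopper stopper;

    StopThenRemoveExample(ContainerStopper stopper) {
        this.stopper = stopper;
    }

    boolean stopWorker(String resourceId) {
        String containerId = workerNodeMap.get(resourceId);
        if (containerId == null) {
            return false;
        }
        try {
            stopper.stopContainer(containerId);
        } catch (Exception e) {
            // Stopping is best effort here, so the failure is reported at warning level.
            LOG.log(Level.WARNING, "Error while calling YARN Node Manager to stop container", e);
        }
        // Remove the bookkeeping entry only after the stop attempt.
        workerNodeMap.remove(resourceId);
        return true;
    }
}
```

A failing stop call is then visible in the logs without being treated as a fatal error, and the map never loses an entry for a worker that was not at least asked to stop.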


---


[jira] [Commented] (FLINK-7706) Port JobAccumulatorsHandler to new REST endpoint

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237686#comment-16237686
 ] 

ASF GitHub Bot commented on FLINK-7706:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4898#discussion_r148802300
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsInfo implements ResponseBody {
+   public static final String FIELD_NAME_JOB_ACCUMULATORS = "job-accumulators";
+   public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = "user-task-accumulators";
+
+   @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS)
+   private List jobAccumulators;
+
+   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS)
+   private List userAccumulators;
+
+   @JsonCreator
+   public JobAccumulatorsInfo(
+   @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) @Nullable 
List jobAccumulators,
--- End diff --

I would remove the `@Nullable` and instead pass in 
`Collections.emptyList()` if there are no job accumulators at the moment.
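
A minimal sketch of what this suggestion could look like. It is not the code from the pull request: the class name `AccumulatorsInfoSketch`, the `String` element type (the real element type is not visible in this archive) and the `empty()` factory are assumptions made for illustration, and the Jackson annotations are left out so the snippet compiles on its own.

```java
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for the response class under review (Jackson annotations omitted). */
final class AccumulatorsInfoSketch {
    // Element type String is an assumption; the archive does not show the real type.
    private final List<String> jobAccumulators;

    AccumulatorsInfoSketch(List<String> jobAccumulators) {
        // No @Nullable: passing null is treated as a caller bug and fails fast.
        this.jobAccumulators = Objects.requireNonNull(jobAccumulators, "jobAccumulators");
    }

    /** Covers the "no job accumulators at the moment" case with an empty list instead of null. */
    static AccumulatorsInfoSketch empty() {
        return new AccumulatorsInfoSketch(Collections.emptyList());
    }

    List<String> getJobAccumulators() {
        return jobAccumulators;
    }
}
```

With this shape, readers of the field never need a null check, and the JSON output would presumably contain an empty array rather than `null`.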


> Port JobAccumulatorsHandler to new REST endpoint
> 
>
> Key: FLINK-7706
> URL: https://issues.apache.org/jira/browse/FLINK-7706
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Hai Zhou UTC+8
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port existing {{JobAccumulatorsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4898: [FLINK-7706] [flip6] Add JobAccumulatorsHandler fo...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4898#discussion_r148802300
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsInfo implements ResponseBody {
+   public static final String FIELD_NAME_JOB_ACCUMULATORS = 
"job-accumulators";
+   public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = 
"user-task-accumulators";
+
+   @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS)
+   private List jobAccumulators;
+
+   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS)
+   private List userAccumulators;
+
+   @JsonCreator
+   public JobAccumulatorsInfo(
+   @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) @Nullable 
List jobAccumulators,
--- End diff --

I would remove the `@Nullable` and instead pass in 
`Collections.emptyList()` if there are no job accumulators at the moment.


---


[GitHub] flink issue #4942: [FLINK-7420] [avro] Move all Avro code to flink-avro (fol...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4942
  
Great fix for our Avro dependency @StephanEwen, @aljoscha and @twalthr. +1 
for merging.


---


[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237672#comment-16237672
 ] 

ASF GitHub Bot commented on FLINK-7420:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4942
  
Great fix for our Avro dependency @StephanEwen, @aljoscha and @twalthr. +1 
for merging.


> Move all Avro code to flink-avro
> 
>
> Key: FLINK-7420
> URL: https://issues.apache.org/jira/browse/FLINK-7420
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem*
> Currently, the {{flink-avro}} project is a shell with some tests and mostly 
> duplicate and dead code. The classes that use Avro are distributed quite 
> wildly through the code base, and introduce multiple direct dependencies on 
> Avro in a messy way.
> That way, we cannot create a proper fat Avro dependency in which we shade 
> Jackson away.
> Also, we expose Avro as a direct and hard dependency on many Flink modules, 
> while it should be a dependency that users that use Avro types selectively 
> add.
> *Suggested Changes*
> We should move all Avro related classes to {{flink-avro}}, and give 
> {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}.
>   - {{AvroTypeInfo}}
>   - {{AvroSerializer}}
>   - {{AvroRowSerializationSchema}}
>   - {{AvroRowDeserializationSchema}}
> To be able to move the Avro serialization code from {{flink-core}} to 
> {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, 
> similar to how we load the {{WritableTypeInfo}} for Hadoop.
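
The reflective loading mentioned in the description above could look roughly like the following sketch. It only shows the general pattern of resolving an optional class by name and falling back when the module is absent. The class `ReflectiveTypeInfoLoader`, the method `tryLoad` and the class name constant are hypothetical and are not taken from the Flink code base.

```java
import java.util.Optional;

/** Hypothetical helper: resolves an optional class by name without a compile-time dependency. */
final class ReflectiveTypeInfoLoader {
    // Assumed, illustrative class name; the real location in flink-avro is not given in this thread.
    static final String ASSUMED_TYPE_INFO_CLASS = "org.example.avro.AvroTypeInfo";

    private ReflectiveTypeInfoLoader() {}

    /** Returns the class if it is on the classpath, or an empty Optional if it is not. */
    static Optional<Class<?>> tryLoad(String className, ClassLoader loader) {
        try {
            // initialize=false: only resolve the class, do not run static initializers yet.
            Class<?> clazz = Class.forName(className, false, loader);
            return Optional.of(clazz);
        } catch (ClassNotFoundException | LinkageError e) {
            // The optional module (or one of its dependencies) is not available.
            return Optional.empty();
        }
    }
}
```

A caller would check the result and fall back to a generic code path when it is empty, so the core module compiles and runs without the Avro classes present.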





[jira] [Created] (FLINK-7973) Fix service shading relocation for S3 file systems

2017-11-03 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-7973:
---

 Summary: Fix service shading relocation for S3 file systems
 Key: FLINK-7973
 URL: https://issues.apache.org/jira/browse/FLINK-7973
 Project: Flink
  Issue Type: Bug
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 1.4.0


The shade plugin currently relocates services incorrectly, applying relocation 
patterns multiple times.





[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237632#comment-16237632
 ] 

ASF GitHub Bot commented on FLINK-7420:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4942#discussion_r148789659
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
 ---
@@ -217,7 +217,7 @@ public void read(DataInputView in) throws IOException {
/**
 * Placeholder dummy for a previously registered class that can no 
longer be found in classpath on restore.
 */
-   public static class DummyRegisteredClass {}
+   public static class DummyRegisteredClass implements Serializable {}
--- End diff --

`serialVersionUID` is missing.
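
For illustration, a sketch of the class with an explicit version id. The outer class `SerialVersionSketch` and the value `1L` are assumptions; the pull request may choose a different value.

```java
import java.io.Serializable;

/** Hypothetical wrapper so the nested class can stay static, as in the diff. */
final class SerialVersionSketch {
    private SerialVersionSketch() {}

    /** Placeholder dummy for a previously registered class that can no longer be found on restore. */
    public static class DummyRegisteredClass implements Serializable {
        // Explicit id so the serialized form does not depend on a compiler-generated default.
        // The value 1L is an assumption made for this sketch.
        private static final long serialVersionUID = 1L;
    }
}
```

Without the explicit field, the JVM derives an id from the class structure, and that id can change when the class is modified or compiled differently.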


> Move all Avro code to flink-avro
> 
>
> Key: FLINK-7420
> URL: https://issues.apache.org/jira/browse/FLINK-7420
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem*
> Currently, the {{flink-avro}} project is a shell with some tests and mostly 
> duplicate and dead code. The classes that use Avro are distributed quite 
> wildly through the code base, and introduce multiple direct dependencies on 
> Avro in a messy way.
> That way, we cannot create a proper fat Avro dependency in which we shade 
> Jackson away.
> Also, we expose Avro as a direct and hard dependency on many Flink modules, 
> while it should be a dependency that users that use Avro types selectively 
> add.
> *Suggested Changes*
> We should move all Avro related classes to {{flink-avro}}, and give 
> {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}.
>   - {{AvroTypeInfo}}
>   - {{AvroSerializer}}
>   - {{AvroRowSerializationSchema}}
>   - {{AvroRowDeserializationSchema}}
> To be able to move the Avro serialization code from {{flink-core}} to 
> {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, 
> similar to how we load the {{WritableTypeInfo}} for Hadoop.





[GitHub] flink pull request #4942: [FLINK-7420] [avro] Move all Avro code to flink-av...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4942#discussion_r148789659
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
 ---
@@ -217,7 +217,7 @@ public void read(DataInputView in) throws IOException {
/**
 * Placeholder dummy for a previously registered class that can no 
longer be found in classpath on restore.
 */
-   public static class DummyRegisteredClass {}
+   public static class DummyRegisteredClass implements Serializable {}
--- End diff --

`serialVersionUID` is missing.


---


[jira] [Comment Edited] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-11-03 Thread Gary Yao (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237616#comment-16237616
 ] 

Gary Yao edited comment on FLINK-7880 at 11/3/17 1:55 PM:
--

I have further isolated the problem. The problem still appears even if you are 
running a single test method. I tried 
{{NonHAQueryableStateRocksDBBackendITCase#testValueState}} and even replaced 
the state query with a {{Thread.sleep(400)}}. My changes to the code are 
documented here: 
https://github.com/apache/flink/compare/master...GJL:FLINK-7880?expand=1

To run the tests, I use the command below. Note that multiple iterations may be 
required. Hence, the {{while}} loop.
{noformat}
while mvn -o clean verify -Dtest=NonHAQueryableStateRocksDBBackendITCase 
-DfailIfNoTests=false -Dcheckstyle.skip; do :; done
{noformat}

The failure I get is the same as the one mentioned by [~kkl0u]:
{noformat}
libc++abi.dylib: terminating with uncaught exception of type 
std::__1::system_error: mutex lock failed: Invalid argument
{noformat}




was (Author: gjy):
I have further isolated the problem. The problem still appears even if you are 
running a single test method. I tried 
{{NonHAQueryableStateRocksDBBackendITCase#testValueState}} and even replaced 
the state query with a {{Thread.sleep(400)}}. My changes to the code are 
documented here: 
https://github.com/apache/flink/compare/master...GJL:FLINK-7880?expand=1

To run the tests, I use the command below. Note that multiple iterations may be 
required. Hence, the {{while}} loop.
{noformat}
while mvn -o clean verify -Dtest=NonHAQueryableStateRocksDBBackendITCase 
-DfailIfNoTests=false -Dcheckstyle.skip; do :; done
{noformat}





> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-11-03 Thread Gary Yao (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237616#comment-16237616
 ] 

Gary Yao commented on FLINK-7880:
-

I have further isolated the problem. The problem still appears even if you are 
running a single test method. I tried 
{{NonHAQueryableStateRocksDBBackendITCase#testValueState}} and even replaced 
the state query with a {{Thread.sleep(400)}}. My changes to the code are 
documented here: 
https://github.com/apache/flink/compare/master...GJL:FLINK-7880?expand=1

To run the tests, I use the command below. Note that multiple iterations may be 
required. Hence, the {{while}} loop.
{noformat}
while mvn -o clean verify -Dtest=NonHAQueryableStateRocksDBBackendITCase 
-DfailIfNoTests=false -Dcheckstyle.skip; do :; done
{noformat}





> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829





[jira] [Comment Edited] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-11-03 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237584#comment-16237584
 ] 

Kostas Kloudas edited comment on FLINK-7880 at 11/3/17 1:38 PM:


I will keep on investigating *BUT* I commented out the {{queryable state}} code 
from the tests and they still seem to fail even locally with:

{code}
libc++abi.dylib: terminating with uncaught exception of type 
std::__1::system_error: mutex lock failed: Invalid argument
/bin/sh: line 1: 10553 Abort trap: 6   
/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/bin/java 
-Xms256m -Xmx2048m -Dmvn.forkNumber=1 -XX:+UseSerialGC -jar 
/Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefirebooter6530581495710923655.jar
 
/Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefire7293759706604721607tmp
 
/Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefire_87379374358455308985tmp
{code}

You can find my code here https://github.com/kl0u/flink/tree/more-than-qs and 
to reproduce, go to the {{flink-queryable-state}} dir and run {{mvn verify}} 
a couple of times. At least on my machine this reproduces the problem.

So the root problem may not be in the Queryable State but in the sequence of 
steps taken when shutting down the RocksDB state backend.


was (Author: kkl0u):
I will keep on investigating *BUT* I commented out the {{queryable state}} code 
from the tests and they still seem to fail even locally with:

{code}
libc++abi.dylib: terminating with uncaught exception of type 
std::__1::system_error: mutex lock failed: Invalid argument
/bin/sh: line 1: 10553 Abort trap: 6   
/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/bin/java 
-Xms256m -Xmx2048m -Dmvn.forkNumber=1 -XX:+UseSerialGC -jar 
/Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefirebooter6530581495710923655.jar
 
/Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefire7293759706604721607tmp
 
/Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefire_87379374358455308985tmp
{code}

You can find my code here https://github.com/kl0u/flink/tree/more-than-qs and 
to reproduce, go to the {{flink-queryable-state}} dir and run {{mvn verify}} 
a couple of times. At least on my machine this reproduces the problem.

> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829





[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-11-03 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237584#comment-16237584
 ] 

Kostas Kloudas commented on FLINK-7880:
---

I will keep on investigating *BUT* I commented out the {{queryable state}} code 
from the tests and they still seem to fail even locally with:

{code}
libc++abi.dylib: terminating with uncaught exception of type 
std::__1::system_error: mutex lock failed: Invalid argument
/bin/sh: line 1: 10553 Abort trap: 6   
/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/bin/java 
-Xms256m -Xmx2048m -Dmvn.forkNumber=1 -XX:+UseSerialGC -jar 
/Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefirebooter6530581495710923655.jar
 
/Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefire7293759706604721607tmp
 
/Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefire_87379374358455308985tmp
{code}

You can find my code here https://github.com/kl0u/flink/tree/more-than-qs and 
to reproduce, go to the {{flink-queryable-state}} dir and run {{mvn verify}} 
a couple of times. At least on my machine this reproduces the problem.

> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829





[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237579#comment-16237579
 ] 

ASF GitHub Bot commented on FLINK-7420:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4942
  
Personally, I like this Jenkins fellow more...  


> Move all Avro code to flink-avro
> 
>
> Key: FLINK-7420
> URL: https://issues.apache.org/jira/browse/FLINK-7420
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem*
> Currently, the {{flink-avro}} project is a shell with some tests and mostly 
> duplicate and dead code. The classes that use Avro are distributed quite 
> wildly through the code base, and introduce multiple direct dependencies on 
> Avro in a messy way.
> That way, we cannot create a proper fat Avro dependency in which we shade 
> Jackson away.
> Also, we expose Avro as a direct and hard dependency on many Flink modules, 
> while it should be a dependency that users that use Avro types selectively 
> add.
> *Suggested Changes*
> We should move all Avro related classes to {{flink-avro}}, and give 
> {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}.
>   - {{AvroTypeInfo}}
>   - {{AvroSerializer}}
>   - {{AvroRowSerializationSchema}}
>   - {{AvroRowDeserializationSchema}}
> To be able to move the Avro serialization code from {{flink-core}} to 
> {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, 
> similar to how we load the {{WritableTypeInfo}} for Hadoop.





[GitHub] flink issue #4942: [FLINK-7420] [avro] Move all Avro code to flink-avro (fol...

2017-11-03 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4942
  
Personally, I like this Jenkins fellow more... 😉 


---


[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237564#comment-16237564
 ] 

ASF GitHub Bot commented on FLINK-7420:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4942
  
Let's see if Travis agrees. That guy has opinions...


> Move all Avro code to flink-avro
> 
>
> Key: FLINK-7420
> URL: https://issues.apache.org/jira/browse/FLINK-7420
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem*
> Currently, the {{flink-avro}} project is a shell with some tests and mostly 
> duplicate and dead code. The classes that use Avro are distributed quite 
> wildly through the code base, and introduce multiple direct dependencies on 
> Avro in a messy way.
> That way, we cannot create a proper fat Avro dependency in which we shade 
> Jackson away.
> Also, we expose Avro as a direct and hard dependency on many Flink modules, 
> while it should be a dependency that users that use Avro types selectively 
> add.
> *Suggested Changes*
> We should move all Avro related classes to {{flink-avro}}, and give 
> {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}.
>   - {{AvroTypeInfo}}
>   - {{AvroSerializer}}
>   - {{AvroRowSerializationSchema}}
>   - {{AvroRowDeserializationSchema}}
> To be able to move the Avro serialization code from {{flink-core}} to 
> {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, 
> similar to how we load the {{WritableTypeInfo}} for Hadoop.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4942: [FLINK-7420] [avro] Move all Avro code to flink-avro (fol...

2017-11-03 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4942
  
Let's see if Travis agrees. That guy has opinions...


---


[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237557#comment-16237557
 ] 

ASF GitHub Bot commented on FLINK-7420:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4942
  
The follow-up changes look good! (It's very easy to review since it's 
clearly separated.)

If you reviewed Timo's and my changes I would say this is good to go.


> Move all Avro code to flink-avro
> 
>
> Key: FLINK-7420
> URL: https://issues.apache.org/jira/browse/FLINK-7420
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem*
> Currently, the {{flink-avro}} project is a shell with some tests and mostly 
> duplicate and dead code. The classes that use Avro are distributed quite 
> wildly through the code base, and introduce multiple direct dependencies on 
> Avro in a messy way.
> That way, we cannot create a proper fat Avro dependency in which we shade 
> Jackson away.
> Also, we expose Avro as a direct and hard dependency on many Flink modules, 
> while it should be a dependency that users that use Avro types selectively 
> add.
> *Suggested Changes*
> We should move all Avro related classes to {{flink-avro}}, and give 
> {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}.
>   - {{AvroTypeInfo}}
>   - {{AvroSerializer}}
>   - {{AvroRowSerializationSchema}}
>   - {{AvroRowDeserializationSchema}}
> To be able to move the Avro serialization code from {{flink-core}} to 
> {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, 
> similar to how we load the {{WritableTypeInfo}} for Hadoop.





[GitHub] flink issue #4942: [FLINK-7420] [avro] Move all Avro code to flink-avro (fol...

2017-11-03 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4942
  
The follow-up changes look good! (it's very easy to review since it's 
clearly separated. 😃)

If you reviewed Timo's and my changes I would say this is good to go.


---


[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237468#comment-16237468
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r148756933
  
--- Diff: flink-filesystems/flink-s3-fs-hadoop/pom.xml ---
@@ -182,6 +182,21 @@ under the License.
${project.version}
test

+   
--- End diff --

Yes, this is about the recursive upload, which needs to be tested once with 
HDFS and once more with S3.

Sure, we could flip the dependency and let the tests in the `yarn` 
sub-project depend on `flink-s3-fs-hadoop` (and I don't mind which depends on 
which, actually), but wouldn't this be just the same, only in reverse?


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. If so, the Flink session will fail on staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.
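
The idea at the end of the description (create folders manually and copy only actual files) can be sketched as follows. This is an illustration on the local file system using `java.nio.file` only, not Hadoop's `FileSystem` API and not the fix that went into Flink. The class `RecursiveUploadSketch` and the method `copyFilesOnly` are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: copies a directory tree file by file instead of as one recursive copy. */
final class RecursiveUploadSketch {
    private RecursiveUploadSketch() {}

    /** Copies every regular file under sourceDir into targetDir and returns the relative paths copied. */
    static List<Path> copyFilesOnly(Path sourceDir, Path targetDir) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(sourceDir)) {
            // Directories themselves are never handed to the copy call, only regular files.
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<Path> copied = new ArrayList<>();
        for (Path file : files) {
            Path relative = sourceDir.relativize(file);
            Path target = targetDir.resolve(relative);
            // Create the parent folders explicitly rather than relying on the copy to do it.
            Files.createDirectories(target.getParent());
            Files.copy(file, target, StandardCopyOption.REPLACE_EXISTING);
            copied.add(relative);
        }
        return copied;
    }
}
```

One consequence of this approach is that empty directories in the source tree do not appear in the target, which seems acceptable for object stores where folders are only key prefixes.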




