[GitHub] flink issue #4935: [Flink-7945][Metrics]Fix per partition-lag metr...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4935 update the code according to the comment. ping @tzulitai ---
[jira] [Commented] (FLINK-7977) bump version of compatibility check for Flink 1.4
[ https://issues.apache.org/jira/browse/FLINK-7977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238759#comment-16238759 ] ASF GitHub Bot commented on FLINK-7977: --- GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4945 [FLINK-7977][build] bump version of compatibility check for Flink 1.4 ## What is the purpose of the change Since Flink maintains backward compatibility check for 2 versions, Flink 1.4 should check compatibility with 1.2 ## Brief change log bump compatible version from 1.1 to 1.2 ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: none ## Documentation none You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-7977 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4945.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4945 commit 4a0cbaac403d7df895ec04fffaa2ecf3b7f9d309 Author: Bowen Li, Date: 2017-11-04T04:08:24Z [FLINK-7977] bump version of compatibility check for Flink 1.4 > bump version of compatibility check for Flink 1.4 > - > > Key: FLINK-7977 > URL: https://issues.apache.org/jira/browse/FLINK-7977 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0 > > > Since Flink maintains backward compatibility check for 2 versions, Flink 1.4 > should check compatibility with 1.2 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7977) bump version of compatibility check for Flink 1.4
[ https://issues.apache.org/jira/browse/FLINK-7977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7977: Component/s: Build System > bump version of compatibility check for Flink 1.4 > - > > Key: FLINK-7977 > URL: https://issues.apache.org/jira/browse/FLINK-7977 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0 > > > Since Flink maintains backward compatibility check for 2 versions, Flink 1.4 > should check compatibility with 1.2 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7977) bump version of compatibility check for Flink 1.4
Bowen Li created FLINK-7977: --- Summary: bump version of compatibility check for Flink 1.4 Key: FLINK-7977 URL: https://issues.apache.org/jira/browse/FLINK-7977 Project: Flink Issue Type: Improvement Affects Versions: 1.4.0 Reporter: Bowen Li Assignee: Bowen Li Priority: Minor Fix For: 1.4.0 Since Flink maintains backward compatibility check for 2 versions, Flink 1.4 should check compatibility with 1.2 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7976) bump japicmp-maven-plugin version in Flink
[ https://issues.apache.org/jira/browse/FLINK-7976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7976: Fix Version/s: 1.5.0 > bump japicmp-maven-plugin version in Flink > -- > > Key: FLINK-7976 > URL: https://issues.apache.org/jira/browse/FLINK-7976 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.5.0 > > > bump japicmp-maven-plugin version from 0.7.0 to 0.11.0 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7976) bump japicmp-maven-plugin version in Flink
[ https://issues.apache.org/jira/browse/FLINK-7976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7976: Affects Version/s: 1.4.0 > bump japicmp-maven-plugin version in Flink > -- > > Key: FLINK-7976 > URL: https://issues.apache.org/jira/browse/FLINK-7976 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > > bump japicmp-maven-plugin version from 0.7.0 to 0.11.0 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7976) bump japicmp-maven-plugin version in Flink
Bowen Li created FLINK-7976: --- Summary: bump japicmp-maven-plugin version in Flink Key: FLINK-7976 URL: https://issues.apache.org/jira/browse/FLINK-7976 Project: Flink Issue Type: Improvement Reporter: Bowen Li Assignee: Bowen Li Priority: Minor bump japicmp-maven-plugin version from 0.7.0 to 0.11.0 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
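Taken together, FLINK-7976 and FLINK-7977 touch the japicmp configuration in the build. A sketch of the relevant pom.xml fragment (the element layout and artifact coordinates here are illustrative, not a verbatim copy of Flink's build file):

```xml
<plugin>
  <groupId>com.github.siom79.japicmp</groupId>
  <artifactId>japicmp-maven-plugin</artifactId>
  <!-- FLINK-7976: bump the plugin itself from 0.7.0 to 0.11.0 -->
  <version>0.11.0</version>
  <configuration>
    <oldVersion>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>${project.artifactId}</artifactId>
        <!-- FLINK-7977: compare against 1.2 instead of 1.1, since the
             compatibility guarantee spans two minor versions -->
        <version>1.2.0</version>
      </dependency>
    </oldVersion>
  </configuration>
</plugin>
```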
[GitHub] flink pull request #4944: [hotfix] add space between error message lines
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4944 [hotfix] add space between error message lines ## What is the purpose of the change add space between error message lines ## Brief change log add space between error message lines ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: none ## Documentation none You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink hotfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4944.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4944 commit be9b95a1b528a02aa46e0a4c90b42718b6bf6035 Author: Bowen Li, Date: 2017-11-03T22:46:02Z [hotfix] add space between error message lines ---
[jira] [Assigned] (FLINK-7717) Port TaskManagerMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-7717: --- Assignee: (was: Bowen Li) > Port TaskManagerMetricsHandler to new REST endpoint > --- > > Key: FLINK-7717 > URL: https://issues.apache.org/jira/browse/FLINK-7717 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{TaskManagerMetricsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7716) Port JobManagerMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-7716: --- Assignee: (was: Bowen Li) > Port JobManagerMetricsHandler to new REST endpoint > -- > > Key: FLINK-7716 > URL: https://issues.apache.org/jira/browse/FLINK-7716 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{JobManagerMetricsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7718) Port JobVertixMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-7718: --- Assignee: (was: Bowen Li) > Port JobVertixMetricsHandler to new REST endpoint > - > > Key: FLINK-7718 > URL: https://issues.apache.org/jira/browse/FLINK-7718 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{JobVertixMetricsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
[ https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238298#comment-16238298 ] ASF GitHub Bot commented on FLINK-7949: --- Github user bartektartanus commented on the issue: https://github.com/apache/flink/pull/4924 Yes, it fixes our issue - now our test in [nussknacker](https://github.com/TouK/nussknacker) is passing. Process restarts seamlessly and works fine even after further restarts, but I haven't managed to reproduce this error in a simple unit Flink test (yet). Maybe next week :) > AsyncWaitOperator is not restarting when queue is full > -- > > Key: FLINK-7949 > URL: https://issues.apache.org/jira/browse/FLINK-7949 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.2 >Reporter: Bartłomiej Tartanus >Priority: Major > Original Estimate: 0.25h > Remaining Estimate: 0.25h > > Issue was described here: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html > Issue - AsyncWaitOperator can't restart properly after failure (thread is > waiting forever) > Scenario to reproduce this issue: > 1. The queue is full (let's assume that its capacity is N elements) > 2. There is some pending element waiting, so the > pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and > the while-loop in the addAsyncBufferEntry method is trying to add this element to > the queue (but the element is not added because the queue is full) > 3. Now the snapshot is taken - the whole queue of N elements is being > written into the ListState in the snapshotState method and also (what is more > important) this pendingStreamElementQueueEntry is written to this list too. > 4. The process is being restarted, so it tries to recover all the elements > and put them again into the queue, but the list of recovered elements holds > N+1 elements and our queue capacity is only N. Process is not started yet, so > it can not process any element and this one element is waiting endlessly. > But it's never added and the process will never process anything. Deadlock. > 5. Trigger is fired and indeed discarded because the process is not running > yet. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
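The restore scenario described in the quoted issue can be modeled with a toy bounded queue (plain Java, not Flink's actual AsyncWaitOperator code): a snapshot holding N queued elements plus the pending entry yields N+1 elements to restore into a capacity-N queue, so one element can never be inserted. A non-blocking `offer` is used here to make the overflow visible instead of blocking forever:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class RestoreOverflowSketch {

	// Re-insert recovered elements into the bounded queue; return those that
	// do not fit. Flink's operator blocks on insertion instead, which is the
	// deadlock: nothing consumes the queue before the restore completes.
	static List<String> restore(ArrayBlockingQueue<String> queue, List<String> recovered) {
		List<String> leftover = new ArrayList<>();
		for (String element : recovered) {
			if (!queue.offer(element)) {
				leftover.add(element);
			}
		}
		return leftover;
	}

	public static void main(String[] args) {
		int capacity = 3;
		ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
		// Snapshot = N queued elements + the pending entry -> N+1 to restore.
		List<String> snapshot = Arrays.asList("e1", "e2", "e3", "pending");
		System.out.println("left over after restore: " + restore(queue, snapshot));
	}
}
```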
[jira] [Closed] (FLINK-7973) Fix service shading relocation for S3 file systems
[ https://issues.apache.org/jira/browse/FLINK-7973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-7973. --- > Fix service shading relocation for S3 file systems > -- > > Key: FLINK-7973 > URL: https://issues.apache.org/jira/browse/FLINK-7973 > Project: Flink > Issue Type: Bug >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > > The shade plugin relocates services incorrectly currently, applying > relocation patterns multiple times. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7973) Fix service shading relocation for S3 file systems
[ https://issues.apache.org/jira/browse/FLINK-7973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-7973. - Fixed via 84d0677f98f81d50411957103be82993e40f587a > Fix service shading relocation for S3 file systems > -- > > Key: FLINK-7973 > URL: https://issues.apache.org/jira/browse/FLINK-7973 > Project: Flink > Issue Type: Bug >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > > The shade plugin relocates services incorrectly currently, applying > relocation patterns multiple times. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in ...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/4943 [FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State ## What is the purpose of the change This changes Avro types to be serialized with a proper Avro serializer. The Avro serializer efficiently handles both Specific Records (generated by Avro) and Avro-reflection-based serialization. In order to maintain backwards compatibility, the Avro type info actually generates a wrapping serializer that falls back to a Pojo (or Kryo) serializer when being reconfigured from an old snapshot. ## Brief change log - Adds proper Avro type serializers - Adds a backwards-compatible Avro serializer that falls back to Pojo/Kryo on old snapshots - Adds a bunch of tests ## Verifying this change - Using Avro specific record types in the program and enjoying nice performant execution ;-) - Using Avro for Flink state and getting it serialized via Avro, allowing a schema upgrade of state - Running the added unit tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (**yes** / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink use_proper_avro Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4943.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4943 commit dd05a3bf3471702ac8c9129d2d80f2feeca0f949 Author: Stephan Ewen, Date: 2017-11-03T13:47:33Z [FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State This falls back to the original serializer (Pojo / Kryo) in cases where an old snapshot is resumed. ---
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238287#comment-16238287 ] ASF GitHub Bot commented on FLINK-6022: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/4943 [FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State > Improve support for Avro GenericRecord > -- > > Key: FLINK-6022 > URL: https://issues.apache.org/jira/browse/FLINK-6022 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Robert Metzger >Priority: Major > > Currently, Flink is serializing the schema for each Avro GenericRecord in the > stream. > This leads to a lot of overhead over the wire/disk + high serialization costs. > Therefore, I'm proposing to improve the support for GenericRecord in Flink by > shipping the schema to each serializer through the AvroTypeInformation. > Then, we can only support GenericRecords with the same type per stream, but > the performance will be much better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
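The FLINK-6022 issue boils down to byte overhead: if every GenericRecord carries its schema, the schema cost is paid per record, while shipping it once through the type information pays it once per stream. A toy model of that difference (plain Java byte counting, not actual Avro APIs):

```java
import java.nio.charset.StandardCharsets;

public class SchemaOverheadSketch {

	// Size of a stream where the schema string is written before every record.
	static int sizeWithSchemaPerRecord(String schema, String[] records) {
		int schemaBytes = schema.getBytes(StandardCharsets.UTF_8).length;
		int total = 0;
		for (String record : records) {
			total += schemaBytes + record.getBytes(StandardCharsets.UTF_8).length;
		}
		return total;
	}

	// Size when the serializer already knows the schema and ships it once.
	static int sizeWithSchemaOnce(String schema, String[] records) {
		int total = schema.getBytes(StandardCharsets.UTF_8).length;
		for (String record : records) {
			total += record.getBytes(StandardCharsets.UTF_8).length;
		}
		return total;
	}

	public static void main(String[] args) {
		String schema = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}";
		String[] records = {"r1", "r2", "r3"};
		System.out.println("schema per record: " + sizeWithSchemaPerRecord(schema, records));
		System.out.println("schema once:       " + sizeWithSchemaOnce(schema, records));
	}
}
```

The trade-off named in the issue follows directly: with the schema fixed up front, all records of a stream must share one type, but the per-record cost drops to the payload alone.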
[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238114#comment-16238114 ] ASF GitHub Bot commented on FLINK-7856: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r148857309
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r148857309

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the job vertex back pressure.
+ */
+public class JobVertexBackPressureHandler extends AbstractExecutionGraphHandler {
+
+	/** Back pressure stats tracker. */
+	private final BackPressureStatsTracker backPressureStatsTracker;
+
+	/** Time after which stats are considered outdated. */
+	private final int refreshInterval;
+
+	public JobVertexBackPressureHandler(
+			CompletableFuture localRestAddress,
+			GatewayRetriever leaderRetriever,
+			Time timeout,
+			MessageHeaders messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor,
+			Configuration clusterConfiguration) {
+		super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
+
+		// Back pressure stats tracker config
+		this.refreshInterval = clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL);
+		this.backPressureStatsTracker = new BackPressureStatsTracker(
+			new StackTraceSampleCoordinator(executor, 6),
+			clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
+			clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
+			Time.milliseconds(clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_DELAY)));
+	}
+
+	@Override
+	protected JobVertexBackPressureInfo handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+		JobVertexID jobVertexID =
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r148857541

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureHeaders.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureHeaders implements MessageHeaders {
+
+	private static final JobVertexBackPressureHeaders INSTANCE = new JobVertexBackPressureHeaders();
+
+	private static final String URL = "/jobs/:jobid/vertices/:vertexid/backpressure";
--- End diff --

Instead of writing `/jobs/:jobid` we could write `/jobs/: + JobIDParameter.KEY`. ---
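The review suggestion above — deriving the URL template from the parameter's key constant rather than a string literal — can be sketched as follows. The `JOB_ID_KEY`/`VERTEX_ID_KEY` constants are stand-ins introduced for this sketch; in Flink they would come from the corresponding path-parameter classes (the `KEY` field the reviewer refers to as `JobIDParameter.KEY`):

```java
public class BackPressureUrlSketch {

	// Hypothetical stand-ins for the path-parameter key constants.
	static final String JOB_ID_KEY = "jobid";
	static final String VERTEX_ID_KEY = "vertexid";

	// Building the template from the keys keeps the URL and the parameter
	// definitions in sync: renaming a key cannot silently break the route.
	static String backPressureUrl() {
		return "/jobs/:" + JOB_ID_KEY + "/vertices/:" + VERTEX_ID_KEY + "/backpressure";
	}

	public static void main(String[] args) {
		System.out.println(backPressureUrl());
	}
}
```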
[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238115#comment-16238115 ] ASF GitHub Bot commented on FLINK-7856: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r148857541 Instead of writing `/jobs/:jobid` we could write `/jobs/: + JobIDParameter.KEY`.
> Port JobVertexBackPressureHandler to REST endpoint > -- > > Key: FLINK-7856 > URL: https://issues.apache.org/jira/browse/FLINK-7856 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > > Port JobVertexBackPressureHandler to REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
[ https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238103#comment-16238103 ] ASF GitHub Bot commented on FLINK-7949: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4924 Thanks for the fix @bartektartanus. LGTM. Did you verify that this actually fixes the problem? Would be great if we could also add a unit test to guard against future regressions. > AsyncWaitOperator is not restarting when queue is full > -- > > Key: FLINK-7949 > URL: https://issues.apache.org/jira/browse/FLINK-7949 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.2 >Reporter: Bartłomiej Tartanus >Priority: Major > Original Estimate: 0.25h > Remaining Estimate: 0.25h > > Issue was describe here: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html > Issue - AsyncWaitOperator can't restart properly after failure (thread is > waiting forever) > Scenario to reproduce this issue: > 1. The queue is full (let's assume that its capacity is N elements) > 2. There is some pending element waiting, so the > pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and > while-loop in addAsyncBufferEntry method is trying to add this element to > the queue (but element is not added because queue is full) > 3. Now the snapshot is taken - the whole queue of N elements is being > written into the ListState in snapshotState method and also (what is more > important) this pendingStreamElementQueueEntry is written to this list too. > 4. The process is being restarted, so it tries to recover all the elements > and put them again into the queue, but the list of recovered elements hold > N+1 element and our queue capacity is only N. Process is not started yet, so > it can not process any element and this one element is waiting endlessly. 
> But it's never added and the process will never process anything. Deadlock. > 5. Trigger is fired and indeed discarded because the process is not running > yet. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
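The restore-time capacity mismatch described in steps 1-4 can be reproduced with a plain bounded queue. This is an illustrative sketch, not the actual `AsyncWaitOperator` code, and the "sized" variant at the end is one possible remedy (an assumption), not necessarily the fix applied in the linked PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class RecoveryCapacityDemo {
    public static void main(String[] args) throws InterruptedException {
        int capacity = 3;                       // N: the configured queue capacity
        List<String> recovered = new ArrayList<>();
        for (int i = 0; i <= capacity; i++) {   // N+1 elements: the snapshotted queue plus the pending entry
            recovered.add("element-" + i);
        }

        // Naive restore into a capacity-N queue: the (N+1)-th offer can never
        // succeed before processing starts, which is the deadlock described above.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (String e : recovered) {
            if (queue.offer(e, 100, TimeUnit.MILLISECONDS)) {
                accepted++;
            }
        }
        System.out.println("accepted " + accepted + " of " + recovered.size());

        // One way out (assumption): size the restore queue to hold everything
        // that was snapshotted, including the pending element.
        BlockingQueue<String> sized = new ArrayBlockingQueue<>(recovered.size());
        sized.addAll(recovered);
        System.out.println("sized queue holds " + sized.size());
    }
}
```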
[GitHub] flink issue #4924: [FLINK-7949] AsyncWaitOperator is not restarting when que...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4924 Thanks for the fix @bartektartanus. LGTM. Did you verify that this actually fixes the problem? Would be great if we could also add a unit test to guard against future regressions. ---
[jira] [Resolved] (FLINK-7847) Fix typo in flink-avro shading pattern
[ https://issues.apache.org/jira/browse/FLINK-7847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-7847. - Resolution: Fixed Fixed via b4dead96a16c8772ccf86b533f5de6feb0f3d1f6 > Fix typo in flink-avro shading pattern > -- > > Key: FLINK-7847 > URL: https://issues.apache.org/jira/browse/FLINK-7847 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.4.0 > > > {code} > <relocation> > <pattern>org.codehaus.jackson</pattern> > <shadedPattern>org.apache.flink.avro.shaded.org.codehouse.jackson</shadedPattern> > </relocation> > {code} > The shaded pattern should be > "org.apache.flink.avro.shaded.org.codehaus.jackson". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7847) Fix typo in flink-avro shading pattern
[ https://issues.apache.org/jira/browse/FLINK-7847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-7847. --- > Fix typo in flink-avro shading pattern > -- > > Key: FLINK-7847 > URL: https://issues.apache.org/jira/browse/FLINK-7847 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.4.0 > > > {code} > <relocation> > <pattern>org.codehaus.jackson</pattern> > <shadedPattern>org.apache.flink.avro.shaded.org.codehouse.jackson</shadedPattern> > </relocation> > {code} > The shaded pattern should be > "org.apache.flink.avro.shaded.org.codehaus.jackson". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
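For reference, a Maven Shade relocation with the corrected pattern would look like the following. This is a sketch of the fix described in the issue; the surrounding `maven-shade-plugin` configuration is omitted.

```xml
<relocation>
  <pattern>org.codehaus.jackson</pattern>
  <!-- previously misspelled as "...shaded.org.codehouse.jackson" -->
  <shadedPattern>org.apache.flink.avro.shaded.org.codehaus.jackson</shadedPattern>
</relocation>
```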
[jira] [Closed] (FLINK-7420) Move all Avro code to flink-avro
[ https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-7420. --- > Move all Avro code to flink-avro > > > Key: FLINK-7420 > URL: https://issues.apache.org/jira/browse/FLINK-7420 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > *Problem* > Currently, the {{flink-avro}} project is a shell with some tests and mostly > duplicate and dead code. The classes that use Avro are distributed quite > wildly through the code base, and introduce multiple direct dependencies on > Avro in a messy way. > That way, we cannot create a proper fat Avro dependency in which we shade > Jackson away. > Also, we expose Avro as a direct and hard dependency on many Flink modules, > while it should be a dependency that users that use Avro types selectively > add. > *Suggested Changes* > We should move all Avro related classes to {{flink-avro}}, and give > {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}. > - {{AvroTypeInfo}} > - {{AvroSerializer}} > - {{AvroRowSerializationSchema}} > - {{AvroRowDeserializationSchema}} > To be able to move the Avro serialization code from {{flink-core}} to > {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, > similar to how we load the {{WritableTypeInfo}} for Hadoop. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7420) Move all Avro code to flink-avro
[ https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-7420. - Resolution: Fixed Fixed in - 537a10ea2ff6a2d8507483c66f413f77884e77c4 - db7c70faa0b5aed652b525e8091cdc0e265362bc - 29249b2eeb9cb9910a5a55ae6c3a0b648d67d2b5 - 65e87045c48ce3200ea6690d945ed87b061808af - eb99181ddd4851d2f4a64377ebd4fe0ac11e2581 > Move all Avro code to flink-avro > > > Key: FLINK-7420 > URL: https://issues.apache.org/jira/browse/FLINK-7420 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > *Problem* > Currently, the {{flink-avro}} project is a shell with some tests and mostly > duplicate and dead code. The classes that use Avro are distributed quite > wildly through the code base, and introduce multiple direct dependencies on > Avro in a messy way. > That way, we cannot create a proper fat Avro dependency in which we shade > Jackson away. > Also, we expose Avro as a direct and hard dependency on many Flink modules, > while it should be a dependency that users that use Avro types selectively > add. > *Suggested Changes* > We should move all Avro related classes to {{flink-avro}}, and give > {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}. > - {{AvroTypeInfo}} > - {{AvroSerializer}} > - {{AvroRowSerializationSchema}} > - {{AvroRowDeserializationSchema}} > To be able to move the Avro serialization code from {{flink-core}} to > {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, > similar to how we load the {{WritableTypeInfo}} for Hadoop. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7972) Move SerializationSchema to flink-core
[ https://issues.apache.org/jira/browse/FLINK-7972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-7972. --- > Move SerializationSchema to flink-core > -- > > Key: FLINK-7972 > URL: https://issues.apache.org/jira/browse/FLINK-7972 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.4.0 > > > The {{SerializationSchema}} and its related classes are currently in > {{flink-streaming-java}}. > API level projects that depend on those classes hence pull in a dependency on > runtime classes. > For example, this would be required in order to make {{flink-avro}} > independent of runtime dependencies and Scala versions; the same holds in the future > for Thrift format support, for HBase connectors, etc. > This should not be API breaking since we can keep the classes in the same > namespace and only move them "upstream" in the dependency structure, or we > can keep classes in the original namespace that extend the moved classes in > {{flink-core}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7972) Move SerializationSchema to flink-core
[ https://issues.apache.org/jira/browse/FLINK-7972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-7972. - Resolution: Fixed Fixed in fe931d075f031e9494fd26dbeed4bb1024bd52cf > Move SerializationSchema to flink-core > -- > > Key: FLINK-7972 > URL: https://issues.apache.org/jira/browse/FLINK-7972 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.4.0 > > > The {{SerializationSchema}} and its related classes are currently in > {{flink-streaming-java}}. > API level projects that depend on those classes hence pull in a dependency on > runtime classes. > For example, this would be required in order to make {{flink-avro}} > independent of runtime dependencies and Scala versions; the same holds in the future > for Thrift format support, for HBase connectors, etc. > This should not be API breaking since we can keep the classes in the same > namespace and only move them "upstream" in the dependency structure, or we > can keep classes in the original namespace that extend the moved classes in > {{flink-core}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4734 I think @zentol is right and we can register the `CurrentJobsOverviewHandler` under `jobs/overview`. Thus, we should add this handler as well. Could you rebase this handler onto the latest master such that we can merge it? ---
[jira] [Commented] (FLINK-7652) Port CurrentJobIdsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238027#comment-16238027 ] ASF GitHub Bot commented on FLINK-7652: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4734 I think @zentol is right and we can register the `CurrentJobsOverviewHandler` under `jobs/overview`. Thus, we should add this handler as well. Could you rebase this handler onto the latest master such that we can merge it? > Port CurrentJobIdsHandler to new REST endpoint > -- > > Key: FLINK-7652 > URL: https://issues.apache.org/jira/browse/FLINK-7652 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CurrentJobIdsHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro
[ https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238018#comment-16238018 ] ASF GitHub Bot commented on FLINK-7420: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4942 > Move all Avro code to flink-avro > > > Key: FLINK-7420 > URL: https://issues.apache.org/jira/browse/FLINK-7420 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > *Problem* > Currently, the {{flink-avro}} project is a shell with some tests and mostly > duplicate and dead code. The classes that use Avro are distributed quite > wildly through the code base, and introduce multiple direct dependencies on > Avro in a messy way. > That way, we cannot create a proper fat Avro dependency in which we shade > Jackson away. > Also, we expose Avro as a direct and hard dependency on many Flink modules, > while it should be a dependency that users that use Avro types selectively > add. > *Suggested Changes* > We should move all Avro related classes to {{flink-avro}}, and give > {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}. > - {{AvroTypeInfo}} > - {{AvroSerializer}} > - {{AvroRowSerializationSchema}} > - {{AvroRowDeserializationSchema}} > To be able to move the Avro serialization code from {{flink-core}} to > {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, > similar to how we load the {{WritableTypeInfo}} for Hadoop. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4942: [FLINK-7420] [avro] Move all Avro code to flink-av...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4942 ---
[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro
[ https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238017#comment-16238017 ] ASF GitHub Bot commented on FLINK-7420: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4931 > Move all Avro code to flink-avro > > > Key: FLINK-7420 > URL: https://issues.apache.org/jira/browse/FLINK-7420 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > *Problem* > Currently, the {{flink-avro}} project is a shell with some tests and mostly > duplicate and dead code. The classes that use Avro are distributed quite > wildly through the code base, and introduce multiple direct dependencies on > Avro in a messy way. > That way, we cannot create a proper fat Avro dependency in which we shade > Jackson away. > Also, we expose Avro as a direct and hard dependency on many Flink modules, > while it should be a dependency that users that use Avro types selectively > add. > *Suggested Changes* > We should move all Avro related classes to {{flink-avro}}, and give > {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}. > - {{AvroTypeInfo}} > - {{AvroSerializer}} > - {{AvroRowSerializationSchema}} > - {{AvroRowDeserializationSchema}} > To be able to move the Avro serialization code from {{flink-core}} to > {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, > similar to how we load the {{WritableTypeInfo}} for Hadoop. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4931: [FLINK-7420] Move all Avro code to flink-avro
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4931 ---
[jira] [Assigned] (FLINK-7975) Queryable state client does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-7975: - Assignee: Kostas Kloudas > Queryable state client does not wait for shutdown completion > > > Key: FLINK-7975 > URL: https://issues.apache.org/jira/browse/FLINK-7975 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Kostas Kloudas >Priority: Critical > > The queryable state client neither waits for its shutdown completion when > calling {{Client#shutdown}} nor does it return a termination future on which > we could wait for the termination. I think this would be good to provide. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-7974: - Assignee: Kostas Kloudas > AbstractServerBase#shutdown does not wait for shutdown completion > - > > Key: FLINK-7974 > URL: https://issues.apache.org/jira/browse/FLINK-7974 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Kostas Kloudas >Priority: Critical > > The {{AbstractServerBase}} does not wait for the completion of its shutdown > when calling {{AbstractServerBase#shutdown}}. This is problematic since it > leads to resource leaks and unstable tests such as the > {{AbstractServerTest}}. I propose to let the {{AbstractServerBase#shutdown}} > return a termination future which is completed upon shutdown completion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7975) Queryable state client does not wait for shutdown completion
Till Rohrmann created FLINK-7975: Summary: Queryable state client does not wait for shutdown completion Key: FLINK-7975 URL: https://issues.apache.org/jira/browse/FLINK-7975 Project: Flink Issue Type: Bug Components: Queryable State Affects Versions: 1.4.0 Reporter: Till Rohrmann Priority: Critical The queryable state client neither waits for its shutdown completion when calling {{Client#shutdown}} nor does it return a termination future on which we could wait for the termination. I think this would be good to provide. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
Till Rohrmann created FLINK-7974: Summary: AbstractServerBase#shutdown does not wait for shutdown completion Key: FLINK-7974 URL: https://issues.apache.org/jira/browse/FLINK-7974 Project: Flink Issue Type: Bug Components: Queryable State Affects Versions: 1.4.0 Reporter: Till Rohrmann Priority: Critical The {{AbstractServerBase}} does not wait for the completion of its shutdown when calling {{AbstractServerBase#shutdown}}. This is problematic since it leads to resource leaks and unstable tests such as the {{AbstractServerTest}}. I propose to let the {{AbstractServerBase#shutdown}} return a termination future which is completed upon shutdown completion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
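Both tickets ask for the same pattern: `shutdown()` should hand back a termination future that completes only once cleanup has finished, so callers and tests can wait deterministically. A minimal sketch of that pattern (not Flink's actual `AbstractServerBase` or queryable state `Client` API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ServerSketch {
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    /** Returns a future that completes when all resources have been released. */
    public CompletableFuture<Void> shutdown() {
        CompletableFuture.runAsync(() -> {
            // ... close channels, free ports, stop worker threads ...
            terminationFuture.complete(null);
        });
        return terminationFuture;
    }

    public static void main(String[] args) throws Exception {
        ServerSketch server = new ServerSketch();
        // The caller can now block (or chain) until shutdown really finished.
        server.shutdown().get(5, TimeUnit.SECONDS);
        System.out.println("shutdown complete");
    }
}
```

Returning the future, rather than shutting down fire-and-forget, is what removes the resource-leak and flaky-test window the tickets describe.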
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237763#comment-16237763 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148812132 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import org.apache.flink.util.TestLogger; + +import 
org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME; +import
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237749#comment-16237749 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148808721 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java --- @@ -63,11 +64,14 @@ * The yarn implementation of the resource manager. Used when the system is started * via the resource framework YARN. */ -public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler { +public class YarnResourceManager extends ResourceManager<YarnWorkerNode> implements AMRMClientAsync.CallbackHandler { /** The process environment variables. */ private final Map<String, String> env; + /** YARN container map. Package private for unit test purposes. */ + final Map<String, YarnWorkerNode> workerNodeMap; --- End diff -- Let's make the key `ResourceID`. > Implement container release to support dynamic scaling > -- > > Key: FLINK-7076 > URL: https://issues.apache.org/jira/browse/FLINK-7076 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Shuyi Chen >Priority: Major > Labels: flip-6 > > In order to support dynamic scaling, the {{YarnResourceManager}} has to be > able to dynamically free containers. We have to implement the > {{YarnResourceManager#stopWorker}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237761#comment-16237761 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148810490 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; + +import org.apache.hadoop.yarn.api.records.Container; + +import java.io.Serializable; + +/** + * A stored YARN worker, which contains the YARN container. + */ +public class YarnWorkerNode implements ResourceIDRetrievable, Serializable { + + private Container yarnContainer; + + public YarnWorkerNode(Container container) { + this.yarnContainer = container; --- End diff -- `Preconditions.checkNotNull`. 
> Implement container release to support dynamic scaling > -- > > Key: FLINK-7076 > URL: https://issues.apache.org/jira/browse/FLINK-7076 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Shuyi Chen >Priority: Major > Labels: flip-6 > > In order to support dynamic scaling, the {{YarnResourceManager}} has to be > able to dynamically free containers. We have to implement the > {{YarnResourceManager#stopWorker}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237750#comment-16237750 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148808362 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java --- @@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) { } @Override - public boolean stopWorker(ResourceID resourceID) { - // TODO: Implement to stop the worker - return false; + public boolean stopWorker(YarnWorkerNode workerNode) { + workerNodeMap.remove(workerNode.getResourceID().toString()); --- End diff -- Let's remove the workerNode after we have stopped it. > Implement container release to support dynamic scaling > -- > > Key: FLINK-7076 > URL: https://issues.apache.org/jira/browse/FLINK-7076 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Shuyi Chen >Priority: Major > Labels: flip-6 > > In order to support dynamic scaling, the {{YarnResourceManager}} has to be > able to dynamically free containers. We have to implement the > {{YarnResourceManager#stopWorker}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237748#comment-16237748 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148809690 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java --- @@ -260,6 +287,7 @@ public void onContainersAllocated(List<Container> containers) { numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1); log.info("Received new container: {} - Remaining pending container requests: {}", container.getId(), numPendingContainerRequests); + workerNodeMap.put(container.getId().toString(), new YarnWorkerNode(container)); --- End diff -- Here we should create a `ResourceID` to use it as the key. This `ResourceID` must then be sent to the launched TaskManager such that it uses it upon registration. > Implement container release to support dynamic scaling > -- > > Key: FLINK-7076 > URL: https://issues.apache.org/jira/browse/FLINK-7076 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Shuyi Chen >Priority: Major > Labels: flip-6 > > In order to support dynamic scaling, the {{YarnResourceManager}} has to be > able to dynamically free containers. We have to implement the > {{YarnResourceManager#stopWorker}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237747#comment-16237747 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148808279 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java --- @@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) { } @Override - public boolean stopWorker(ResourceID resourceID) { - // TODO: Implement to stop the worker - return false; + public boolean stopWorker(YarnWorkerNode workerNode) { + workerNodeMap.remove(workerNode.getResourceID().toString()); + if (workerNode != null) { --- End diff -- I think this should be checked first. > Implement container release to support dynamic scaling > -- > > Key: FLINK-7076 > URL: https://issues.apache.org/jira/browse/FLINK-7076 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Shuyi Chen >Priority: Major > Labels: flip-6 > > In order to support dynamic scaling, the {{YarnResourceManager}} has to be > able to dynamically free containers. We have to implement the > {{YarnResourceManager#stopWorker}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
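Taken together, the review comments on `stopWorker` amount to: null-check the worker first, stop its container, and remove the bookkeeping entry only after the stop. A simplified sketch of that ordering (the `String` map values and the commented-out NodeManager call are stand-ins, not Flink's actual types or API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StopWorkerSketch {
    static final Map<String, String> workerNodeMap = new ConcurrentHashMap<>();

    static boolean stopWorker(String resourceId) {
        String container = workerNodeMap.get(resourceId);
        if (container == null) {
            // Check first, as the reviewer suggested: nothing to stop.
            return false;
        }
        // nodeManagerClient.stopContainer(...) would go here.
        // Remove the entry only after the container was actually stopped.
        workerNodeMap.remove(resourceId);
        return true;
    }

    public static void main(String[] args) {
        workerNodeMap.put("container_01", "yarn-container");
        System.out.println(stopWorker("container_01")); // stops and removes
        System.out.println(stopWorker("container_01")); // already gone
    }
}
```

Removing the entry before the stop (as in the original diff) would lose the worker if stopping fails; the check-stop-remove order keeps the map consistent with reality.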
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237755#comment-16237755 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148814129 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import org.apache.flink.util.TestLogger; + +import 
org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME; +import
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237760#comment-16237760 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148814354 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237746#comment-16237746 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148808482
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
 	}
 
 	@Override
-	public boolean stopWorker(ResourceID resourceID) {
-		// TODO: Implement to stop the worker
-		return false;
+	public boolean stopWorker(YarnWorkerNode workerNode) {
+		workerNodeMap.remove(workerNode.getResourceID().toString());
+		if (workerNode != null) {
+			Container container = workerNode.getYarnContainer();
+			log.info("Stopping container {}.", container.getId().toString());
+			// release the container on the node manager
+			try {
+				nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+			} catch (Throwable t) {
+				log.error("Error while calling YARN Node Manager to stop container", t);
--- End diff --
this should be a warning
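The point of this review comment is the log level: a failed stopContainer call is something the ResourceManager can recover from, so it belongs at WARN, not ERROR. A self-contained sketch of the pattern using java.util.logging for portability (Flink itself logs through slf4j, and the ContainerStopper interface here is a hypothetical stand-in for the YARN NodeManager client):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class StopContainerLogging {
    private static final Logger LOG = Logger.getLogger(StopContainerLogging.class.getName());

    // Hypothetical stand-in for the NodeManager client's stop call.
    interface ContainerStopper {
        void stopContainer(String containerId) throws Exception;
    }

    // Returns true if the container was stopped. A failure is logged at
    // WARNING rather than SEVERE, matching the review's suggestion that a
    // recoverable condition should not be reported as an error.
    public static boolean stopContainer(ContainerStopper client, String containerId) {
        try {
            client.stopContainer(containerId);
            return true;
        } catch (Throwable t) {
            LOG.log(Level.WARNING,
                "Error while calling YARN Node Manager to stop container " + containerId, t);
            return false;
        }
    }
}
```

Returning the success flag (instead of only logging) also lets the caller decide whether to retry or simply drop its bookkeeping for the container.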
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237757#comment-16237757 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148812650 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237762#comment-16237762 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148814381 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237759#comment-16237759 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148811956 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237751#comment-16237751 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148811288 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237764#comment-16237764 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148814557 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237758#comment-16237758 ] ASF GitHub Bot commented on FLINK-7076: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148813642 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237745#comment-16237745 ]

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148807573

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -63,11 +64,14 @@
      * The yarn implementation of the resource manager. Used when the system is started
      * via the resource framework YARN.
      */
    -public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler {
    +public class YarnResourceManager extends ResourceManager<YarnWorkerNode> implements AMRMClientAsync.CallbackHandler {

    	/** The process environment variables. */
    	private final Map<String, String> env;

    +	/** YARN container map. Package private for unit test purposes. */
    +	final Map<ResourceID, YarnWorkerNode> workerNodeMap;
    --- End diff --

    Let's make it more visible that this map is a concurrent map by setting the type to `ConcurrentHashMap`.

> Implement container release to support dynamic scaling
> ------------------------------------------------------
>
>                 Key: FLINK-7076
>                 URL: https://issues.apache.org/jira/browse/FLINK-7076
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>            Reporter: Till Rohrmann
>            Assignee: Shuyi Chen
>            Priority: Major
>              Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be
> able to dynamically free containers. We have to implement the
> {{YarnResourceManager#stopWorker}} method.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
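The suggestion above — declaring the field with the concrete `ConcurrentHashMap` type rather than plain `Map` — can be sketched as follows. Note that `ResourceID` and `YarnWorkerNode` here are minimal self-contained stand-ins for the Flink classes, not the real implementations; only the field declaration and the atomic-removal point are what the sketch illustrates.

```java
import java.util.concurrent.ConcurrentHashMap;

public class WorkerNodeMapSketch {

    // Minimal stand-in for Flink's ResourceID: an opaque string identifier
    // with value-based equality so it can serve as a map key.
    static final class ResourceID {
        private final String id;
        ResourceID(String id) { this.id = id; }
        @Override public boolean equals(Object o) {
            return o instanceof ResourceID && ((ResourceID) o).id.equals(id);
        }
        @Override public int hashCode() { return id.hashCode(); }
    }

    // Minimal stand-in for YarnWorkerNode (wraps a container id).
    static final class YarnWorkerNode {
        final String containerId;
        YarnWorkerNode(String containerId) { this.containerId = containerId; }
    }

    // Declaring the concrete ConcurrentHashMap type makes the thread-safety
    // guarantee visible at the declaration site — the point of the review comment.
    final ConcurrentHashMap<ResourceID, YarnWorkerNode> workerNodeMap = new ConcurrentHashMap<>();

    // remove() on a ConcurrentHashMap is atomic, so two concurrent release
    // attempts for the same resource can never both "win".
    YarnWorkerNode removeWorker(ResourceID resourceId) {
        return workerNodeMap.remove(resourceId);
    }

    public static void main(String[] args) {
        WorkerNodeMapSketch rm = new WorkerNodeMapSketch();
        ResourceID id = new ResourceID("container_01");
        rm.workerNodeMap.put(id, new YarnWorkerNode("container_01"));
        // Exactly one removal succeeds; the second sees an absent key.
        if (rm.removeWorker(id) == null || rm.removeWorker(id) != null) {
            throw new AssertionError("expected exactly one successful removal");
        }
        System.out.println("ok");
    }
}
```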
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237754#comment-16237754 ]

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148811085

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -0,0 +1,354 @@
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237756#comment-16237756 ]

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148810594

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
    @@ -0,0 +1,47 @@
    +package org.apache.flink.yarn;
    +
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
    +
    +import org.apache.hadoop.yarn.api.records.Container;
    +
    +import java.io.Serializable;
    +
    +/**
    + * A stored YARN worker, which contains the YARN container.
    + */
    +public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
    +
    +	private Container yarnContainer;
    +
    +	public YarnWorkerNode(Container container) {
    +		this.yarnContainer = container;
    +	}
    +
    +	@Override
    +	public ResourceID getResourceID() {
    +		return new ResourceID(yarnContainer.getId().toString());
    --- End diff --

    let's store the `ResourceID` explicitly.
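The change suggested above can be sketched like this: compute the `ResourceID` once in the constructor instead of allocating a fresh one on every `getResourceID()` call. The `Container` and `ResourceID` types below are simplified self-contained stand-ins, not the real Hadoop/Flink classes.

```java
public class YarnWorkerNodeSketch {

    // Simplified stand-in for Flink's ResourceID.
    static final class ResourceID {
        final String id;
        ResourceID(String id) { this.id = id; }
    }

    // Simplified stand-in for Hadoop's Container.
    static final class Container {
        private final String containerId;
        Container(String containerId) { this.containerId = containerId; }
        String getId() { return containerId; }
    }

    private final Container yarnContainer;
    // Stored explicitly, as the review suggests: getResourceID() now returns
    // the same instance every time instead of constructing one per call.
    private final ResourceID resourceID;

    YarnWorkerNodeSketch(Container container) {
        this.yarnContainer = container;
        this.resourceID = new ResourceID(container.getId());
    }

    ResourceID getResourceID() {
        return resourceID;
    }

    Container getContainer() {
        return yarnContainer;
    }

    public static void main(String[] args) {
        YarnWorkerNodeSketch node = new YarnWorkerNodeSketch(new Container("container_01"));
        // Repeated calls yield the identical object — a stable identity for map keys.
        if (node.getResourceID() != node.getResourceID()) {
            throw new AssertionError("ResourceID should be stored, not recreated");
        }
        System.out.println("ok");
    }
}
```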
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237752#comment-16237752 ]

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148811663

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -0,0 +1,354 @@
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237753#comment-16237753 ]

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148810078

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
    @@ -0,0 +1,47 @@
    +package org.apache.flink.yarn;
    +
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
    +
    +import org.apache.hadoop.yarn.api.records.Container;
    +
    +import java.io.Serializable;
    +
    +/**
    + * A stored YARN worker, which contains the YARN container.
    + */
    +public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
    --- End diff --

    `serialVersionUID` is missing.
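To illustrate the `serialVersionUID` point: a `Serializable` class without an explicit `serialVersionUID` gets one derived from the class shape at compile time, so any later change to the class (or even a different compiler) can break deserialization of previously written instances. A minimal self-contained sketch follows; the value `1L` and the `WorkerNode` stand-in are illustrative, not Flink's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerialVersionUidSketch {

    static class WorkerNode implements Serializable {
        // Explicit serialVersionUID pins the serialized form, so adding a
        // method or recompiling no longer invalidates old serialized data.
        private static final long serialVersionUID = 1L;

        private final String containerId;

        WorkerNode(String containerId) { this.containerId = containerId; }

        String getContainerId() { return containerId; }
    }

    public static void main(String[] args) throws Exception {
        // Round-trip a worker node through Java serialization.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(new WorkerNode("container_01"));
        }
        WorkerNode copy;
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            copy = (WorkerNode) in.readObject();
        }
        if (!"container_01".equals(copy.getContainerId())) {
            throw new AssertionError("round trip lost the container id");
        }
        System.out.println("ok");
    }
}
```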
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148814381

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -0,0 +1,354 @@
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148811288

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -0,0 +1,354 @@
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148811085

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -0,0 +1,354 @@
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148814354

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148810594

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn;
    +
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
    +
    +import org.apache.hadoop.yarn.api.records.Container;
    +
    +import java.io.Serializable;
    +
    +/**
    + * A stored YARN worker, which contains the YARN container.
    + */
    +public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
    +
    +	private Container yarnContainer;
    +
    +	public YarnWorkerNode(Container container) {
    +		this.yarnContainer = container;
    +	}
    +
    +	@Override
    +	public ResourceID getResourceID() {
    +		return new ResourceID(yarnContainer.getId().toString());

    --- End diff --

    let's store the `ResourceID` explicitly.

---
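A minimal sketch of the suggestion in the comment above — compute the `ResourceID` once in the constructor and keep it in a final field instead of allocating a new one on every `getResourceID()` call. The `Container` and `ResourceID` classes below are simplified stand-ins so the example is self-contained; the real types live in `org.apache.hadoop.yarn.api.records` and `org.apache.flink.runtime.clusterframework.types`.

```java
// Hedged sketch, not the actual Flink classes: Container and ResourceID are
// simplified stand-ins for the YARN/Flink types.
public class YarnWorkerNodeSketch {

    /** Stand-in for org.apache.hadoop.yarn.api.records.Container. */
    static final class Container {
        private final String id;
        Container(String id) { this.id = id; }
        String getId() { return id; }
    }

    /** Stand-in for org.apache.flink.runtime.clusterframework.types.ResourceID. */
    static final class ResourceID {
        private final String value;
        ResourceID(String value) { this.value = value; }
        String getValue() { return value; }
    }

    static final class YarnWorkerNode {
        private final Container yarnContainer;
        // Stored explicitly, as the reviewer suggests: built once in the constructor.
        private final ResourceID resourceID;

        YarnWorkerNode(Container container) {
            if (container == null) {   // stands in for Preconditions.checkNotNull
                throw new NullPointerException("container must not be null");
            }
            this.yarnContainer = container;
            this.resourceID = new ResourceID(container.getId());
        }

        ResourceID getResourceID() {
            return resourceID;         // no allocation per call
        }

        Container getContainer() {
            return yarnContainer;
        }
    }

    public static void main(String[] args) {
        YarnWorkerNode node = new YarnWorkerNode(new Container("container_e01_0001"));
        // The same ResourceID instance is returned on every call.
        System.out.println(node.getResourceID() == node.getResourceID());
    }
}
```

Storing the id also sidesteps any cost or identity surprises from recomputing `yarnContainer.getId().toString()` on every lookup.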
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148814557

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148812650

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148807573

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -63,11 +64,14 @@
      * The yarn implementation of the resource manager. Used when the system is started
      * via the resource framework YARN.
      */
    -public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {
    +public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {

    	/** The process environment variables. */
    	private final Map env;

    +	/** YARN container map. Package private for unit test purposes. */
    +	final Map workerNodeMap;

    --- End diff --

    Let's make it more visible that this map is a concurrent map by setting the type to `ConcurrentHashMap`.

---
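A generic illustration of the point above, not actual Flink code: declaring the field with the concrete `ConcurrentHashMap` type (rather than the plain `Map` interface) makes the thread-safety expectation visible at the declaration site and guarantees callers the atomicity semantics of that class. The `workerNodeMap` below uses `String` keys/values purely for the sketch.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the review comment: the worker bookkeeping map is
// declared with its concrete concurrent type instead of the Map interface.
public class WorkerBookkeepingSketch {

    // Declared as ConcurrentHashMap, so the concurrency requirement is explicit
    // and nobody can silently swap in a non-thread-safe implementation.
    final ConcurrentHashMap<String, String> workerNodeMap = new ConcurrentHashMap<>();

    String registerIfAbsent(String resourceId, String containerId) {
        // putIfAbsent is atomic on ConcurrentHashMap: either the existing
        // mapping or the new one wins, never a torn update.
        String previous = workerNodeMap.putIfAbsent(resourceId, containerId);
        return previous == null ? containerId : previous;
    }

    public static void main(String[] args) {
        WorkerBookkeepingSketch rm = new WorkerBookkeepingSketch();
        System.out.println(rm.registerIfAbsent("rid-1", "container_1"));
        System.out.println(rm.registerIfAbsent("rid-1", "container_2")); // first mapping kept
    }
}
```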
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148810490

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
    +	public YarnWorkerNode(Container container) {
    +		this.yarnContainer = container;

    --- End diff --

    `Preconditions.checkNotNull`.

---
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237744#comment-16237744 ]

ASF GitHub Bot commented on FLINK-7076:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148810409

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
    +public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {

    --- End diff --

    Actually this class should not be `serializable` because `Container` is also not serializable.

> Implement container release to support dynamic scaling
> ------------------------------------------------------
>
>                 Key: FLINK-7076
>                 URL: https://issues.apache.org/jira/browse/FLINK-7076
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>            Reporter: Till Rohrmann
>            Assignee: Shuyi Chen
>            Priority: Major
>              Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be
> able to dynamically free containers. We have to implement the
> {{YarnResourceManager#stopWorker}} method.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
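The release flow FLINK-7076 describes — free a specific container so the cluster can scale down — can be sketched roughly as follows. This is an illustration under simplified, assumed types: `ContainerClient` and `Worker` are stand-ins, not the Flink or YARN API; in the real `YarnResourceManager#stopWorker` the release would go through the YARN AM-RM client.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of a stopWorker-style container release, not the real Flink
// implementation. ContainerClient and Worker are illustrative stand-ins.
public class StopWorkerSketch {

    /** Stand-in for the YARN client used to release containers. */
    interface ContainerClient {
        void releaseContainer(String containerId);
    }

    /** Stand-in for a tracked worker node backed by a YARN container. */
    static final class Worker {
        final String containerId;
        Worker(String containerId) { this.containerId = containerId; }
    }

    private final ConcurrentHashMap<String, Worker> workerNodeMap = new ConcurrentHashMap<>();
    private final ContainerClient client;

    StopWorkerSketch(ContainerClient client) { this.client = client; }

    void addWorker(String resourceId, Worker worker) {
        workerNodeMap.put(resourceId, worker);
    }

    /** Frees the container backing the given worker; returns false if unknown. */
    boolean stopWorker(String resourceId) {
        // remove() drops the bookkeeping entry atomically, so a second call
        // for the same worker is a harmless no-op.
        Worker worker = workerNodeMap.remove(resourceId);
        if (worker == null) {
            return false;   // unknown or already released
        }
        client.releaseContainer(worker.containerId);
        return true;
    }

    public static void main(String[] args) {
        StopWorkerSketch rm = new StopWorkerSketch(id -> System.out.println("released " + id));
        rm.addWorker("rid-1", new Worker("container_1"));
        System.out.println(rm.stopWorker("rid-1"));
        System.out.println(rm.stopWorker("rid-1"));
    }
}
```

Removing the map entry before releasing keeps the bookkeeping consistent even if two release requests race for the same worker.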
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148812132

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148810078

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
    +public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {

    --- End diff --

    `serialVersionUID` is missing.

---
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148811663

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148811956

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -0,0 +1,354 @@
    [diff context: Apache license header and import statements of the new test file; the review comment itself is truncated in this archive]
---
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148814129

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -0,0 +1,354 @@
    [diff context: Apache license header and import statements of the new test file; the review comment itself is truncated in this archive]
---
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148808279

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
        }

        @Override
    -   public boolean stopWorker(ResourceID resourceID) {
    -       // TODO: Implement to stop the worker
    -       return false;
    +   public boolean stopWorker(YarnWorkerNode workerNode) {
    +       workerNodeMap.remove(workerNode.getResourceID().toString());
    +       if (workerNode != null) {
    --- End diff --

    I think this should be checked first.
---
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148813642

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -0,0 +1,354 @@
    [diff context: Apache license header and import statements of the new test file; the review comment itself is truncated in this archive]
---
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148808362

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
        }

        @Override
    -   public boolean stopWorker(ResourceID resourceID) {
    -       // TODO: Implement to stop the worker
    -       return false;
    +   public boolean stopWorker(YarnWorkerNode workerNode) {
    +       workerNodeMap.remove(workerNode.getResourceID().toString());
    --- End diff --

    Let's remove the workerNode after we have stopped it.
---
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148809690

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -260,6 +287,7 @@ public void onContainersAllocated(List<Container> containers) {
            numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
            log.info("Received new container: {} - Remaining pending container requests: {}", container.getId(), numPendingContainerRequests);
    +       workerNodeMap.put(container.getId().toString(), new YarnWorkerNode(container));
    --- End diff --

    Here we should create a `ResourceID` to use it as the key. This `ResourceID` must then be sent to the launched TaskManager so that it uses it upon registration.
---
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148808721

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -63,11 +64,14 @@
     /**
      * The yarn implementation of the resource manager. Used when the system is started
      * via the resource framework YARN.
      */
    -public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {
    +public class YarnResourceManager extends ResourceManager<YarnWorkerNode> implements AMRMClientAsync.CallbackHandler {

        /** The process environment variables. */
        private final Map<String, String> env;

    +   /** YARN container map. Package private for unit test purposes. */
    +   final Map<String, YarnWorkerNode> workerNodeMap;
    --- End diff --

    Let's make the key `ResourceID`.
---
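The change the reviewer asks for, keying the worker map by `ResourceID` rather than by a container-id `String`, relies on `ResourceID` being a value type with proper `equals`/`hashCode`. A stand-alone sketch with simplified stand-in classes (the real `ResourceID` and `YarnWorkerNode` live in Flink; the names and shapes below are illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Flink's ResourceID: a value-based identifier,
// so map lookups work across distinct instances with the same id.
final class ResourceID {
    private final String id;
    ResourceID(String id) { this.id = id; }
    @Override public boolean equals(Object o) {
        return o instanceof ResourceID && ((ResourceID) o).id.equals(id);
    }
    @Override public int hashCode() { return id.hashCode(); }
    @Override public String toString() { return id; }
}

// Simplified stand-in for YarnWorkerNode.
final class WorkerNode {
    final ResourceID resourceId;
    WorkerNode(ResourceID resourceId) { this.resourceId = resourceId; }
}

public class ResourceIdKeyedMapSketch {
    // Keyed by ResourceID rather than String, as the review suggests.
    private final Map<ResourceID, WorkerNode> workerNodeMap = new HashMap<>();

    void onContainerAllocated(String containerId) {
        // Derive a ResourceID from the container id; per the review, the same
        // id must later be handed to the TaskManager so it registers under it.
        ResourceID resourceId = new ResourceID(containerId);
        workerNodeMap.put(resourceId, new WorkerNode(resourceId));
    }

    WorkerNode lookup(String containerId) {
        // A fresh ResourceID instance finds the entry because of value equality.
        return workerNodeMap.get(new ResourceID(containerId));
    }
}
```

The value-based `equals`/`hashCode` is the design point: with it, any component holding only the container id can reconstruct the key and find the worker.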
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r148808482

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
        }

        @Override
    -   public boolean stopWorker(ResourceID resourceID) {
    -       // TODO: Implement to stop the worker
    -       return false;
    +   public boolean stopWorker(YarnWorkerNode workerNode) {
    +       workerNodeMap.remove(workerNode.getResourceID().toString());
    +       if (workerNode != null) {
    +           Container container = workerNode.getYarnContainer();
    +           log.info("Stopping container {}.", container.getId().toString());
    +           // release the container on the node manager
    +           try {
    +               nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
    +           } catch (Throwable t) {
    +               log.error("Error while calling YARN Node Manager to stop container", t);
    --- End diff --

    This should be a warning.
---
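Taken together, the three review comments on `stopWorker` suggest: check for a missing worker first, then stop the container, log failures as warnings rather than errors, and remove the entry only after the stop attempt. A stand-alone sketch of that ordering, with stubbed-out types in place of the YARN client and Flink classes (all names below are illustrative, not the actual Flink code):

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the YARN NMClient, so the sketch compiles without YARN.
interface NodeManagerClient {
    void stopContainer(String containerId) throws Exception;
}

public class StopWorkerSketch {
    private final Map<String, String> workerNodeMap = new HashMap<>();
    private final NodeManagerClient nodeManagerClient;

    StopWorkerSketch(NodeManagerClient client) { this.nodeManagerClient = client; }

    void addWorker(String containerId) { workerNodeMap.put(containerId, containerId); }

    boolean stopWorker(String containerId) {
        // 1. Check for the worker first, before mutating the map.
        String container = workerNodeMap.get(containerId);
        if (container == null) {
            return false;
        }
        // 2. Ask the node manager to stop the container; a failure here is
        //    logged as a warning, not an error, per the review.
        try {
            nodeManagerClient.stopContainer(container);
        } catch (Exception e) {
            System.err.println("WARN: Error while stopping container " + container + ": " + e);
        }
        // 3. Remove the worker only after the stop attempt.
        workerNodeMap.remove(containerId);
        return true;
    }
}
```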
[jira] [Commented] (FLINK-7706) Port JobAccumulatorsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237686#comment-16237686 ]

ASF GitHub Bot commented on FLINK-7706:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4898#discussion_r148802300

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java ---
    @@ -0,0 +1,128 @@
    [diff context: Apache license header and imports of the new file]
    +/**
    + * Response type of the {@link JobAccumulatorsHandler}.
    + */
    +public class JobAccumulatorsInfo implements ResponseBody {
    +   public static final String FIELD_NAME_JOB_ACCUMULATORS = "job-accumulators";
    +   public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = "user-task-accumulators";
    +
    +   @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS)
    +   private List jobAccumulators;
    +
    +   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS)
    +   private List userAccumulators;
    +
    +   @JsonCreator
    +   public JobAccumulatorsInfo(
    +       @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) @Nullable List jobAccumulators,
    --- End diff --

    I would remove the `Nullable` and instead pass in a `Collections.emptyList()` if there are no job accumulators at the moment.

> Port JobAccumulatorsHandler to new REST endpoint
> ------------------------------------------------
>
>                 Key: FLINK-7706
>                 URL: https://issues.apache.org/jira/browse/FLINK-7706
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, REST, Webfrontend
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Hai Zhou UTC+8
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
> Port existing {{JobAccumulatorsHandler}} to new REST endpoint.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #4898: [FLINK-7706] [flip6] Add JobAccumulatorsHandler fo...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4898#discussion_r148802300

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java ---
    @@ -0,0 +1,128 @@
    [diff context: Apache license header and imports of the new file]
    +/**
    + * Response type of the {@link JobAccumulatorsHandler}.
    + */
    +public class JobAccumulatorsInfo implements ResponseBody {
    +   public static final String FIELD_NAME_JOB_ACCUMULATORS = "job-accumulators";
    +   public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = "user-task-accumulators";
    +
    +   @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS)
    +   private List jobAccumulators;
    +
    +   @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS)
    +   private List userAccumulators;
    +
    +   @JsonCreator
    +   public JobAccumulatorsInfo(
    +       @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) @Nullable List jobAccumulators,
    --- End diff --

    I would remove the `Nullable` and instead pass in a `Collections.emptyList()` if there are no job accumulators at the moment.
---
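The null-safe constructor the reviewer proposes, dropping `@Nullable` and working with `Collections.emptyList()` instead, might look roughly like this. This is a simplified sketch, with `String` standing in for the accumulator element type and a defensive default in the constructor as one possible reading of the suggestion:

```java
import java.util.Collections;
import java.util.List;

public class AccumulatorsInfoSketch {
    private final List<String> jobAccumulators;
    private final List<String> userAccumulators;

    // Fields are never null: callers with no accumulators pass
    // Collections.emptyList(), and null is coerced to it defensively.
    public AccumulatorsInfoSketch(List<String> jobAccumulators, List<String> userAccumulators) {
        this.jobAccumulators = jobAccumulators == null ? Collections.emptyList() : jobAccumulators;
        this.userAccumulators = userAccumulators == null ? Collections.emptyList() : userAccumulators;
    }

    public List<String> getJobAccumulators() { return jobAccumulators; }
    public List<String> getUserAccumulators() { return userAccumulators; }
}
```

The benefit is that downstream code (JSON serialization, equality checks) can iterate the lists without null checks.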
[GitHub] flink issue #4942: [FLINK-7420] [avro] Move all Avro code to flink-avro (fol...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4942 Great fix for our Avro dependency @StephanEwen, @aljoscha and @twalthr. +1 for merging. ---
[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro
[ https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237672#comment-16237672 ]

ASF GitHub Bot commented on FLINK-7420:
---------------------------------------

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4942

    Great fix for our Avro dependency @StephanEwen, @aljoscha and @twalthr. +1 for merging.

> Move all Avro code to flink-avro
> --------------------------------
>
>                 Key: FLINK-7420
>                 URL: https://issues.apache.org/jira/browse/FLINK-7420
>             Project: Flink
>          Issue Type: Improvement
>          Components: Build System
>            Reporter: Stephan Ewen
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.4.0
>
> *Problem*
> Currently, the {{flink-avro}} project is a shell with some tests and mostly duplicate and dead code. The classes that use Avro are distributed quite wildly through the code base, and introduce multiple direct dependencies on Avro in a messy way. That way, we cannot create a proper fat Avro dependency in which we shade Jackson away.
> Also, we expose Avro as a direct and hard dependency on many Flink modules, while it should be a dependency that users who use Avro types selectively add.
> *Suggested Changes*
> We should move all Avro related classes to {{flink-avro}}, and give {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}:
> - {{AvroTypeInfo}}
> - {{AvroSerializer}}
> - {{AvroRowSerializationSchema}}
> - {{AvroRowDeserializationSchema}}
> To be able to move the Avro serialization code from {{flink-core}} to {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, similar to how we load the {{WritableTypeInfo}} for Hadoop.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
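The reflective-loading approach described in the issue, instantiating the Avro type information only when flink-avro is on the classpath, as is done for `WritableTypeInfo`, can be sketched as follows. All names here are illustrative stand-ins, not Flink's actual classes; `java.util.ArrayList` is used in the usage example purely as a class that is guaranteed to be loadable:

```java
// Hypothetical sketch of reflective loading: look the class up by name so the
// calling module compiles without a hard dependency on the providing module.
public class ReflectiveLoadingSketch {

    /**
     * Tries to load and instantiate the named class reflectively.
     * Returns null if the class is not on the classpath or not instantiable,
     * mirroring the "optional dependency" pattern described in the issue.
     */
    static Object loadTypeInfoOrNull(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Dependency (e.g. flink-avro) not on the classpath.
            return null;
        }
    }

    public static void main(String[] args) {
        // ArrayList stands in for the reflectively-loaded type info class.
        Object present = loadTypeInfoOrNull("java.util.ArrayList");
        Object absent = loadTypeInfoOrNull("org.example.NotOnClasspath");
        System.out.println((present != null) + " " + (absent == null));
    }
}
```

The caller can then fall back to generic serialization when `null` comes back, which is how an optional module avoids becoming a hard compile-time dependency.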
[jira] [Created] (FLINK-7973) Fix service shading relocation for S3 file systems
Stephan Ewen created FLINK-7973:
-----------------------------------

             Summary: Fix service shading relocation for S3 file systems
                 Key: FLINK-7973
                 URL: https://issues.apache.org/jira/browse/FLINK-7973
             Project: Flink
          Issue Type: Bug
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
            Priority: Blocker
             Fix For: 1.4.0

Currently, the shade plugin relocates services incorrectly, applying relocation patterns multiple times.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro
[ https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237632#comment-16237632 ]

ASF GitHub Bot commented on FLINK-7420:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4942#discussion_r148789659

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java ---
    @@ -217,7 +217,7 @@ public void read(DataInputView in) throws IOException {
        /**
         * Placeholder dummy for a previously registered class that can no longer be found in classpath on restore.
         */
    -   public static class DummyRegisteredClass {}
    +   public static class DummyRegisteredClass implements Serializable {}
    --- End diff --

    `serialVersionUID` is missing.

> Move all Avro code to flink-avro
> --------------------------------
>
>                 Key: FLINK-7420
>                 URL: https://issues.apache.org/jira/browse/FLINK-7420
>             Project: Flink
>          Issue Type: Improvement
>          Components: Build System
>            Reporter: Stephan Ewen
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.4.0
>
> *Problem*
> Currently, the {{flink-avro}} project is a shell with some tests and mostly duplicate and dead code. The classes that use Avro are distributed quite wildly through the code base, and introduce multiple direct dependencies on Avro in a messy way. That way, we cannot create a proper fat Avro dependency in which we shade Jackson away.
> Also, we expose Avro as a direct and hard dependency on many Flink modules, while it should be a dependency that users who use Avro types selectively add.
> *Suggested Changes*
> We should move all Avro related classes to {{flink-avro}}, and give {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}:
> - {{AvroTypeInfo}}
> - {{AvroSerializer}}
> - {{AvroRowSerializationSchema}}
> - {{AvroRowDeserializationSchema}}
> To be able to move the Avro serialization code from {{flink-core}} to {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, similar to how we load the {{WritableTypeInfo}} for Hadoop.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #4942: [FLINK-7420] [avro] Move all Avro code to flink-av...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4942#discussion_r148789659

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java ---
    @@ -217,7 +217,7 @@ public void read(DataInputView in) throws IOException {
        /**
         * Placeholder dummy for a previously registered class that can no longer be found in classpath on restore.
         */
    -   public static class DummyRegisteredClass {}
    +   public static class DummyRegisteredClass implements Serializable {}
    --- End diff --

    `serialVersionUID` is missing.
---
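The fix the reviewer asks for is a one-liner: a `Serializable` class should declare an explicit `serialVersionUID` so its serialized form stays stable across recompiles rather than depending on a compiler-generated default. A minimal self-contained illustration (the outer class name is just for this sketch):

```java
import java.io.Serializable;

public class SerialVersionSketch {

    // Placeholder class with an explicit serialVersionUID, the shape the
    // review asks for on DummyRegisteredClass.
    public static class DummyRegisteredClass implements Serializable {
        private static final long serialVersionUID = 1L;
    }
}
```

Without the field, the JVM derives a UID from the class's structure, so an otherwise-compatible recompile can break deserialization of previously written snapshots, which matters for a class used in serializer config snapshots.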
[jira] [Comment Edited] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237616#comment-16237616 ]

Gary Yao edited comment on FLINK-7880 at 11/3/17 1:55 PM:
----------------------------------------------------------

I have further isolated the problem. The problem still appears even when running a single test method. I tried {{NonHAQueryableStateRocksDBBackendITCase#testValueState}} and even replaced the state query with a {{Thread.sleep(400)}}.

My changes to the code are documented here: https://github.com/apache/flink/compare/master...GJL:FLINK-7880?expand=1

To run the tests, I use the command below. Note that multiple iterations may be required; hence the {{while}} loop.

{noformat}
while mvn -o clean verify -Dtest=NonHAQueryableStateRocksDBBackendITCase -DfailIfNoTests=false -Dcheckstyle.skip; do :; done
{noformat}

The failure I get is the same as mentioned by [~kkl0u]:

{noformat}
libc++abi.dylib: terminating with uncaught exception of type std::__1::system_error: mutex lock failed: Invalid argument
{noformat}

was (Author: gjy):
[earlier revision of the comment, identical except for the final paragraph quoting the observed failure]

> flink-queryable-state-java fails with core-dump
> -----------------------------------------------
>
>                 Key: FLINK-7880
>                 URL: https://issues.apache.org/jira/browse/FLINK-7880
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State, Tests
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.4.0
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237616#comment-16237616 ] Gary Yao commented on FLINK-7880: - I have further isolated the problem. The problem still appears even if you are running a single test method. I tried {{NonHAQueryableStateRocksDBBackendITCase#testValueState}} and even replaced the state query with a {{Thread.sleep(400)}}. My changes to the code are documented here: https://github.com/apache/flink/compare/master...GJL:FLINK-7880?expand=1 To run the tests, I use the command below. Note that multiple iterations may be required. Hence, the {{while}} loop. {noformat} while mvn -o clean verify -Dtest=NonHAQueryableStateRocksDBBackendITCase -DfailIfNoTests=false -Dcheckstyle.skip; do :; done {noformat}
[jira] [Comment Edited] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237584#comment-16237584 ] Kostas Kloudas edited comment on FLINK-7880 at 11/3/17 1:38 PM: I will keep on investigating *BUT* I commented out the {{queryable state}} code from the tests and they still seem to fail even locally with: {code} libc++abi.dylib: terminating with uncaught exception of type std::__1::system_error: mutex lock failed: Invalid argument /bin/sh: line 1: 10553 Abort trap: 6 /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=1 -XX:+UseSerialGC -jar /Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefirebooter6530581495710923655.jar /Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefire7293759706604721607tmp /Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefire_87379374358455308985tmp {code} You can find my code here https://github.com/kl0u/flink/tree/more-than-qs and to reproduce, go to the {{flink-queryable-state}} dir and run a couple of times {{mvn verify}}. At least on my machine this reproduces the problem. So the root problem may not be in the Queryable State but in the sequence of steps taken when shutting down the RocksDB state backend. 
was (Author: kkl0u): I will keep on investigating *BUT* I commented out the {{queryable state}} code from the tests and they still seem to fail even locally with: {code} libc++abi.dylib: terminating with uncaught exception of type std::__1::system_error: mutex lock failed: Invalid argument /bin/sh: line 1: 10553 Abort trap: 6 /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=1 -XX:+UseSerialGC -jar /Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefirebooter6530581495710923655.jar /Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefire7293759706604721607tmp /Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefire_87379374358455308985tmp {code} You can find my code here https://github.com/kl0u/flink/tree/more-than-qs and to reproduce, go to the {{flink-queryable-state}} dir and run a couple of times {{mvn verify}}. At least on my machine this reproduces the problem.
[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237584#comment-16237584 ] Kostas Kloudas commented on FLINK-7880: --- I will keep on investigating *BUT* I commented out the {{queryable state}} code from the tests and they still seem to fail even locally with: {code} libc++abi.dylib: terminating with uncaught exception of type std::__1::system_error: mutex lock failed: Invalid argument /bin/sh: line 1: 10553 Abort trap: 6 /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=1 -XX:+UseSerialGC -jar /Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefirebooter6530581495710923655.jar /Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefire7293759706604721607tmp /Users/kkloudas/repos/dataartisans/flink/flink-queryable-state/flink-queryable-state-runtime/target/surefire/surefire_87379374358455308985tmp {code} You can find my code here https://github.com/kl0u/flink/tree/more-than-qs and to reproduce, go to the {{flink-queryable-state}} dir and run a couple of times {{mvn verify}}. At least on my machine this reproduces the problem.
[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro
[ https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237579#comment-16237579 ] ASF GitHub Bot commented on FLINK-7420: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4942 Personally, I like this Jenkins fellow more... > Move all Avro code to flink-avro > > > Key: FLINK-7420 > URL: https://issues.apache.org/jira/browse/FLINK-7420 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > *Problem* > Currently, the {{flink-avro}} project is a shell with some tests and mostly > duplicate and dead code. The classes that use Avro are distributed quite > wildly through the code base, and introduce multiple direct dependencies on > Avro in a messy way. > That way, we cannot create a proper fat Avro dependency in which we shade > Jackson away. > Also, we expose Avro as a direct and hard dependency on many Flink modules, > while it should be a dependency that users that use Avro types selectively > add. > *Suggested Changes* > We should move all Avro related classes to {{flink-avro}}, and give > {{flink-avro}} a dependency on {{flink-core}} and {{flink-streaming-java}}. > - {{AvroTypeInfo}} > - {{AvroSerializer}} > - {{AvroRowSerializationSchema}} > - {{AvroRowDeserializationSchema}} > To be able to move the Avro serialization code from {{flink-core}} to > {{flink-avro}}, we need to load the {{AvroTypeInformation}} reflectively, > similar to how we load the {{WritableTypeInfo}} for Hadoop.
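The reflective loading mentioned in the suggested changes can be sketched as follows. This is a hypothetical standalone illustration, not Flink's actual implementation: the class-name constant, the constructor signature, and the null fallback when the optional module is absent are all assumptions for the sake of the example.

```java
public class ReflectiveTypeInfoDemo {

    // Hypothetical fully-qualified name; the real location in flink-avro may differ.
    private static final String AVRO_TYPE_INFO =
            "org.apache.flink.formats.avro.typeutils.AvroTypeInfo";

    /**
     * Tries to load a type-info class reflectively so that flink-core
     * needs no compile-time dependency on the optional module.
     * Returns null if the class is not on the classpath.
     */
    static Object loadTypeInfoReflectively(String className, Class<?> type) {
        try {
            Class<?> clazz = Class.forName(
                    className, false, Thread.currentThread().getContextClassLoader());
            // Assumes a single-argument (Class) constructor, as an illustration.
            return clazz.getConstructor(Class.class).newInstance(type);
        } catch (ClassNotFoundException e) {
            // Optional dependency not present: degrade gracefully.
            return null;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Object typeInfo = loadTypeInfoReflectively(AVRO_TYPE_INFO, String.class);
        System.out.println(typeInfo == null ? "flink-avro not available" : typeInfo.toString());
    }
}
```

Run without flink-avro on the classpath, the lookup falls through to the graceful branch, which is exactly the behavior that lets core modules avoid a hard Avro dependency.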
[GitHub] flink issue #4942: [FLINK-7420] [avro] Move all Avro code to flink-avro (fol...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4942 Personally, I like this Jenkins fellow more... ---
[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro
[ https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237564#comment-16237564 ] ASF GitHub Bot commented on FLINK-7420: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4942 Let's see if Travis agrees. That guy has opinions...
[GitHub] flink issue #4942: [FLINK-7420] [avro] Move all Avro code to flink-avro (fol...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4942 Let's see if Travis agrees. That guy has opinions... ---
[jira] [Commented] (FLINK-7420) Move all Avro code to flink-avro
[ https://issues.apache.org/jira/browse/FLINK-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237557#comment-16237557 ] ASF GitHub Bot commented on FLINK-7420: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4942 The follow-up changes look good! (it's very easy to review since it's clearly separated.) If you reviewed Timo's and my changes I would say this is good to go.
[GitHub] flink issue #4942: [FLINK-7420] [avro] Move all Avro code to flink-avro (fol...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4942 The follow-up changes look good! (it's very easy to review since it's clearly separated.) If you reviewed Timo's and my changes I would say this is good to go. ---
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237468#comment-16237468 ] ASF GitHub Bot commented on FLINK-4228: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r148756933 --- Diff: flink-filesystems/flink-s3-fs-hadoop/pom.xml --- @@ -182,6 +182,21 @@ under the License. ${project.version} test + --- End diff -- Yes this is about the recursive upload which needs to be tested once with hdfs and once more with s3. Sure we could flip the dependency and let the tests in the `yarn` sub-project depend on `flink-s3-fs-hadoop` (and I don't mind which depends on which, actually) but wouldn't this be just the same but in reverse? > YARN artifact upload does not work with S3AFileSystem > - > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Priority: Blocker > Fix For: 1.4.0 > > > The issue now is exclusive to running on YARN with s3a:// as your configured > FileSystem. If so, the Flink session will fail on staging itself because it > tries to copy the flink/lib directory to S3 and the S3aFileSystem does not > support recursive copy. > h2. Old Issue > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) > leads to an Exception when uploading the snapshot to S3 when using the > {{S3AFileSystem}}. 
> {code} > AsynchronousException{com.amazonaws.AmazonClientException: Unable to > calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory)} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) > Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at 
java.io.FileInputStream.<init>(FileInputStream.java:138) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294) > ... 9 more > {code} > Running with S3NFileSystem, the error does not occur. The problem might be > due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created > automatically. We might need to manually create folders and copy only actual > files for {{S3AFileSystem}}. More investigation is required.
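The workaround suggested at the end of the issue description (create folders explicitly and copy only actual files, rather than relying on recursive copy) can be sketched with plain `java.nio.file`. A local target directory stands in for the remote file system here; the helper name and directory layout are illustrative, not Flink's actual upload code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

public class ManualRecursiveCopyDemo {

    /** Walks `source` and copies regular files one by one, creating directories explicitly. */
    static void copyRecursively(Path source, Path target) throws IOException {
        try (Stream<Path> paths = Files.walk(source)) {
            for (Path p : (Iterable<Path>) paths::iterator) {
                Path dest = target.resolve(source.relativize(p).toString());
                if (Files.isDirectory(p)) {
                    // Create folders manually instead of assuming the FS does it.
                    Files.createDirectories(dest);
                } else {
                    Files.createDirectories(dest.getParent());
                    Files.copy(p, dest, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a tiny source tree, copy it, and check the file arrived.
        Path src = Files.createTempDirectory("src");
        Files.createDirectories(src.resolve("lib"));
        Files.write(src.resolve("lib").resolve("flink.jar"), new byte[] {1, 2, 3});
        Path dst = Files.createTempDirectory("dst");
        copyRecursively(src, dst);
        System.out.println(Files.exists(dst.resolve("lib").resolve("flink.jar")));
    }
}
```

The key point mirrored from the issue: only regular files are handed to the copy call, so a backend that rejects directories (as the stack trace's "Is a directory" failure shows) never sees one.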