[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/5580 @zentol All right, got your point. That's a problem indeed. ---
[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/5580 If the behavior of `RestClusterClient` changes some day, for example to upload the jars via the REST API, we could simply upload the user-defined files via the REST API as well. That's not a big problem, right? ---
[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/5580 Hi @zentol In Flip-6 the client communicates with the cluster via the REST API, that's true. However, this does not include blobs. Currently, in the `RestClusterClient.submit()` method, the client uses the blob service to upload the user's jars. So this PR does not break the rule. I think this feature is quite good. Spark and even MapReduce support uploading user-defined jars/files/archives. I think we should support it too. Users would be able to migrate to Flink more smoothly with this feature. What do you think? ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user ifndef-SleePy commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r170804963 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java --- @@ -314,6 +315,9 @@ public static TaskExecutor startTaskManager( TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); + final FileCache fileCache = new FileCache(taskManagerServicesConfiguration.getTmpDirPaths(), --- End diff -- It seems that the `fileCache` here is not used at all. ---
[GitHub] flink issue #5048: [Flink-7871] [flip6] SlotPool should release unused slots...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/5048 Hi @tillrohrmann, I just updated the PR, sorry for the long delay. ---
[GitHub] flink pull request #5322: [hotfix] [REST] Fix WebMonitorEndpoint that missin...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/5322 [hotfix] [REST] Fix WebMonitorEndpoint that missing parameters while initializing handlers ## What is the purpose of the change * Fix bugs in my recent PRs about migrating subtask REST handlers ## Brief change log * Some `MessageHeaders` are missing while initializing handlers in `WebMonitorEndpoint` * There is wrong parameter type in `SubtaskExecutionAttemptAccumulatorsHeaders` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink FLINK-REST-handlers-hotfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5322.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5322 commit 23dd66822352ae7d1d6f0e60494f9541c57714e8 Author: biao.liub Date: 2018-01-20T05:26:29Z [hotfix] [REST] Fix WebMonitorEndpoint that missing parameters while initializing handlers ---
[GitHub] flink pull request #5287: [FLINK-8367] [REST] Migrate SubtaskCurrentAttemptD...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/5287 [FLINK-8367] [REST] Migrate SubtaskCurrentAttemptDetailsHandler to Flip-6 REST handler ## What is the purpose of the change * Migrate `org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttemptDetailsHandler` to Flip-6 `WebMonitorEndpoint`. * This PR is based on [#5285](https://github.com/apache/flink/pull/5285) ## Brief change log * Add `SubtaskCurrentAttemptDetailsHandler` in Flip-6 REST framework. ## Verifying this change * This change added unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink FLINK-8367 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5287.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5287 commit 16b3f11b96b1a8b0f290945af3d255d47739992e Author: biao.liub Date: 2018-01-10T06:25:07Z [FLINK-8368] Migrate org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler to new a REST handler that registered in WebMonitorEndpoint commit 50eb6e72293abc02ef17707f3d70e069d2f2a463 Author: biao.liub Date: 2018-01-12T08:38:55Z [FLINK-8368] Add attempts path info that is missing in SubtaskExecutionAttemptDetailsHeaders commit 97124dfe717702b46f0a105b78cf86f18ac063a9 Author: biao.liub Date: 2018-01-12T08:33:08Z [FLINK-8369] Migrate SubtaskExecutionAttemptAccumulatorsHandler to Flip-6 REST endpoint commit 2617cf22f69bd1e7ec6e34056bce6120a1164bd9 Author: biao.liub Date: 2018-01-12T08:33:57Z [FLINK-8369] Migrate SubtaskExecutionAttemptAccumulatorsHandler to Flip-6 REST endpoint commit 52199031f81c7a267c28cf340035bc4299ac7f3c Author: biao.liub Date: 2018-01-12T09:57:05Z [FLINK-8369] Fix some compiling issues. commit bd8e42b8e5442978e865ba0629534c555fcb1e16 Author: biao.liub Date: 2018-01-12T13:37:48Z [FLINK-8367] Migrate SubtaskCurrentAttemptDetailsHandler to new a REST handler ---
[GitHub] flink pull request #5285: [FLINK-8369] [REST] Migrate SubtaskExecutionAttemp...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/5285 [FLINK-8369] [REST] Migrate SubtaskExecutionAttemptAccumulatorsHandler to new a REST handler ## What is the purpose of the change * Migrate `org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptAccumulatorsHandler` to Flip-6 `WebMonitorEndpoint`. * This PR is based on [#5270](https://github.com/apache/flink/pull/5270) ## Brief change log * Add `SubtaskExecutionAttemptAccumulatorsHandler` in Flip-6 REST framework. * Move inner class `UserAccumulator` to public, make it reusable. ## Verifying this change * This change added unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink FLINK-8369 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5285.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5285 commit 16b3f11b96b1a8b0f290945af3d255d47739992e Author: biao.liub Date: 2018-01-10T06:25:07Z [FLINK-8368] Migrate org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler to new a REST handler that registered in WebMonitorEndpoint commit 50eb6e72293abc02ef17707f3d70e069d2f2a463 Author: biao.liub Date: 2018-01-12T08:38:55Z [FLINK-8368] Add attempts path info that is missing in SubtaskExecutionAttemptDetailsHeaders commit 97124dfe717702b46f0a105b78cf86f18ac063a9 Author: biao.liub Date: 2018-01-12T08:33:08Z [FLINK-8369] Migrate SubtaskExecutionAttemptAccumulatorsHandler to Flip-6 REST endpoint commit 2617cf22f69bd1e7ec6e34056bce6120a1164bd9 Author: biao.liub Date: 2018-01-12T08:33:57Z [FLINK-8369] Migrate SubtaskExecutionAttemptAccumulatorsHandler to Flip-6 REST endpoint commit 52199031f81c7a267c28cf340035bc4299ac7f3c Author: biao.liub Date: 2018-01-12T09:57:05Z [FLINK-8369] Fix some compiling issues. ---
[GitHub] flink pull request #5270: [FLINK-8368] [REST] Migrate SubtaskExecutionAttemp...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/5270 [FLINK-8368] [REST] Migrate SubtaskExecutionAttemptDetailsHandler to new a REST handler ## What is the purpose of the change * Migrate `org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler` to flip-6 `WebMonitorEndpoint`. ## Brief change log * Make some abstraction about `JobVertexHandler` and `SubtaskAttemptHandler`. * Add `SubtaskExecutionAttemptDetailsHandler` in flip-6 REST framework. * Rename inner class `JobVertexMetrics` to public class `IOMetricsInfo`, make it more reusable. ## Verifying this change * This change added unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink FLINK-8368 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5270.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5270 commit 29182f04255f77288e207aef9e7015862d3e9a8c Author: biao.liub Date: 2018-01-10T06:25:07Z [FLINK-8368] Migrate org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler to new a REST handler that registered in WebMonitorEndpoint ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user ifndef-SleePy commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r156621784 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -266,104 +279,367 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - SlotRequestID requestId, - ScheduledUnit task, - ResourceProfile resources, - Iterable locationPreferences, + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling, Time timeout) { - return internalAllocateSlot(requestId, task, resources, locationPreferences); + return internalAllocateSlot( + slotRequestId, + scheduledUnit, + resourceProfile, + locationPreferences, + allowQueuedScheduling); } - @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + private CompletableFuture internalAllocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling) { + + final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); + + if (slotSharingGroupId != null) { + // allocate slot with slot sharing + final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( + slotSharingGroupId, + id -> new SlotSharingManager( + id, + this, + providerAndOwner)); + + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture; + + try { + if (task.getCoLocationConstraint() != null) { + multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot( + task.getCoLocationConstraint(), + multiTaskSlotManager, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + } else { + multiTaskSlotFuture = allocateMultiTaskSlot( + task.getJobVertexId(), multiTaskSlotManager, + resourceProfile, + locationPreferences, + 
allowQueuedScheduling); + } + } catch (NoResourceAvailableException noResourceException) { + return FutureUtils.completedExceptionally(noResourceException); + } + + // sanity check + Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId())); + + final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot( + slotRequestId, + task.getJobVertexId(), + multiTaskSlotFuture.getLocality()); + + return leave.getLogicalSlotFuture(); + } else { + // request an allocated slot to assign a single logical slot to + CompletableFuture slotAndLocalityFuture = requestAllocatedSlot( + slotRequestId, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + + return slotAndLocalityFuture.thenApply( + (SlotAndLocality slotAndLocality) -> { + final AllocatedSlot allocatedSlot = slotAndLocality.g
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user ifndef-SleePy commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r154871284 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java --- @@ -19,68 +19,104 @@ package org.apache.flink.runtime.jobmanager.scheduler; import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + public class ScheduledUnit { - + + @Nullable private final Execution vertexExecution; - - private final SlotSharingGroup sharingGroup; - - private final CoLocationConstraint locationConstraint; + + private final JobVertexID jobVertexId; + + @Nullable + private final SlotSharingGroupId slotSharingGroupId; + + @Nullable + private final CoLocationConstraint coLocationConstraint; // public ScheduledUnit(Execution task) { - Preconditions.checkNotNull(task); - - this.vertexExecution = task; - this.sharingGroup = null; - this.locationConstraint = null; + this( + Preconditions.checkNotNull(task), + task.getVertex().getJobvertexId(), + null, + null); } - public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit) { - Preconditions.checkNotNull(task); - - this.vertexExecution = task; - this.sharingGroup = sharingUnit; - this.locationConstraint = null; + public ScheduledUnit(Execution task, SlotSharingGroupId slotSharingGroupId) { + this( + Preconditions.checkNotNull(task), + task.getVertex().getJobvertexId(), + slotSharingGroupId, + null); } - public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit, CoLocationConstraint locationConstraint) { - Preconditions.checkNotNull(task); - Preconditions.checkNotNull(sharingUnit); - Preconditions.checkNotNull(locationConstraint); - + public ScheduledUnit( + Execution task, + SlotSharingGroupId slotSharingGroupId, + CoLocationConstraint coLocationConstraint) { + 
this( + Preconditions.checkNotNull(task), + task.getVertex().getJobvertexId(), + slotSharingGroupId, + coLocationConstraint); + } + + public ScheduledUnit( + JobVertexID jobVertexId, + SlotSharingGroupId slotSharingGroupId, + CoLocationConstraint coLocationConstraint) { + this( + null, + jobVertexId, + slotSharingGroupId, + coLocationConstraint); + } + + public ScheduledUnit( + Execution task, + JobVertexID jobVertexId, --- End diff -- We can get JobVertexID from Execution. Do we need this in Constructor? ---
[GitHub] flink issue #5048: [Flink-7871] [flip6] SlotPool should release unused slots...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/5048 Hi Till, thank you for reviewing. I agree that it is much simpler to use `scheduleRunAsync`. [PR-5091](https://github.com/apache/flink/pull/5091) is not a small reworking. But it looks really great to me. Will rebase it and address comments in a few days. ---
[GitHub] flink pull request #5048: [Flink-7871] [flip6] SlotPool should release unuse...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/5048 [Flink-7871] [flip6] SlotPool should release unused slots to RM ## What is the purpose of the change * This pull request makes SlotPool release unused slots back to the RM if the slots are idle for some time ## Brief change log * Add a TimerService in SlotPool * Each available slot registers a timer in the TimerService and unregisters it when the slot is removed or becomes allocated * If the timeout fires, the slot is released by notifying the TM * Add notifySlotUnused method in TaskExecutorGateway * Make some fixed params configurable in SlotPool ## Verifying this change This change added tests in AvailableSlotsTest ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no, but adds a new `@Public(Evolving)` class SlotOptions) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink FLINK-7871 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5048.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5048 commit b16d206b1ff1a459de10dd5d08094172386d3707 Author: biao.liub Date: 2017-11-16T10:11:20Z [FLINK-7871] [flip6] SlotPool will release unused slot if it is idle for a period.
commit 19e9ccfb69b6e1529ed346a00389b682eecdfcfd Author: biao.liub Date: 2017-11-16T11:15:16Z [FLINK-7871] [flip6] Stop timer service in SlotPool elegantly. commit 0484f7457e46180fe49ae4f3483afa92c0b20ac5 Author: biao.liub Date: 2017-11-17T02:58:57Z [FLINK-7871] [flip6] Fix rebasing error. commit 2a012582b1e169a90c5674ed294d95c12029ee8d Author: biao.liub Date: 2017-11-17T03:19:40Z [FLINK-7871] [flip6] Fix AvailableSlotsTest. commit bb8b8f889735ab7a93c837db3c33b3f0900755db Author: biao.liub Date: 2017-11-17T08:15:01Z [FLINK-7871] [flip6] Add super.postStop() in SlotPool.postStop() commit 29fe70cec7de136e5959c9f128d32ece97fb68fc Author: biao.liub Date: 2017-11-17T08:36:41Z [Flink-7871] [flip6] SlotPool should release unused slots to RM Summary: ## What is the purpose of the change * This pull request makes SlotPool release unused slots back to RM if the slots is idle for some time ## Brief change log * Add a TimerService in SlotPool * Each available slot will register a timer in TimerService, and will unregister when the slot is removed or becomes allocated * If timeout happened, the slot will be released by notifying TM * Add notifySlotUnused method in TaskExecutorGateway * Make some fixed params configurable in SlotPool ## Verifying this change This change added tests in AvailableSlotsTest ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no, but add a new '@Public(Evolving)' class SlotOptions) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) Test Plan: UT done Reviewers: æµ·æ¶, è¾ æº Differential Revision: https://aone.alibaba-inc.com/code/D350078 commit 7b144e853df1d9a807464832a3582922c91672d1 Author: biao.liub Date: 2017-11-17T09:19:40Z [Flink-7871] [flip6] SlotPool should release unused slots to RM Summary: ## What is the purpose of the change * This pull request makes SlotPool release unused slots back to RM if the slots is idle for some time ## Brief change log * Add a TimerService in SlotPool * Each available slot will register a timer in TimerService, and will unregister when the slot is removed or becomes allocated * If timeout happened, the sl
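The idle-slot release described in the change log of PR #5048 (register a timer when a slot becomes available, unregister it when the slot is removed or allocated, release the slot when the timeout fires) can be sketched roughly as follows. This is only an illustration under stated assumptions: `IdleSlotTracker`, `IdleSlotDemo`, the string slot ids and the use of a plain `ScheduledExecutorService` are all hypothetical stand-ins, not Flink's `SlotPool` or `TimerService`, and the release callback stands in for notifying the TM.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical idle-slot bookkeeping; NOT Flink's SlotPool or TimerService. */
class IdleSlotTracker {
    /** One pending idle timer; a fresh instance per registration guards against stale timeouts. */
    private static final class Registration {
        ScheduledFuture<?> future;
    }

    private final ScheduledExecutorService timer;
    private final long idleTimeoutMillis;
    private final Consumer<String> releaseAction; // stands in for "notify the TM"
    private final Map<String, Registration> idleTimers = new HashMap<>();

    IdleSlotTracker(ScheduledExecutorService timer, long idleTimeoutMillis, Consumer<String> releaseAction) {
        this.timer = timer;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.releaseAction = releaseAction;
    }

    /** A slot became available: register (or restart) its idle timer. */
    synchronized void slotBecameAvailable(String slotId) {
        cancelTimer(slotId);
        Registration registration = new Registration();
        registration.future = timer.schedule(
            () -> onTimeout(slotId, registration), idleTimeoutMillis, TimeUnit.MILLISECONDS);
        idleTimers.put(slotId, registration);
    }

    /** A slot was allocated or removed: unregister its idle timer. */
    synchronized void slotAllocatedOrRemoved(String slotId) {
        cancelTimer(slotId);
    }

    // Only called from synchronized methods.
    private void cancelTimer(String slotId) {
        Registration pending = idleTimers.remove(slotId);
        if (pending != null) {
            pending.future.cancel(false);
        }
    }

    /** Timer callback: release the slot only if this registration is still the current one. */
    private synchronized void onTimeout(String slotId, Registration registration) {
        if (idleTimers.remove(slotId, registration)) {
            releaseAction.accept(slotId);
        }
    }
}

class IdleSlotDemo {
    /** Registers two slots, re-allocates one, and returns the ids that were released as idle. */
    static List<String> run() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        List<String> released = new CopyOnWriteArrayList<>();
        CountDownLatch firstRelease = new CountDownLatch(1);
        IdleSlotTracker tracker = new IdleSlotTracker(timer, 50, slotId -> {
            released.add(slotId);
            firstRelease.countDown();
        });
        try {
            tracker.slotBecameAvailable("slot-1");
            tracker.slotBecameAvailable("slot-2");
            tracker.slotAllocatedOrRemoved("slot-2"); // allocated again before the timeout
            firstRelease.await(5, TimeUnit.SECONDS);
            Thread.sleep(200); // give a wrongly surviving timer the chance to fire
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
        return new ArrayList<>(released);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The per-registration object is a design choice of this sketch: it keeps a timer that already started firing from releasing a slot that was re-registered in the meantime.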
[GitHub] flink issue #3810: [FLINK-6397] MultipleProgramsTestBase does not reset Cont...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/3810 Nice work! I have rebased master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3810: [FLINK-6397] MultipleProgramsTestBase does not reset Cont...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/3810 OK, got it. Thank you for the explanation. Are there any more problems with this pull request? ---
[GitHub] flink issue #3810: [FLINK-6397] MultipleProgramsTestBase does not reset Cont...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/3810 Hi, I updated this review. I don't know why there was no notification in JIRA or by email. ---
[GitHub] flink pull request #3810: [FLINK-6397] MultipleProgramsTestBase does not res...
Github user ifndef-SleePy commented on a diff in the pull request: https://github.com/apache/flink/pull/3810#discussion_r115170542 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java --- @@ -80,29 +80,36 @@ protected final TestExecutionMode mode; - + private TestEnvironment testEnvironment; + + private CollectionTestEnvironment collectionTestEnvironment; + public MultipleProgramsTestBase(TestExecutionMode mode) { this.mode = mode; - + switch(mode){ case CLUSTER: - new TestEnvironment(cluster, 4).setAsContext(); + testEnvironment = new TestEnvironment(cluster, 4); --- End diff -- Thank you for responding. My last comment was not correct. What I actually wanted to say is that we cannot move this code to \@BeforeClass and \@AfterClass. But we can move it to \@Before and \@After; however, that adds a little overhead, since \@Before and \@After are called for each test method. I pushed a new commit using \@Before and \@After. ---
[GitHub] flink issue #3810: [FLINK-6397] MultipleProgramsTestBase does not reset Cont...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/3810 I think Till has recently implemented a static unsetAsContext method in TestEnvironment. I rebased onto master to pick up that implementation, and recommitted the pull request. ---
[GitHub] flink pull request #3810: [FLINK-6397] MultipleProgramsTestBase does not res...
Github user ifndef-SleePy commented on a diff in the pull request: https://github.com/apache/flink/pull/3810#discussion_r115133175 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java --- @@ -80,29 +80,36 @@ protected final TestExecutionMode mode; - + private TestEnvironment testEnvironment; + + private CollectionTestEnvironment collectionTestEnvironment; + public MultipleProgramsTestBase(TestExecutionMode mode) { this.mode = mode; - + switch(mode){ case CLUSTER: - new TestEnvironment(cluster, 4).setAsContext(); + testEnvironment = new TestEnvironment(cluster, 4); --- End diff -- @zentol I just found that I cannot move this code into a \@Before method, because it will not work with JUnit Parameterized. I propose keeping this code in the constructor. What do you think? ---
[GitHub] flink pull request #3810: [Flink-6397] MultipleProgramsTestBase does not res...
Github user ifndef-SleePy commented on a diff in the pull request: https://github.com/apache/flink/pull/3810#discussion_r114693043 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java --- @@ -80,29 +80,36 @@ protected final TestExecutionMode mode; - + private TestEnvironment testEnvironment; + + private CollectionTestEnvironment collectionTestEnvironment; + public MultipleProgramsTestBase(TestExecutionMode mode) { this.mode = mode; - + switch(mode){ case CLUSTER: - new TestEnvironment(cluster, 4).setAsContext(); + testEnvironment = new TestEnvironment(cluster, 4); --- End diff -- Sounds reasonable. ---
[GitHub] flink issue #3810: [Flink-6397] MultipleProgramsTestBase does not reset Cont...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/3810 @StephanEwen I agree that a static unset method would be a much easier implementation. Do you think it's acceptable for the unset method to be static while the set method is not static in TestEnvironment? Alternatively, we could make the set method static too, but that would require more changes. ---
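The asymmetry discussed here (an instance-level set method paired with a static unset method) can be illustrated with a minimal sketch. `DemoEnvironment` and all of its members are hypothetical and only mimic the shape of the idea; this is not Flink's `TestEnvironment` or `ContextEnvironment`. The point it shows is that a static unset lets a test base clear the context even when it never kept a reference to the instance it installed.

```java
/** Hypothetical miniature of a context-environment holder; NOT Flink's TestEnvironment. */
class DemoEnvironment {
    // Process-wide "current context"; volatile so other threads see updates.
    private static volatile DemoEnvironment context;

    private final String name;

    DemoEnvironment(String name) {
        this.name = name;
    }

    /** Instance method: installs this particular environment as the context. */
    void setAsContext() {
        context = this;
    }

    /** Static method: clears the context without needing the installed instance. */
    static void unsetAsContext() {
        context = null;
    }

    /** Returns the installed environment, or null if none is set. */
    static DemoEnvironment getContext() {
        return context;
    }

    String getName() {
        return name;
    }
}

class DemoEnvironmentUsage {
    public static void main(String[] args) {
        // The instance is created and installed without keeping a reference to it.
        new DemoEnvironment("cluster").setAsContext();
        System.out.println(DemoEnvironment.getContext().getName());

        // Teardown can still reset the context through the static method.
        DemoEnvironment.unsetAsContext();
        System.out.println(DemoEnvironment.getContext() == null);
    }
}
```

In a JUnit-style test base, the first call would typically sit in a setup method and the second in the matching teardown method.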
[GitHub] flink pull request #3810: [Flink-6397] MultipleProgramsTestBase does not res...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/3810 [Flink-6397] MultipleProgramsTestBase does not reset ContextEnvironment Reset ContextEnvironment when testing is finished in MultipleProgramsTestBase.java and some other ITCases. Remove some unused imports. Add an unsetContext method in TestEnvironment.java. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink FLINK-6397 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3810.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3810 commit 0932dcdf0d78a783b2921fa26ae80a6a4889c3ca Author: biao.liub Date: 2017-05-02T08:07:34Z [FLINK-6397] Unset context when integration test is finished. commit d0becf4331c7649b0711ec37154501fc39a7177b Author: biao.liub Date: 2017-05-02T11:01:47Z [FLINK-6397] Fix test cases. ---
[GitHub] flink issue #3395: [FLINK-5861] Components of TaskManager support updating J...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/3395 Hi Till, I almost missed these comments! I didn't see them until a few minutes ago. I fully understand your concern. Let me respond to your comments in more detail. 1. I agree with most of your suggestions, such as the null checks, the formatting problems and TestLogger. 2. Currently the synchronization problem will not happen. I think replacing the field value is safe. That's an atomic operation. Correct me if I'm wrong. 3. This PR will not work without the other reconciliation PRs: on its own, nobody will notify these listeners. Actually we have implemented reconnection between TM and JM, and this PR will work with that code. The reason I opened this PR separately from the other reconciliation PRs is that I think it is independent of the other parts. I believe a huge PR is terrible for both the reviewer and the author. However, this standalone PR confused you. Sorry about that. 4. Actually I'm not sure the listener pattern is the best way to do this. But I think it's the simplest way, requiring the fewest modifications to the current implementation. If the TM reconnects to a new JM, how can we update the JobMasterGateway held by the components? I can't figure out a better way short of reimplementing these components. Anyway, thank you for reviewing and for so many comments! I agree with you that we should close this PR for now. After reaching agreement on the main reconciliation PRs, we can talk about what this PR tries to implement. ---
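Points 2 and 4 of this comment (replacing a field value, and using listeners to push a new gateway to components) can be sketched as below. Everything here is a hypothetical illustration: `GatewayHolder`, `DemoComponent` and the use of a plain `String` in place of a `JobMasterGateway` are assumptions, not code from the PR. On the atomicity question, the Java Language Specification guarantees that writes to references are atomic, but visibility to other threads additionally needs `volatile` or another form of synchronization, which is why the sketch marks the fields `volatile`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical holder for the current gateway; G stands in for a gateway type. */
class GatewayHolder<G> {
    // Reference writes are atomic in Java; volatile additionally makes them visible across threads.
    private volatile G current;
    private final List<Consumer<G>> listeners = new CopyOnWriteArrayList<>();

    GatewayHolder(G initial) {
        this.current = initial;
    }

    G get() {
        return current;
    }

    void addListener(Consumer<G> listener) {
        listeners.add(listener);
    }

    /** Called after reconnecting to a new leader: swap the reference, then notify the components. */
    void update(G newGateway) {
        current = newGateway;
        for (Consumer<G> listener : listeners) {
            listener.accept(newGateway);
        }
    }
}

/** Hypothetical component that caches the gateway and follows updates via a listener. */
class DemoComponent {
    private volatile String gatewayAddress;

    DemoComponent(GatewayHolder<String> holder) {
        this.gatewayAddress = holder.get();
        holder.addListener(address -> this.gatewayAddress = address);
    }

    String gatewayAddress() {
        return gatewayAddress;
    }
}

class GatewayHolderUsage {
    public static void main(String[] args) {
        GatewayHolder<String> holder = new GatewayHolder<>("jm-1");
        DemoComponent component = new DemoComponent(holder);
        holder.update("jm-2"); // simulate reconnecting to a new JM
        System.out.println(component.gatewayAddress());
    }
}
```

An alternative that avoids listeners entirely would be for components to read `holder.get()` on every use instead of caching the gateway; the sketch keeps the listener variant because that is the approach the comment describes.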
[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...
Github user ifndef-SleePy closed the pull request at: https://github.com/apache/flink/pull/3395 ---
[GitHub] flink issue #3487: [FLINK-5980] Expose max-parallelism value in RuntimeConte...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/3487 Thank you, Stephan and zentol. Good question and suggestion. I didn't give batch jobs much thought. I also think that naming the variable "numberOfKeyGroups" in TaskInfo is a bad idea. Keeping max-parallelism is better. It's more common and makes much more sense for other scenarios. I will make sure it works with batch jobs, and update this PR later. ---
[GitHub] flink pull request #3487: [FLINK-5980] Expose max-parallelism value in Runti...
Github user ifndef-SleePy commented on a diff in the pull request: https://github.com/apache/flink/pull/3487#discussion_r104825967 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java --- @@ -85,6 +85,13 @@ int getIndexOfThisSubtask(); /** +* Gets the number of max-parallelism with which the parallel task runs. +* +* @return The max-parallelism with which the parallel task runs. +*/ + int getMaxParallelismOfSubtasks(); --- End diff -- That's a better name. ---
[GitHub] flink pull request #3487: [FLINK-5980] Expose max-parallelism value in Runti...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/3487 [FLINK-5980] Expose max-parallelism value in RuntimeContext. Add new method named getMaxParallelismOfSubtasks in RuntimeContext. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-5980 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3487.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3487 commit b7844d43d053d447a905fa727916683e06ec583d Author: biao.liub Date: 2017-03-07T12:11:10Z [FLINK-5980] Expose max-parallelism value in RuntimeContext. ---
[GitHub] flink pull request #3396: [FLINK-5879] Fix bug about ExecutionAttemptID, add...
Github user ifndef-SleePy closed the pull request at: https://github.com/apache/flink/pull/3396 ---
[GitHub] flink issue #3396: [FLINK-5879] Fix bug about ExecutionAttemptID, add super(...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/3396 Ah, you are right. I think I mixed up this issue with another one. Thank you for pointing it out. ---
[GitHub] flink pull request #3396: [FLINK-5879] Fix bug about ExecutionAttemptID, add...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/3396 [FLINK-5879] Fix bug about ExecutionAttemptID, add super() in constructor. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-5879 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3396.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3396 ---
[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/3395 [FLINK-5861] Components of TaskManager support updating JobManagerConnection You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-5861 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3395.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3395 ---
[GitHub] flink pull request #3296: [FLINK-5784] Fix bug about registering JobManagerL...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/3296 [FLINK-5784] Fix bug about registering JobManagerLeaderListener in LeaderRetrievalService multiple times [FLINK-5784] Fix bug about registering JobManagerLeaderListener in LeaderRetrievalService multiple times. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-5784 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3296.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3296 commit d827f1d87ab8a15458b328fb10beeb70f4515493 Author: biao.liub Date: 2017-02-13T03:06:46Z [FLINK-5784] Fix bug about registering JobManagerLeaderListener in LeaderRetrievalService multiple times. ---
[GitHub] flink pull request #2877: [FLINK-5141] Implement Flip6LocalStreamEnvironment...
Github user ifndef-SleePy closed the pull request at: https://github.com/apache/flink/pull/2877 ---
[GitHub] flink pull request #2828: [FLINK-5093] java.util.ConcurrentModificationExcep...
Github user ifndef-SleePy closed the pull request at: https://github.com/apache/flink/pull/2828 ---
[GitHub] flink pull request #2817: [FLINK-5076] Shutting down TM when shutting down m...
Github user ifndef-SleePy closed the pull request at: https://github.com/apache/flink/pull/2817 ---
[GitHub] flink pull request #2877: [FLINK-5141] Implement Flip6LocalStreamEnvironment...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/2877 [FLINK-5141] Implement Flip6LocalStreamEnvironment to run new mini cluster in flip-6 branch Implement a StreamEnvironment to run the flip-6 mini cluster. Following Till's suggestion, I named it Flip6LocalStreamEnvironment for now. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-5141 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2877.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2877 commit 1ed9ea9a929d1000e46687912b23b715931bb384 Author: biao.liub Date: 2016-11-23T09:02:11Z [FLINK-5141] Implement MiniClusterStreamEnvironment for new mini cluster. commit cca637ea1a3bee82802dbd237c7fc4b8d4bdee84 Author: biao.liub Date: 2016-11-24T10:38:50Z [FLINK-5141] Rename MiniClusterStreamEnvironment to Flip6LocalStreamEnvironment. ---
[GitHub] flink pull request #2828: [FLINK-5093] java.util.ConcurrentModificationExcep...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/2828 [FLINK-5093] java.util.ConcurrentModificationException is thrown when stopping TimerService [FLINK-5093] Fix the java.util.ConcurrentModificationException thrown while stopping TimerService. Add a new method "unregisterAllTimeouts" and a unit test for it. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-5093 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2828.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2828 commit 21fdd8da33c634f5b6a19ed8ea7af9c18bcbe9bc Author: biao.liub Date: 2016-11-18T10:15:37Z [FLINK-5093] Fix bug about throwing ConcurrentModificationException when stopping TimerService. commit 24406bbe77ba211ce99f268a85adbb391cc605df Author: biao.liub Date: 2016-11-18T10:32:36Z [FLINK-5093] Remove useless import. ---
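The class of bug described in pull request #2828 can be reproduced with a small sketch. This is an assumption about the general mechanism, not the actual Flink `TimerService` code: `TimerRegistrySketch`, `registerTimeout`, `unregisterTimeout`, `stopUnsafe` and `size` are hypothetical, and only the method name `unregisterAllTimeouts` comes from the pull request description. Removing entries from a `HashMap` while iterating over its key set makes the fail-fast iterator throw `ConcurrentModificationException`; cancelling every entry and then clearing the map once avoids that.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class TimerRegistrySketch {

    // Hypothetical registry: key -> action that cancels the pending timeout.
    private final Map<String, Runnable> timeouts = new HashMap<>();

    void registerTimeout(String key, Runnable cancelAction) {
        timeouts.put(key, cancelAction);
    }

    void unregisterTimeout(String key) {
        Runnable cancel = timeouts.remove(key);
        if (cancel != null) {
            cancel.run();
        }
    }

    /** Problematic: structurally modifies the map while iterating over its key set. */
    void stopUnsafe() {
        for (String key : timeouts.keySet()) {
            unregisterTimeout(key);
        }
    }

    /** Safe: cancel every timeout without touching the map, then clear it once. */
    void unregisterAllTimeouts() {
        for (Runnable cancel : timeouts.values()) {
            cancel.run();
        }
        timeouts.clear();
    }

    int size() {
        return timeouts.size();
    }

    private static TimerRegistrySketch withThreeTimeouts() {
        TimerRegistrySketch registry = new TimerRegistrySketch();
        registry.registerTimeout("a", () -> { });
        registry.registerTimeout("b", () -> { });
        registry.registerTimeout("c", () -> { });
        return registry;
    }

    public static void main(String[] args) {
        try {
            withThreeTimeouts().stopUnsafe();
        } catch (ConcurrentModificationException e) {
            System.out.println("unsafe stop failed: " + e.getClass().getSimpleName());
        }

        TimerRegistrySketch registry = withThreeTimeouts();
        registry.unregisterAllTimeouts();
        System.out.println("remaining timeouts: " + registry.size());
    }
}
```

An alternative with the same effect is to iterate with an explicit `Iterator` and call `Iterator.remove()`, which is the only structural modification a `HashMap` iterator tolerates during iteration.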
[GitHub] flink pull request #2817: [FLINK-5076] Shutting down TM when shutting down m...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/2817 [FLINK-5076] Shutting down TM when shutting down mini cluster. This PR [#5076](https://issues.apache.org/jira/browse/FLINK-5076) shuts down the task manager when the new mini cluster is shut down. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-5076 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2817.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2817 commit fabb666c2c4f29392287b1f7a04d47a3e021f75e Author: biao.liub Date: 2016-11-16T09:54:48Z [FLINK-5076] Shutting down TM when shutting down mini cluster. ---