[jira] [Commented] (FLINK-9640) Checkpointing is always aborted if any task has been finished
[ https://issues.apache.org/jira/browse/FLINK-9640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527141#comment-16527141 ] Hai Zhou commented on FLINK-9640:
---------------------------------

CC [~StephanEwen], what do you think about this ticket?

> Checkpointing is always aborted if any task has been finished
> -------------------------------------------------------------
>
>                 Key: FLINK-9640
>                 URL: https://issues.apache.org/jira/browse/FLINK-9640
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0, 1.4.0, 1.5.0, 1.6.0
>            Reporter: Hai Zhou
>            Assignee: Hai Zhou
>            Priority: Major
>             Fix For: 1.6.0
>
> Steps to reproduce:
> 1. Build a standalone Flink cluster.
> 2. Submit a test job like the one below:
> {code:java}
> public class DemoJob {
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.disableOperatorChaining();
>         env.setParallelism(4);
>         env.enableCheckpointing(3000);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
>         DataStream<String> inputStream = env.addSource(new StringGeneratorParallelSourceFunction());
>         inputStream.map(String::hashCode).print();
>         env.execute();
>     }
>
>     public static class StringGeneratorParallelSourceFunction extends RichParallelSourceFunction<String> {
>
>         private static final Logger LOG = LoggerFactory.getLogger(StringGeneratorParallelSourceFunction.class);
>
>         private volatile boolean running = true;
>         private int index;
>         private int subtask_nums;
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             index = getRuntimeContext().getIndexOfThisSubtask();
>             subtask_nums = getRuntimeContext().getNumberOfParallelSubtasks();
>         }
>
>         @Override
>         public void run(SourceContext<String> ctx) throws Exception {
>             while (running) {
>                 String data = UUID.randomUUID().toString();
>                 ctx.collect(data);
>                 LOG.info("subtask_index = {}, emit string = {}", index, data);
>                 Thread.sleep(50);
>                 if (index == subtask_nums / 2) {
>                     running = false;
>                     LOG.info("subtask_index = {}, finished.", index);
>                 }
>             }
>         }
>
>         @Override
>         public void cancel() {
>             running = false;
>         }
>     }
> }
> {code}
> 3. Observe the JM and TM logs:
> *taskmanager.log*
> {code:java}
> 2018-06-21 17:05:54,144 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 2, emit string = 5b0c2467-ad2e-4b53-b1a4-7a0f64560570
> 2018-06-21 17:05:54,151 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 0, emit string = 11af78b3-59ea-467c-a267-7c2238e44ffe
> 2018-06-21 17:05:54,195 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 2, finished.
> 2018-06-21 17:05:54,200 INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
> 2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9).
> 2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) [FINISHED]
> 2018-06-21 17:05:54,202 INFO  org.apache.flink.yarn.YarnTaskManager  - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source (6b2a374bec5f31112811613537dd4fd9)
> 2018-06-21 17:05:54,211 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 0, emit string = f29f48fd-ca53-4a96-b596-93948c09581d
> {code}
> *jobmanager.log*
> {code:java}
> 2018-06-21 17:05:52,682 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Sink: Unnamed (2/4) (3aee8bf5103065e8b57ebd0e214141ae) switched from DEPLOYING to RUNNING.
> 2018-06-21 17:05:52,683 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Map (2/4) (de90106d04f63cb9ea531308202e233f) switched from DEPLOYING to RUNNING.
> 2018-06-21 17:05:54,219 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
> {code}
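For context, the abort this ticket describes stems from the checkpoint trigger requiring every participating task to still be running. The sketch below is illustrative only, with hypothetical names, not Flink's actual `CheckpointCoordinator` code:

```java
import java.util.Arrays;
import java.util.List;

// Simplified sketch: a checkpoint can only be triggered while every task that
// must acknowledge it is RUNNING. Once any source subtask reaches FINISHED
// (like Source 3/4 in the logs above), every later checkpoint is aborted.
// All names here are illustrative, not Flink's real classes.
public class CheckpointTriggerSketch {

    enum TaskState { RUNNING, FINISHED }

    static boolean canTriggerCheckpoint(List<TaskState> tasksToTrigger) {
        // The precondition the ticket complains about: one FINISHED task
        // is enough to make triggering fail.
        return tasksToTrigger.stream().allMatch(s -> s == TaskState.RUNNING);
    }

    public static void main(String[] args) {
        List<TaskState> allRunning = Arrays.asList(TaskState.RUNNING, TaskState.RUNNING);
        List<TaskState> oneFinished = Arrays.asList(TaskState.RUNNING, TaskState.FINISHED);
        System.out.println(canTriggerCheckpoint(allRunning));  // prints "true"
        System.out.println(canTriggerCheckpoint(oneFinished)); // prints "false"
    }
}
```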
[jira] [Issue Comment Deleted] (FLINK-9091) Failure while enforcing releasability in building flink-json module
[ https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-9091:
--------------------------

    Comment: was deleted

(was: What should be the next step? Thanks)

> Failure while enforcing releasability in building flink-json module
> -------------------------------------------------------------------
>
>                 Key: FLINK-9091
>                 URL: https://issues.apache.org/jira/browse/FLINK-9091
>             Project: Flink
>          Issue Type: Bug
>          Components: Build System
>            Reporter: Ted Yu
>            Assignee: Hai Zhou
>            Priority: Major
>         Attachments: f-json.out
>
> Got the following when building the flink-json module:
> {code}
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence failed with message:
> Failed while enforcing releasability. See above detailed error message.
> ...
> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce (dependency-convergence) on project flink-json: Some Enforcer rules have failed. Look above for specific messages explaining why the rule failed. -> [Help 1]
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-9236) Use Apache Parent POM 19
[ https://issues.apache.org/jira/browse/FLINK-9236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-9236:
--------------------------

    Description:
Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out.
This will also fix Javadoc generation with JDK 10+

  was:
Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out.
This will also fix Javadoc generation with JDK 10+

> Use Apache Parent POM 19
> ------------------------
>
>                 Key: FLINK-9236
>                 URL: https://issues.apache.org/jira/browse/FLINK-9236
>             Project: Flink
>          Issue Type: Improvement
>          Components: Build System
>            Reporter: Ted Yu
>            Assignee: Stephen Jason
>            Priority: Major
>
> Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out.
> This will also fix Javadoc generation with JDK 10+

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Resolved] (FLINK-9393) LocatableInputSplit#hashCode should take hostnames into account
[ https://issues.apache.org/jira/browse/FLINK-9393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved FLINK-9393.
---------------------------
    Resolution: Later

> LocatableInputSplit#hashCode should take hostnames into account
> ---------------------------------------------------------------
>
>                 Key: FLINK-9393
>                 URL: https://issues.apache.org/jira/browse/FLINK-9393
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Assignee: vinoyang
>            Priority: Major
>
> Currently:
> {code}
> public int hashCode() {
>   return this.splitNumber;
> {code}
> This is not symmetrical with the {{equals}} method, where hostnames are compared.
> LocatableInputSplit#hashCode should take hostnames into account.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
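The asymmetry the ticket describes (hostnames used in `equals` but not `hashCode`) can be sketched as below. `HostAwareSplit` is a hypothetical stand-in, not Flink's actual `LocatableInputSplit` implementation:

```java
import java.util.Arrays;

// Hypothetical stand-in for LocatableInputSplit, illustrating the suggested fix:
// hashCode must use the same fields as equals, so hostnames are included.
public class HostAwareSplit {
    private final int splitNumber;
    private final String[] hostnames;

    public HostAwareSplit(int splitNumber, String[] hostnames) {
        this.splitNumber = splitNumber;
        this.hostnames = hostnames;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof HostAwareSplit)) return false;
        HostAwareSplit that = (HostAwareSplit) o;
        return splitNumber == that.splitNumber && Arrays.equals(hostnames, that.hostnames);
    }

    @Override
    public int hashCode() {
        // Before: return this.splitNumber;  (asymmetric with equals)
        return 31 * splitNumber + Arrays.hashCode(hostnames);
    }
}
```

With the old `hashCode`, two splits with the same number but different hostnames collided despite being unequal; the fix keeps the equals/hashCode contract intact.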
[jira] [Updated] (FLINK-9363) Bump up the Jackson version
[ https://issues.apache.org/jira/browse/FLINK-9363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-9363:
--------------------------
    Labels: security  (was: )

> Bump up the Jackson version
> ---------------------------
>
>                 Key: FLINK-9363
>                 URL: https://issues.apache.org/jira/browse/FLINK-9363
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Ted Yu
>            Assignee: vinoyang
>            Priority: Major
>              Labels: security
>
> CVEs for Jackson:
> CVE-2017-17485
> CVE-2018-5968
> CVE-2018-7489
> We can upgrade to 2.9.5

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (FLINK-7588) Document RocksDB tuning for spinning disks
[ https://issues.apache.org/jira/browse/FLINK-7588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258309#comment-16258309 ] Ted Yu edited comment on FLINK-7588 at 6/29/18 2:56 AM:
--------------------------------------------------------

bq. Be careful about whether you have enough memory to keep all bloom filters

Other than the above being tricky, the other guidelines are actionable.

was (Author: yuzhih...@gmail.com):
bq. Be careful about whether you have enough memory to keep all bloom filters

Other than the above being tricky, the other guidelines are actionable.

> Document RocksDB tuning for spinning disks
> ------------------------------------------
>
>                 Key: FLINK-7588
>                 URL: https://issues.apache.org/jira/browse/FLINK-7588
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>            Reporter: Ted Yu
>            Priority: Major
>              Labels: performance
>
> In docs/ops/state/large_state_tuning.md, it is mentioned that:
> bq. the default configuration is tailored towards SSDs and performs suboptimal on spinning disks
> We should add a recommendation targeting spinning disks:
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user xueyumusic commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6188#discussion_r199040249

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---
    @@ -1029,6 +1029,29 @@ object temporalOverlaps {
         TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
       }
     }
    +
    +/**
    +  * Adds a (signed) integer interval to a timestamp. The unit for the interval is given
    +  * by the unit argument, which should be one of the following values: "SECOND", "MINUTE",
    +  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
    +  *
    +  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09".
    +  */
    +object timestampAdd {
    +
    +  /**
    +    * Adds a (signed) integer interval to a timestamp. The unit for the interval is given
    +    * by the unit argument, which should be one of the following values: "SECOND", "MINUTE",
    +    * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
    +    *
    +    * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09".
    +    */
    +  def apply(
    +    unit: Expression,
    --- End diff --

    Thanks for your review and advice, @fhueske. Yes, at the beginning I had considered these two kinds of function signature, and chose the current one following the SQL signature. I think your suggestion is reasonable: it follows the Table API style and makes the code concise. There are two problems I found: one is that `1.quarter` has already been given another meaning (the Quarter table API), and the other is that `1.week` does not seem to exist yet (maybe we can add it). Thank you

---
[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527093#comment-16527093 ] ASF GitHub Bot commented on FLINK-6846:
---------------------------------------

Github user xueyumusic commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6188#discussion_r199040249

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---
    @@ -1029,6 +1029,29 @@ object temporalOverlaps {
         TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
       }
     }
    +
    +/**
    +  * Adds a (signed) integer interval to a timestamp. The unit for the interval is given
    +  * by the unit argument, which should be one of the following values: "SECOND", "MINUTE",
    +  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
    +  *
    +  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09".
    +  */
    +object timestampAdd {
    +
    +  /**
    +    * Adds a (signed) integer interval to a timestamp. The unit for the interval is given
    +    * by the unit argument, which should be one of the following values: "SECOND", "MINUTE",
    +    * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
    +    *
    +    * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09".
    +    */
    +  def apply(
    +    unit: Expression,
    --- End diff --

    Thanks for your review and advice, @fhueske. Yes, at the beginning I had considered these two kinds of function signature, and chose the current one following the SQL signature. I think your suggestion is reasonable: it follows the Table API style and makes the code concise. There are two problems I found: one is that `1.quarter` has already been given another meaning (the Quarter table API), and the other is that `1.week` does not seem to exist yet (maybe we can add it). Thank you

> Add TIMESTAMPADD supported in TableAPI
> --------------------------------------
>
>                 Key: FLINK-6846
>                 URL: https://issues.apache.org/jira/browse/FLINK-6846
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>            Priority: Major
>              Labels: pull-request-available, starter
>
> See FLINK-6811 for details.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
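The semantics being added above (a signed integer interval added to a timestamp by unit) can be illustrated with plain `java.time`. This is an illustration only, assuming the unit strings from the docs; it is not the Table API implementation:

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

public class TimestampAddDemo {

    // Mirrors the documented example: timestampAdd("WEEK", 1, '2003-01-02'.toDate)
    // should yield 2003-01-09. Sub-day units are omitted since LocalDate has no time part.
    static LocalDate timestampAdd(String unit, long amount, LocalDate ts) {
        switch (unit) {
            case "DAY":     return ts.plusDays(amount);
            case "WEEK":    return ts.plus(amount, ChronoUnit.WEEKS);
            case "MONTH":   return ts.plusMonths(amount);
            case "QUARTER": return ts.plusMonths(3 * amount); // a quarter is three months
            case "YEAR":    return ts.plusYears(amount);
            default: throw new IllegalArgumentException("unsupported unit: " + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println(timestampAdd("WEEK", 1, LocalDate.parse("2003-01-02")));
        // prints "2003-01-09"
    }
}
```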
[jira] [Commented] (FLINK-9666) short-circuit logic should be used in boolean contexts
[ https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527083#comment-16527083 ] ASF GitHub Bot commented on FLINK-9666:
---------------------------------------

Github user lamber-ken commented on the issue:

    https://github.com/apache/flink/pull/6212

    Hi @tillrohrmann, what does this mean? I don't understand; do I need to delete the branch?
    ![image](https://user-images.githubusercontent.com/20113411/42070083-75c684f2-7b87-11e8-96d8-575e11345eff.png)

> short-circuit logic should be used in boolean contexts
> ------------------------------------------------------
>
>                 Key: FLINK-9666
>                 URL: https://issues.apache.org/jira/browse/FLINK-9666
>             Project: Flink
>          Issue Type: Improvement
>          Components: Core, DataStream API
>    Affects Versions: 1.5.0
>            Reporter: lamber-ken
>            Assignee: lamber-ken
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
> Short-circuit logic should be used in boolean contexts.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink issue #6212: [FLINK-9666] short-circuit logic should be used in boolea...
Github user lamber-ken commented on the issue:

    https://github.com/apache/flink/pull/6212

    Hi @tillrohrmann, what does this mean? I don't understand; do I need to delete the branch?
    ![image](https://user-images.githubusercontent.com/20113411/42070083-75c684f2-7b87-11e8-96d8-575e11345eff.png)

---
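For context, the change the FLINK-9666 title refers to is replacing the non-short-circuit operators `&`/`|` with `&&`/`||` in boolean expressions. A minimal illustration (not code from the PR itself):

```java
public class ShortCircuitDemo {

    private static boolean expensiveCheck() {
        System.out.println("expensiveCheck evaluated");
        return true;
    }

    public static void main(String[] args) {
        String s = null;

        // Non-short-circuit '&' evaluates both operands, so this line would
        // throw a NullPointerException:
        // boolean bad = (s != null) & s.isEmpty();

        // Short-circuit '&&' skips the right-hand side when the left is false.
        boolean ok = (s != null) && s.isEmpty();
        System.out.println(ok); // prints "false"

        // '||' likewise skips the right-hand side when the left is true,
        // avoiding needless work: expensiveCheck() is never called here.
        boolean alwaysTrue = true || expensiveCheck();
        System.out.println(alwaysTrue); // prints "true"
    }
}
```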
[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527046#comment-16527046 ] ASF GitHub Bot commented on FLINK-9567:
---------------------------------------

Github user Clark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199036840

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -334,8 +335,11 @@ public void onContainersCompleted(final List<ContainerStatus> list) {
     			if (yarnWorkerNode != null) {
     				// Container completed unexpectedly ~> start a new one
     				final Container container = yarnWorkerNode.getContainer();
    -				requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
    -				closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
    +				// check WorkerRegistration status to avoid requesting containers more than required
    +				if (checkWorkerRegistrationWithResourceId(resourceId)) {
    --- End diff --

    Yes, it might happen. The problem is not as easy as I thought. The actual cause is that the resource was released before a full restart, but the onContainerCompleted callback arrived after the full restart. Since the full restart requests all the containers needed as configured, an onContainerCompleted arriving after that requests an extra container and holds on to it even though it is not needed.

> Flink does not release resource in Yarn Cluster mode
> ----------------------------------------------------
>
>                 Key: FLINK-9567
>                 URL: https://issues.apache.org/jira/browse/FLINK-9567
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management, YARN
>    Affects Versions: 1.5.0
>            Reporter: Shimin Yang
>            Assignee: Shimin Yang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>         Attachments: FlinkYarnProblem, fulllog.txt
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not release task manager containers in certain cases. In the worst case, I had a job configured with 5 task managers that held more than 100 containers in the end. Although the job didn't fail, it affects other jobs in the Yarn cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn did not release resources. The container was killed before the restart, but the *onContainerComplete* callback in *YarnResourceManager*, which should be called by Yarn's *AMRMAsyncClient*, had not been received.
> After the restart, as we can see in line 347 of the FlinkYarnProblem log:
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it no longer had a connection to the TaskManager on container 24, so it simply ignored the close of the TaskManager:
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection* it had already called *requestYarnContainer*, which increased the *numPendingContainerRequests* variable in *YarnResourceManager* by 1.
> Since the return of excess containers is governed by *numPendingContainerRequests*, this container cannot be returned although it is not required. Meanwhile, the restart logic has already allocated enough containers for the Task Managers, so Flink holds the extra container for a long time for nothing.
> In the full log, the job ended with 7 containers while only 3 were running TaskManagers.
> ps: Another strange thing I found is that a request for a yarn container sometimes returns much more than requested. Is that a normal scenario for AMRMAsyncClient?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...
Github user Clark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199036840

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -334,8 +335,11 @@ public void onContainersCompleted(final List<ContainerStatus> list) {
     			if (yarnWorkerNode != null) {
     				// Container completed unexpectedly ~> start a new one
     				final Container container = yarnWorkerNode.getContainer();
    -				requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
    -				closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
    +				// check WorkerRegistration status to avoid requesting containers more than required
    +				if (checkWorkerRegistrationWithResourceId(resourceId)) {
    --- End diff --

    Yes, it might happen. The problem is not as easy as I thought. The actual cause is that the resource was released before a full restart, but the onContainerCompleted callback arrived after the full restart. Since the full restart requests all the containers needed as configured, an onContainerCompleted arriving after that requests an extra container and holds on to it even though it is not needed.

---
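The guard being discussed can be sketched in isolation as follows. This is a simplified model with hypothetical names, not the actual Flink `YarnResourceManager` code:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the proposed fix: only request a replacement container
// when the completed container still has a registered worker. A stale
// completion event arriving after a full restart is ignored, so no extra,
// unneeded container is requested. All names are illustrative.
public class ContainerTracker {

    private final Map<String, String> registeredWorkers = new HashMap<>();
    private int pendingContainerRequests = 0;

    public void registerWorker(String resourceId, String host) {
        registeredWorkers.put(resourceId, host);
    }

    /** Callback invoked when YARN reports a container as completed. */
    public void onContainerCompleted(String resourceId) {
        if (registeredWorkers.remove(resourceId) != null) {
            // Worker was still registered: request a replacement container.
            pendingContainerRequests++;
        }
        // Otherwise the event is stale (e.g. it refers to a container killed
        // before a restart) and is dropped without requesting anything.
    }

    public int getPendingContainerRequests() {
        return pendingContainerRequests;
    }
}
```

A duplicate or post-restart completion event for the same container then leaves `pendingContainerRequests` unchanged, which is exactly the leak described in the issue.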
[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527034#comment-16527034 ] ASF GitHub Bot commented on FLINK-9567:
---------------------------------------

Github user Clark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199035271

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -421,4 +425,139 @@ public void testDeleteApplicationFiles() throws Exception {
     			assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
     		}};
     	}
    +
    +	@Test
    +	public void testOnContainerCompleted() throws Exception {
    +		new Context() {{
    +			startResourceManager();
    +			CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
    +				rmServices.slotManager.registerSlotRequest(
    +					new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
    +				return null;
    +			});
    +			// wait for the registerSlotRequest completion
    +			registerSlotRequestFuture.get();
    +			// Callback from YARN when container is allocated.
    +			Container testingContainer = mock(Container.class);
    +			when(testingContainer.getId()).thenReturn(
    +				ContainerId.newInstance(
    +					ApplicationAttemptId.newInstance(
    +						ApplicationId.newInstance(System.currentTimeMillis(), 1),
    +						1),
    +					1));
    +			when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
    +			when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
    +			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
    +
    +			ImmutableList<Container> testingContainerList = ImmutableList.of(testingContainer);
    +			resourceManager.onContainersAllocated(testingContainerList);
    +			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
    +			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
    +
    +			// Remote task executor registers with YarnResourceManager.
    +			TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
    --- End diff --

    Sure, I will modify it later.

> Flink does not release resource in Yarn Cluster mode
> ----------------------------------------------------
>
>                 Key: FLINK-9567
>                 URL: https://issues.apache.org/jira/browse/FLINK-9567
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management, YARN
>    Affects Versions: 1.5.0
>            Reporter: Shimin Yang
>            Assignee: Shimin Yang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>         Attachments: FlinkYarnProblem, fulllog.txt
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not release task manager containers in certain cases. In the worst case, I had a job configured with 5 task managers that held more than 100 containers in the end. Although the job didn't fail, it affects other jobs in the Yarn cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn did not release resources. The container was killed before the restart, but the *onContainerComplete* callback in *YarnResourceManager*, which should be called by Yarn's *AMRMAsyncClient*, had not been received.
> After the restart, as we can see in line 347 of the FlinkYarnProblem log:
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it no longer had a connection to the TaskManager on container 24, so it simply ignored the close of the TaskManager:
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection* it had already called
[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...
Github user Clark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199035271

    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -421,4 +425,139 @@ public void testDeleteApplicationFiles() throws Exception {
     			assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
     		}};
     	}
    +
    +	@Test
    +	public void testOnContainerCompleted() throws Exception {
    +		new Context() {{
    +			startResourceManager();
    +			CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
    +				rmServices.slotManager.registerSlotRequest(
    +					new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
    +				return null;
    +			});
    +			// wait for the registerSlotRequest completion
    +			registerSlotRequestFuture.get();
    +			// Callback from YARN when container is allocated.
    +			Container testingContainer = mock(Container.class);
    +			when(testingContainer.getId()).thenReturn(
    +				ContainerId.newInstance(
    +					ApplicationAttemptId.newInstance(
    +						ApplicationId.newInstance(System.currentTimeMillis(), 1),
    +						1),
    +					1));
    +			when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
    +			when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
    +			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
    +
    +			ImmutableList<Container> testingContainerList = ImmutableList.of(testingContainer);
    +			resourceManager.onContainersAllocated(testingContainerList);
    +			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
    +			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
    +
    +			// Remote task executor registers with YarnResourceManager.
    +			TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
    --- End diff --

    Sure, I will modify it later.

---
[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527033#comment-16527033 ] ASF GitHub Bot commented on FLINK-9567:
---------------------------------------

Github user Clark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199035236

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -1120,5 +1120,28 @@ public void reportPayload(ResourceID resourceID, Void payload) {
     			return CompletableFuture.completedFuture(null);
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Worker registration status checking
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Check if the executor with the given resourceID is still in the taskExecutors map.
    +	 *
    +	 * @param resourceID an ID mapping to a task executor
    +	 * @return whether the task executor is still registered
    +	 */
    +	protected boolean checkWorkerRegistrationWithResourceId(ResourceID resourceID) {
    +		boolean status = taskExecutors.containsKey(resourceID);
    +		if (!status) {
    +			log.debug("No open TaskExecutor connection {}. Ignoring close TaskExecutor connection.", resourceID);
    +		}
    +		return status;
    +	}
    +
    +	@VisibleForTesting
    +	public void triggerTaskManagerHeartbeatTimeout(ResourceID resourceID) {
    --- End diff --

    OK.

> Flink does not release resource in Yarn Cluster mode
> ----------------------------------------------------
>
>                 Key: FLINK-9567
>                 URL: https://issues.apache.org/jira/browse/FLINK-9567
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management, YARN
>    Affects Versions: 1.5.0
>            Reporter: Shimin Yang
>            Assignee: Shimin Yang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>         Attachments: FlinkYarnProblem, fulllog.txt
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not release task manager containers in certain cases. In the worst case, I had a job configured with 5 task managers that held more than 100 containers in the end. Although the job didn't fail, it affects other jobs in the Yarn cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn did not release resources. The container was killed before the restart, but the *onContainerComplete* callback in *YarnResourceManager*, which should be called by Yarn's *AMRMAsyncClient*, had not been received.
> After the restart, as we can see in line 347 of the FlinkYarnProblem log:
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it no longer had a connection to the TaskManager on container 24, so it simply ignored the close of the TaskManager:
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No open TaskExecutor connection container_1528707394163_29461_02_24. Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection* it had already called *requestYarnContainer*, which increased the *numPendingContainerRequests* variable in *YarnResourceManager* by 1.
> Since the return of excess containers is governed by *numPendingContainerRequests*, this container cannot be returned although it is not required. Meanwhile, the restart logic has already allocated enough containers for the Task Managers, so Flink holds the extra container for a long time for nothing.
> In the full log, the job ended with 7 containers while only 3 were running TaskManagers.
> ps: Another strange thing I found is that a request for a yarn container sometimes returns much more than requested. Is that a normal scenario for AMRMAsyncClient?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...
Github user Clark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199035236

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -1120,5 +1120,28 @@ public void reportPayload(ResourceID resourceID, Void payload) {
     			return CompletableFuture.completedFuture(null);
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Worker registration status checking
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Check if the executor with the given resourceID is still in the taskExecutors map.
    +	 *
    +	 * @param resourceID an ID mapping to a task executor
    +	 * @return whether the task executor is still registered
    +	 */
    +	protected boolean checkWorkerRegistrationWithResourceId(ResourceID resourceID) {
    +		boolean status = taskExecutors.containsKey(resourceID);
    +		if (!status) {
    +			log.debug("No open TaskExecutor connection {}. Ignoring close TaskExecutor connection.", resourceID);
    +		}
    +		return status;
    +	}
    +
    +	@VisibleForTesting
    +	public void triggerTaskManagerHeartbeatTimeout(ResourceID resourceID) {
    --- End diff --

    OK.

---
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527026#comment-16527026 ] ASF GitHub Bot commented on FLINK-9456:
---------------------------------------

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6132

    @tillrohrmann Thanks for your review and good suggestions, I am changing the code accordingly.

> Let ResourceManager notify JobManager about failed/killed TaskManagers
> ----------------------------------------------------------------------
>
>                 Key: FLINK-9456
>                 URL: https://issues.apache.org/jira/browse/FLINK-9456
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Sihua Zhou
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0, 1.5.1
>
> Often, the {{ResourceManager}} learns faster about TaskManager failures/killings because it directly communicates with the underlying resource management framework. Instead of only relying on the {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we should additionally send a signal from the {{ResourceManager}} to the {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster to {{TaskManager}} failures and recover our running job/s.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6132 @tillrohrmann Thanks for your review and good suggestions; I am updating the code accordingly. ---
[jira] [Commented] (FLINK-9687) Delay the state fetch only when the triggerResult is fire
[ https://issues.apache.org/jira/browse/FLINK-9687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526910#comment-16526910 ] ASF GitHub Bot commented on FLINK-9687: --- Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6224 The failure info in the Travis log shows that the `checkClusterEmpty` test fails. Could this be caused by the reuse of the YARN cluster? It seems unrelated to this pull request. > Delay the state fetch only when the triggerResult is fire > - > > Key: FLINK-9687 > URL: https://issues.apache.org/jira/browse/FLINK-9687 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > When the window operator is fired by an event-time or processing-time timer, it > fetches the state content first. It only needs to fetch the content from > windowState when the triggerResult is Fire, so we should change the order to > avoid this cost (fetching the content from state costs more than checking the > triggerResult). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
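The reordering proposed in FLINK-9687 above can be sketched in a few lines. This is a hypothetical simplification, not the actual `WindowOperator` code: `TriggerResult`, `fetchWindowContents`, and the fetch counter are illustrative stand-ins.

```java
// Hypothetical sketch of the proposed reordering: evaluate the (cheap) trigger
// decision first and touch window state only when the trigger actually fires.
public class TriggerOrderSketch {
    enum TriggerResult { FIRE, CONTINUE }

    static int stateFetches = 0; // counts how often we pay the state-access cost

    static String fetchWindowContents() {
        stateFetches++;          // stands in for the expensive state-backend read
        return "contents";
    }

    static void onTimer(TriggerResult triggerResult) {
        // The old order fetched unconditionally; the new order skips the
        // fetch entirely when the result is CONTINUE.
        if (triggerResult == TriggerResult.FIRE) {
            String contents = fetchWindowContents();
            System.out.println("emit " + contents);
        }
    }

    public static void main(String[] args) {
        onTimer(TriggerResult.CONTINUE); // no state access
        onTimer(TriggerResult.FIRE);     // exactly one state access
        System.out.println(stateFetches);
    }
}
```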
[jira] [Updated] (FLINK-9687) Delay the state fetch only when the triggerResult is fire
[ https://issues.apache.org/jira/browse/FLINK-9687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9687: -- Labels: pull-request-available (was: ) > Delay the state fetch only when the triggerResult is fire > - > > Key: FLINK-9687 > URL: https://issues.apache.org/jira/browse/FLINK-9687 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > When the window operator is fired by an event-time or processing-time timer, it > fetches the state content first. It only needs to fetch the content from > windowState when the triggerResult is Fire, so we should change the order to > avoid this cost (fetching the content from state costs more than checking the > triggerResult). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6224: [FLINK-9687]Delay the state fetch only when the triggerRe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6224 The failure info in the Travis log shows that the `checkClusterEmpty` test fails. Could this be caused by the reuse of the YARN cluster? It seems unrelated to this pull request. ---
[jira] [Created] (FLINK-9691) Modify run loop in Kinesis ShardConsumer to not sleep for a fixed fetchIntervalMillis
Lakshmi Rao created FLINK-9691: -- Summary: Modify run loop in Kinesis ShardConsumer to not sleep for a fixed fetchIntervalMillis Key: FLINK-9691 URL: https://issues.apache.org/jira/browse/FLINK-9691 Project: Flink Issue Type: Improvement Components: Kinesis Connector Reporter: Lakshmi Rao Currently the ShardConsumer in the Kinesis connector sleeps for a fixed [fetchIntervalMillis|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L210] resulting in the shard consumer sleeping for more time than necessary and not optimally reading from Kinesis. It should only be sleeping for (fetchIntervalMillis - time taken to process records) before making the subsequent getRecords call. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
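The change proposed above amounts to subtracting the record-processing time from the fixed interval before sleeping. A minimal sketch of that arithmetic; the helper `computeSleepMillis` is illustrative, not the actual `ShardConsumer` API.

```java
// Hypothetical sketch of the proposed run-loop change: sleep only for the time
// remaining after record processing, instead of a fixed fetchIntervalMillis.
public class AdaptiveFetchInterval {
    static long computeSleepMillis(long fetchIntervalMillis, long processingTimeMillis) {
        // Never sleep a negative amount: if processing already used up the whole
        // interval, issue the next getRecords call immediately.
        return Math.max(0L, fetchIntervalMillis - processingTimeMillis);
    }

    public static void main(String[] args) {
        System.out.println(computeSleepMillis(200, 70));  // processing took 70 ms -> sleep 130
        System.out.println(computeSleepMillis(200, 350)); // processing overran -> sleep 0
    }
}
```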
[jira] [Commented] (FLINK-9690) Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails
[ https://issues.apache.org/jira/browse/FLINK-9690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526709#comment-16526709 ] Piotr Nowojski commented on FLINK-9690: --- Yep, it looks so. I once tried to upgrade our connector to Kafka 1.0.0, and it required (very minor) changes in FlinkKafkaProducer. I never committed it because there were also some failures in our consumer tests that I didn't have time to fix. From the stack trace it looks like there were even further changes in 1.1.0; I don't know how big they are or how easy they would be to fix. However, what's the actual problem? We have never said that our connector supports Kafka producers > 0.11.2.0. > Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails > > > Key: FLINK-9690 > URL: https://issues.apache.org/jira/browse/FLINK-9690 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.2 >Reporter: Ufuk Celebi >Priority: Major > > Restoring a job from a savepoint that includes a {{FlinkKafkaProducer}} > packaged with {{kafka.version}} set to {{1.1.0}} fails in Flink 1.4.2.
> {code} > java.lang.RuntimeException: Incompatible KafkaProducer version > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:301) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:292) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.resumeTransaction(FlinkKafkaProducer.java:195) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:723) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:370) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:330) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:856) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: 
java.lang.NoSuchFieldException: sequenceNumbers > at java.lang.Class.getDeclaredField(Class.java:2070) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:297) > ... 16 more > {code} > [~pnowojski] Any ideas about this issue? Judging from the stack trace it was > anticipated that reflective access might break with Kafka versions > 0.11.2.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
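The `NoSuchFieldException: sequenceNumbers` above is the typical failure mode of looking up a private field by name via reflection after that field has been renamed in a newer client version. A minimal sketch of the failure mode, plus a candidate-name fallback, using stand-in classes rather than the real Kafka `TransactionManager` (the class and field names here are hypothetical):

```java
import java.lang.reflect.Field;

public class ReflectiveFieldSketch {
    // Stand-ins for the same internal class across two client versions; the
    // private field was renamed, which is exactly what breaks getDeclaredField.
    static class TxnManagerV011 { private final int sequenceNumbers = 7; }
    static class TxnManagerV110 { private final int partitionStates = 7; }

    static Object getValue(Object target, String... candidateNames) {
        for (String name : candidateNames) {
            try {
                Field field = target.getClass().getDeclaredField(name);
                field.setAccessible(true);
                return field.get(target);
            } catch (NoSuchFieldException | IllegalAccessException e) {
                // field not present in this version, try the next candidate
            }
        }
        throw new RuntimeException("Incompatible KafkaProducer version");
    }

    public static void main(String[] args) {
        // Works for both versions once the renamed field is a known candidate.
        System.out.println(getValue(new TxnManagerV011(), "sequenceNumbers", "partitionStates"));
        System.out.println(getValue(new TxnManagerV110(), "sequenceNumbers", "partitionStates"));
    }
}
```

A fallback list like this only postpones the problem, of course; each new client version still needs an explicit compatibility check.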
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526671#comment-16526671 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198947650 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.functions.FoldFunction; + +/** + * This class wraps folding function with TTL logic. 
+ * + * @param &lt;T&gt; Type of the values folded into the state + * @param &lt;ACC&gt; Type of the value in the state + * + * @deprecated use {@link TtlAggregateFunction} instead + */ +@Deprecated +class TtlFoldFunction&lt;T, ACC&gt; + extends AbstractTtlDecorator&lt;FoldFunction&lt;T, ACC&gt;&gt; + implements FoldFunction&lt;T, TtlValue&lt;ACC&gt;&gt; { + TtlFoldFunction(FoldFunction&lt;T, ACC&gt; original, TtlConfig config, TtlTimeProvider timeProvider) { + super(original, config, timeProvider); + } + + @Override + public TtlValue&lt;ACC&gt; fold(TtlValue&lt;ACC&gt; accumulator, T value) throws Exception { + return wrapWithTs(original.fold(getUnexpried(accumulator), value)); --- End diff -- It should be covered with `updateExpired` in `testExactExpirationOnWrite` > Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > TTL state decorator uses original state with packed TTL and adds TTL logic > using a time provider: > {code:java} > TtlValueState&lt;V&gt; implements ValueState&lt;V&gt; { > ValueState&lt;TtlValue&lt;V&gt;&gt; underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue&lt;V&gt; valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
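The pseudocode in the FLINK-9514 description can be expanded into a small runnable sketch. The names (`TtlValue`, `underlyingState`, an injectable time provider) mirror the description, but the single-slot state and fixed TTL are simplifications for illustration, not the real Flink state backend API.

```java
import java.util.function.LongSupplier;

public class TtlValueStateSketch {
    // Value packed together with its last-update timestamp, as in the description.
    static class TtlValue<V> {
        final V value;
        final long lastUpdateTs;
        TtlValue(V value, long ts) { this.value = value; this.lastUpdateTs = ts; }
    }

    // Simplified "underlying state": a single slot instead of a keyed backend.
    static class TtlValueState<V> {
        private TtlValue<V> underlyingState;
        private final long ttlMillis;
        private final LongSupplier timeProvider; // injectable, so tests can control time

        TtlValueState(long ttlMillis, LongSupplier timeProvider) {
            this.ttlMillis = ttlMillis;
            this.timeProvider = timeProvider;
        }

        V value() {
            TtlValue<V> withTtl = underlyingState;
            if (withTtl == null || timeProvider.getAsLong() - withTtl.lastUpdateTs >= ttlMillis) {
                return null; // never set, or expired
            }
            return withTtl.value;
        }

        void update(V value) {
            underlyingState = new TtlValue<>(value, timeProvider.getAsLong());
        }
    }

    public static void main(String[] args) {
        long[] clock = {0L};
        TtlValueState<String> state = new TtlValueState<>(100L, () -> clock[0]);
        state.update("hello");
        System.out.println(state.value()); // hello (fresh)
        clock[0] = 150L;                   // advance past the TTL
        System.out.println(state.value()); // null (expired)
    }
}
```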
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198947650 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.functions.FoldFunction; + +/** + * This class wraps folding function with TTL logic. + * + * @param &lt;T&gt; Type of the values folded into the state + * @param &lt;ACC&gt; Type of the value in the state + * + * @deprecated use {@link TtlAggregateFunction} instead + */ +@Deprecated +class TtlFoldFunction&lt;T, ACC&gt; + extends AbstractTtlDecorator&lt;FoldFunction&lt;T, ACC&gt;&gt; + implements FoldFunction&lt;T, TtlValue&lt;ACC&gt;&gt; { + TtlFoldFunction(FoldFunction&lt;T, ACC&gt; original, TtlConfig config, TtlTimeProvider timeProvider) { + super(original, config, timeProvider); + } + + @Override + public TtlValue&lt;ACC&gt; fold(TtlValue&lt;ACC&gt; accumulator, T value) throws Exception { + return wrapWithTs(original.fold(getUnexpried(accumulator), value)); --- End diff -- It should be covered with `updateExpired` in `testExactExpirationOnWrite` ---
[jira] [Created] (FLINK-9690) Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails
Ufuk Celebi created FLINK-9690: -- Summary: Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails Key: FLINK-9690 URL: https://issues.apache.org/jira/browse/FLINK-9690 Project: Flink Issue Type: Improvement Components: Kafka Connector Affects Versions: 1.4.2 Reporter: Ufuk Celebi Restoring a job from a savepoint that includes {{FlinkKafkaProducer}} packaged with {{kafka.version}} set to {{1.1.0}} in Flink 1.4.2. {code} java.lang.RuntimeException: Incompatible KafkaProducer version at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:301) at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:292) at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.resumeTransaction(FlinkKafkaProducer.java:195) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:723) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93) at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:370) at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:330) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:856) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NoSuchFieldException: sequenceNumbers at java.lang.Class.getDeclaredField(Class.java:2070) at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:297) ... 16 more {code} [~pnowojski] Any ideas about this issue? Judging from the stack trace it was anticipated that reflective access might break with Kafka versions > 0.11.2.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...
Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6226#discussion_r198928207 --- Diff: docs/dev/table/sql.md --- @@ -115,6 +115,10 @@ The following BNF-grammar describes the superset of supported SQL features in ba {% highlight sql %} +insert: + INSERT INTO tableReference --- End diff -- Moved it up to be in sync with the similar Calcite doc ---
[jira] [Commented] (FLINK-8650) Add tests and documentation for WINDOW clause
[ https://issues.apache.org/jira/browse/FLINK-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526602#comment-16526602 ] ASF GitHub Bot commented on FLINK-8650: --- Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6226#discussion_r198928207 --- Diff: docs/dev/table/sql.md --- @@ -115,6 +115,10 @@ The following BNF-grammar describes the superset of supported SQL features in ba {% highlight sql %} +insert: + INSERT INTO tableReference --- End diff -- Moved it up to be in sync with the similar Calcite doc > Add tests and documentation for WINDOW clause > - > > Key: FLINK-8650 > URL: https://issues.apache.org/jira/browse/FLINK-8650 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > We support queries with a {{WINDOW}} clause like: > {code} > SELECT a, SUM(c) OVER w, MIN(c) OVER w FROM MyTable WINDOW w AS (PARTITION BY > a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) > {code} > But this is neither documented nor tested. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8650) Add tests and documentation for WINDOW clause
[ https://issues.apache.org/jira/browse/FLINK-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526601#comment-16526601 ] ASF GitHub Bot commented on FLINK-8650: --- Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6226#discussion_r198928117 --- Diff: docs/dev/table/sql.md --- @@ -139,7 +143,8 @@ select: [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] - + [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ] --- End diff -- Done in a way Calcite does it > Add tests and documentation for WINDOW clause > - > > Key: FLINK-8650 > URL: https://issues.apache.org/jira/browse/FLINK-8650 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > We support queries with a {{WINDOW}} clause like: > {code} > SELECT a, SUM(c) OVER w, MIN(c) OVER w FROM MyTable WINDOW w AS (PARTITION BY > a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) > {code} > But this is neither documented nor tested. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...
Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6226#discussion_r198928117 --- Diff: docs/dev/table/sql.md --- @@ -139,7 +143,8 @@ select: [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] - + [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ] --- End diff -- Done in a way Calcite does it ---
[jira] [Updated] (FLINK-8650) Add tests and documentation for WINDOW clause
[ https://issues.apache.org/jira/browse/FLINK-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-8650: -- Labels: pull-request-available (was: ) > Add tests and documentation for WINDOW clause > - > > Key: FLINK-8650 > URL: https://issues.apache.org/jira/browse/FLINK-8650 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > We support queries with a {{WINDOW}} clause like: > {code} > SELECT a, SUM(c) OVER w, MIN(c) OVER w FROM MyTable WINDOW w AS (PARTITION BY > a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) > {code} > But this is neither documented nor tested. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8650) Add tests and documentation for WINDOW clause
[ https://issues.apache.org/jira/browse/FLINK-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526597#comment-16526597 ] ASF GitHub Bot commented on FLINK-8650: --- GitHub user snuyanzin opened a pull request: https://github.com/apache/flink/pull/6226 [FLINK-8650] Tests for WINDOW clause and documentation update *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. 
- Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change test and documentation coverage of WINDOW clause ## Brief change log - *Test that the same queries but with different specification of windows have the same plan* - *Mentioning in doc WINDOW syntax* ## Verifying this change This change added tests and can be verified as follows: via running of org.apache.flink.table.api.stream.sql.OverWindowTest ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? 
(no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/snuyanzin/flink FLINK_8560 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6226.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6226 commit 85451a2f9e6d1cd5d8b98fa2bba5c92326e817ce Author: snuyanzin Date: 2018-06-28T16:19:25Z [FLINK-8650] Tests for WINDOW clause and documentation update > Add tests and documentation for WINDOW clause > - > > Key: FLINK-8650 > URL: https://issues.apache.org/jira/browse/FLINK-8650 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > We support queries with a {{WINDOW}} clause like: > {code} > SELECT a, SUM(c) OVER w, MIN(c) OVER w FROM
[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...
GitHub user snuyanzin opened a pull request: https://github.com/apache/flink/pull/6226 [FLINK-8650] Tests for WINDOW clause and documentation update *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). 
- Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change test and documentation coverage of WINDOW clause ## Brief change log - *Test that the same queries but with different specification of windows have the same plan* - *Mentioning in doc WINDOW syntax* ## Verifying this change This change added tests and can be verified as follows: via running of org.apache.flink.table.api.stream.sql.OverWindowTest ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/snuyanzin/flink FLINK_8560 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6226.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6226 commit 85451a2f9e6d1cd5d8b98fa2bba5c92326e817ce Author: snuyanzin Date: 2018-06-28T16:19:25Z [FLINK-8650] Tests for WINDOW clause and documentation update ---
[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526563#comment-16526563 ] ASF GitHub Bot commented on FLINK-9567: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6192#discussion_r198914433 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -1120,5 +1120,28 @@ public void reportPayload(ResourceID resourceID, Void payload) { return CompletableFuture.completedFuture(null); } } + + // + // Work Registration status checking + // + + /** +* Check if the executor with given resourceID is still in taskExecutors map +* @param resourceID an ID mapping to a task executor +* @return +*/ + protected boolean checkWorkerRegistrationWithResourceId(ResourceID resourceID) { + boolean status = taskExecutors.containsKey(resourceID); + if (!status) { + log.debug("No open TaskExecutor connection {}. Ignoring close TaskExecutor connection.", resourceID); + } + return status; + } + + @VisibleForTesting + public void triggerTaskManagerHeartbeatTimeout(ResourceID resourceID) { --- End diff -- Let's not add this method which is only used for testing purposes to the production code. Instead, I would suggest to subclass `ResourceManager` in your test and add this method. > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0 > > Attachments: FlinkYarnProblem, fulllog.txt > > > After restart the Job Manager in Yarn Cluster mode, sometimes Flink does not > release task manager containers in some specific case. 
In the worst case, I > had a job configured for 5 task managers, but it possessed more than 100 containers > in the end. Although the job didn't fail, it affected other jobs in the > YARN cluster. > In the first log I posted, the container with id 24 is the reason why YARN > did not release resources. The container was killed before the restart, but the > callback of *onContainerComplete* in > *YarnResourceManager*, which should be called by YARN's *AMRMAsyncClient*, was never received. > After the restart, as we can see in line 347 of the FlinkYarnProblem log, > 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - > Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has > failed, address is now gated for [50] ms. Reason: [Disassociated] > Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. > When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it > did not have the connection to the TaskManager on container 24, so it just ignored > the close of the TaskManager. > 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No > open TaskExecutor connection container_1528707394163_29461_02_24. > Ignoring close TaskExecutor connection. > However, before calling *closeTaskManagerConnection*, it had already called > *requestYarnContainer*, which increased the *numPendingContainerRequests* variable > in *YarnResourceManager* by 1. > As the excessive container return is determined by the > *numPendingContainerRequests* variable in *YarnResourceManager*, it cannot > return this container although it is not required. Meanwhile, the restart > logic has already allocated enough containers for the TaskManagers, so Flink will > possess the extra container for a long time for nothing. > In the full log, the job ended with 7 containers while only 3 are running > TaskManagers. > ps: Another strange thing I found is that sometimes when requesting a yarn > container, it will return much more than requested.
Is it a normal scenario > for AMRMAsyncClient? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
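The accounting problem described above can be modeled in a few lines. This is a deliberately simplified sketch (illustrative class and method names, not Flink's actual `YarnResourceManager` API) contrasting the reported request-first ordering with the guarded ordering discussed in the pull request; the guarded variant also skips the replacement request when a container fails before its TaskExecutor ever registered, which is the edge case raised later in review:

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Simplified model of the pending-container accounting described above.
 * Class and method names are illustrative, not Flink's real API.
 */
public class PendingRequestModel {
    private final Set<String> taskExecutors = new HashSet<>();
    private int numPendingContainerRequests;

    public void registerTaskExecutor(String resourceId) {
        taskExecutors.add(resourceId);
    }

    /** Reported ordering: request a replacement first, then try to close the connection. */
    public void onContainerCompletedReportedOrder(String resourceId) {
        numPendingContainerRequests++;        // requestYarnContainer(...)
        taskExecutors.remove(resourceId);     // closeTaskManagerConnection(...) may be a no-op
    }

    /** Guarded ordering from the PR: only request a replacement for a known connection. */
    public void onContainerCompletedGuarded(String resourceId) {
        if (taskExecutors.remove(resourceId)) {
            numPendingContainerRequests++;
        }
        // Caveat from the review: if the container failed before the TaskExecutor
        // registered, no replacement is requested here either, so the worker is lost.
    }

    public int getNumPendingContainerRequests() {
        return numPendingContainerRequests;
    }

    public static void main(String[] args) {
        // Container 24 was killed before the JobManager restart: no open connection.
        PendingRequestModel reported = new PendingRequestModel();
        reported.onContainerCompletedReportedOrder("container_24");
        System.out.println(reported.getNumPendingContainerRequests()); // 1: leaked pending request

        PendingRequestModel guarded = new PendingRequestModel();
        guarded.onContainerCompletedGuarded("container_24");
        System.out.println(guarded.getNumPendingContainerRequests());  // 0: nothing leaks
    }
}
```

The leaked pending request is what keeps the excess container alive: the return of excess containers is driven by the counter, and the counter never comes back down.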
[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526564#comment-16526564 ] ASF GitHub Bot commented on FLINK-9567: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6192#discussion_r198915014 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java --- @@ -421,4 +425,139 @@ public void testDeleteApplicationFiles() throws Exception { assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath())); }}; } + + @Test + public void testOnContainerCompleted() throws Exception { + new Context() {{ + startResourceManager(); + CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { + rmServices.slotManager.registerSlotRequest( + new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); + return null; + }); + // wait for the registerSlotRequest completion + registerSlotRequestFuture.get(); + // Callback from YARN when container is allocated. + Container testingContainer = mock(Container.class); + when(testingContainer.getId()).thenReturn( + ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), + 1), + 1)); + when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); + when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); + when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED); + + ImmutableList testingContainerList = ImmutableList.of(testingContainer); + resourceManager.onContainersAllocated(testingContainerList); + verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); + + // Remote task executor registers with YarnResourceManager. 
+ TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class); --- End diff -- Can we use the `TestingTaskExecutorGateway` here?
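The `TestingTaskExecutorGateway` the reviewer refers to is a hand-written stub rather than a Mockito mock. The general pattern looks like this; the one-method gateway interface below is a made-up stand-in for Flink's much larger `TaskExecutorGateway`, and the address value is purely illustrative:

```java
/** Stand-in gateway interface; the real TaskExecutorGateway has many more methods. */
interface GatewaySketch {
    String getAddress();
}

/** Hand-written testing implementation: explicit behavior, no mocking framework needed. */
class TestingGatewaySketch implements GatewaySketch {
    private final String address;

    TestingGatewaySketch(String address) {
        this.address = address;
    }

    @Override
    public String getAddress() {
        return address;
    }
}

public class GatewayDemo {
    public static void main(String[] args) {
        // Illustrative address; any string works for the sketch.
        GatewaySketch gateway = new TestingGatewaySketch("taskmanager_0");
        System.out.println(gateway.getAddress());
    }
}
```

Compared with `mock(TaskExecutorGateway.class)`, such a stub keeps the test behavior discoverable in one place and reusable across tests.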
[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526565#comment-16526565 ] ASF GitHub Bot commented on FLINK-9567: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6192#discussion_r198917467 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java --- @@ -334,8 +335,11 @@ public void onContainersCompleted(final List list) { if (yarnWorkerNode != null) { // Container completed unexpectedly ~> start a new one final Container container = yarnWorkerNode.getContainer(); - requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority()); - closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics())); + // check WorkerRegistration status to avoid requesting containers more than required + if (checkWorkerRegistrationWithResourceId(resourceId)) { --- End diff -- Wouldn't that prevent container restarts if the container failure happened before the `TaskManager` registered at the `ResourceManager`, because then, `ResourceManager#taskExecutors` would not contain the given `ResourceID`?
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526534#comment-16526534 ] ASF GitHub Bot commented on FLINK-9456: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198491333 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java --- @@ -278,4 +279,13 @@ void heartbeatFromTaskManager( * not available (yet). */ CompletableFuture requestOperatorBackPressureStats(JobVertexID jobVertexId); + + /** +* Notifies that the task manager has terminated. +* +* @param resourceID identifying the task manager +* @param allocationIDs held by this job that belong to the task manager +* @param cause of the task manager termination +*/ + void taskManagerTerminated(ResourceID resourceID, Set allocationIDs, Exception cause); --- End diff -- methods should usually be a verb. What about `notifyTaskManagerTermination`? > Let ResourceManager notify JobManager about failed/killed TaskManagers > -- > > Key: FLINK-9456 > URL: https://issues.apache.org/jira/browse/FLINK-9456 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Often, the {{ResourceManager}} learns faster about TaskManager > failures/killings because it directly communicates with the underlying > resource management framework. Instead of only relying on the > {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we > should additionally send a signal from the {{ResourceManager}} to the > {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster > to {{TaskManager}} failures and recover our running job/s. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
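The notification path proposed in the ticket — the ResourceManager telling the JobManager directly about a dead TaskManager instead of waiting for the JobManager's own heartbeat timeout — can be sketched as follows. All names here are illustrative, loosely based on the method names discussed in this review thread, not the final Flink API:

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative slice of the JobMaster-side gateway. */
interface JobMasterGatewaySketch {
    void notifyTaskManagerTermination(String resourceId, Exception cause);
}

/** Records notifications so the example is observable. */
class RecordingJobMaster implements JobMasterGatewaySketch {
    final List<String> terminatedTaskManagers = new ArrayList<>();

    @Override
    public void notifyTaskManagerTermination(String resourceId, Exception cause) {
        terminatedTaskManagers.add(resourceId);
    }
}

/** ResourceManager side: pushes the signal as soon as the framework reports the loss. */
class ResourceManagerSketch {
    private final JobMasterGatewaySketch jobMaster;

    ResourceManagerSketch(JobMasterGatewaySketch jobMaster) {
        this.jobMaster = jobMaster;
    }

    void onWorkerTerminated(String resourceId, String diagnostics) {
        // No need to wait for the JobManager's heartbeat timeout to fire.
        jobMaster.notifyTaskManagerTermination(resourceId, new Exception(diagnostics));
    }
}

public class NotifyDemo {
    public static void main(String[] args) {
        RecordingJobMaster jobMaster = new RecordingJobMaster();
        new ResourceManagerSketch(jobMaster).onWorkerTerminated("container_7", "killed by YARN");
        System.out.println(jobMaster.terminatedTaskManagers); // [container_7]
    }
}
```

The point of the design is latency: the ResourceManager hears about the loss from the resource framework first, so pushing the signal lets the job recover sooner.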
[jira] [Updated] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9456: -- Labels: pull-request-available (was: )
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526541#comment-16526541 ] ASF GitHub Bot commented on FLINK-9456: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198490029 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java --- @@ -45,6 +46,9 @@ /** Allocation id for which this slot has been allocated. */ private AllocationID allocationId; + /** Allocation id for which this slot has been allocated. */ + private JobID jobId; --- End diff -- Should be annotated with `@Nullable`
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526537#comment-16526537 ] ASF GitHub Bot commented on FLINK-9456: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198491884 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java --- @@ -53,4 +56,13 @@ * @param cause of the allocation failure */ void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause); + + /** +* Notifies that the task manager has been terminated. --- End diff -- line break is missing here
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198910651 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java --- @@ -53,4 +56,13 @@ * @param cause of the allocation failure */ void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause); + + /** +* Notifies that the task manager has been terminated. +* @param jobId to be notified +* @param resourceID identifying the terminated task manager +* @param allocationIDs of the job held that belong to this task manager +* @param cause of the task manager termination. +*/ + void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set allocationIDs, Exception cause); --- End diff -- I think the notification about a terminated `TaskManager` should not come from the `SlotManager` but from the `ResourceManager`. Thus, we should not need this method. ---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198491021 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java --- @@ -278,4 +279,13 @@ void heartbeatFromTaskManager( * not available (yet). */ CompletableFuture requestOperatorBackPressureStats(JobVertexID jobVertexId); + + /** +* Notifies that the task manager has terminated. +* +* @param resourceID identifying the task manager +* @param allocationIDs held by this job that belong to the task manager --- End diff -- I think this parameter is not needed. ---
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526540#comment-16526540 ] ASF GitHub Bot commented on FLINK-9456: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198489897 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java --- @@ -619,7 +619,11 @@ public void taskTerminated(TaskMonitor.TaskTerminated message) { startNewWorker(launched.profile()); } - closeTaskManagerConnection(id, new Exception(status.getMessage())); + final Exception terminatedCause = new Exception(status.getMessage()); --- End diff -- let's call it `terminationCause`
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198490490 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor operatorBackPressureStats.orElse(null))); } + @Override + public void taskManagerTerminated(ResourceID resourceID, Set allocationIds, Exception cause) { --- End diff -- For what do we need the `allocationIds` parameter here? ---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198491683 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -1120,5 +1131,14 @@ public void reportPayload(ResourceID resourceID, Void payload) { return CompletableFuture.completedFuture(null); } } + + protected void notifyTaskManagerCompleted(ResourceID resourceID, Exception cause) { + WorkerRegistration workerRegistration = taskExecutors.remove(resourceID); + if (workerRegistration != null) { + slotManager.notifyTaskManagerFailed(resourceID, workerRegistration.getInstanceID(), cause); + } else { + log.warn("TaskManager failed before registering with ResourceManager successfully."); --- End diff -- This should be a debug log message. ---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198493145 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java --- @@ -1202,6 +1209,111 @@ public void testSlotRequestFailure() throws Exception { } } + /** +* Tests notify the job manager when the task manager is failed/killed. +*/ + @Test + public void testNotifyTaskManagerFailed() throws Exception { + + final List, Exception>> notifiedTaskManagerInfos = new ArrayList<>(); + + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() { + @Override + public void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set allocationIDs, Exception cause) { + notifiedTaskManagerInfos.add(new Tuple4<>(jobId, resourceID, allocationIDs, cause)); + } + })) { --- End diff -- Indentation looks a bit off here ---
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526538#comment-16526538 ] ASF GitHub Bot commented on FLINK-9456: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198912464 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -717,6 +728,32 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe mainThreadExecutor); } + public void notifyTaskManagerFailed(ResourceID resourceID, InstanceID instanceID, Exception cause) { + final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID); + if (taskManagerRegistration != null) { + final HashMap> jobAndAllocationIDMap = new HashMap<>(4); + for (SlotID slotID : taskManagerRegistration.getSlots()) { + TaskManagerSlot taskManagerSlot = slots.get(slotID); + AllocationID allocationID = taskManagerSlot.getAllocationId(); + if (allocationID != null) { + JobID jobId = taskManagerSlot.getJobId(); + Set jobAllocationIDSet = jobAndAllocationIDMap.get(jobId); + if (jobAllocationIDSet == null) { + jobAllocationIDSet = new HashSet<>(2); + jobAndAllocationIDMap.put(jobId, jobAllocationIDSet); + } + jobAllocationIDSet.add(allocationID); + } + } + + for (Map.Entry> entry : jobAndAllocationIDMap.entrySet()) { + resourceActions.notifyTaskManagerTerminated(entry.getKey(), resourceID, entry.getValue(), cause); + } + } else { + LOG.warn("TaskManager failed before registering with slot manager successfully."); + } --- End diff -- This looks a little bit complicated. Moreover, I don't really like that the control flow is: ResourceManager -> SlotManager -> ResourceManager -> JobManager. What about leveraging the existing `ResourceAction#notifyAllocationFailure` method. 
We could say that we not only call this method in case of a failed pending slot request but also if we remove a slot. Then unregistering a `TaskManager` from the `SlotManager` would remove the slots which then would trigger for each allocated slot the `notifyAllocationFailure` message. We would then have to introduce a `JobMasterGateway#notifyAllocationFailure` which we can call from `ResourceActionsImpl#notifyAllocationFailure`. The implementation on the `JobMaster` side would then simply call `SlotPool#failAllocation`. By doing it that way, we send multiple messages (might not be ideal) but we reuse most of the existing code paths without introducing special case logic. What do you think?
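The slot-grouping loop in the snippet above builds the job-to-allocations map by hand with explicit null checks on the inner set; with `Map.computeIfAbsent` the same grouping becomes one line per slot. A standalone sketch with plain `String` IDs standing in for Flink's `JobID`/`AllocationID` types:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class GroupAllocationsByJob {

    /** Minimal stand-in for a TaskManager slot; allocationId == null means the slot is free. */
    static final class Slot {
        final String jobId;
        final String allocationId;

        Slot(String jobId, String allocationId) {
            this.jobId = jobId;
            this.allocationId = allocationId;
        }
    }

    /** Groups the allocation IDs of a failed TaskManager's slots by owning job. */
    static Map<String, Set<String>> groupByJob(List<Slot> slots) {
        Map<String, Set<String>> byJob = new HashMap<>();
        for (Slot slot : slots) {
            if (slot.allocationId != null) {
                // computeIfAbsent replaces the manual get/null-check/put dance.
                byJob.computeIfAbsent(slot.jobId, k -> new HashSet<>()).add(slot.allocationId);
            }
        }
        return byJob;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> byJob = groupByJob(Arrays.asList(
                new Slot("jobA", "alloc-1"),
                new Slot("jobA", "alloc-2"),
                new Slot("jobB", "alloc-3"),
                new Slot("jobB", null)));     // free slot: skipped
        System.out.println(byJob.get("jobA").size()); // 2
        System.out.println(byJob.get("jobB").size()); // 1
    }
}
```

Whether the grouping lives in the `SlotManager` at all is the larger question raised in the review; this sketch only shows the more compact form of the loop itself.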
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526536#comment-16526536 ] ASF GitHub Bot commented on FLINK-9456: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198490490 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor operatorBackPressureStats.orElse(null))); } + @Override + public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) { --- End diff -- For what do we need the `allocationIds` parameter here? > Let ResourceManager notify JobManager about failed/killed TaskManagers > -- > > Key: FLINK-9456 > URL: https://issues.apache.org/jira/browse/FLINK-9456 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Often, the {{ResourceManager}} learns faster about TaskManager > failures/killings because it directly communicates with the underlying > resource management framework. Instead of only relying on the > {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we > should additionally send a signal from the {{ResourceManager}} to the > {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster > to {{TaskManager}} failures and recover our running job/s. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526542#comment-16526542 ] ASF GitHub Bot commented on FLINK-9456: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198491683 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -1120,5 +1131,14 @@ public void reportPayload(ResourceID resourceID, Void payload) { return CompletableFuture.completedFuture(null); } } + + protected void notifyTaskManagerCompleted(ResourceID resourceID, Exception cause) { + WorkerRegistration workerRegistration = taskExecutors.remove(resourceID); + if (workerRegistration != null) { + slotManager.notifyTaskManagerFailed(resourceID, workerRegistration.getInstanceID(), cause); + } else { + log.warn("TaskManager failed before registering with ResourceManager successfully."); --- End diff -- This should be a debug log message. > Let ResourceManager notify JobManager about failed/killed TaskManagers > -- > > Key: FLINK-9456 > URL: https://issues.apache.org/jira/browse/FLINK-9456 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Often, the {{ResourceManager}} learns faster about TaskManager > failures/killings because it directly communicates with the underlying > resource management framework. Instead of only relying on the > {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we > should additionally send a signal from the {{ResourceManager}} to the > {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster > to {{TaskManager}} failures and recover our running job/s. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198491884 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java --- @@ -53,4 +56,13 @@ * @param cause of the allocation failure */ void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause); + + /** +* Notifies that the task manager has been terminated. --- End diff -- line break is missing here ---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198490029 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java --- @@ -45,6 +46,9 @@ /** Allocation id for which this slot has been allocated. */ private AllocationID allocationId; + /** Allocation id for which this slot has been allocated. */ + private JobID jobId; --- End diff -- Should be annotated with `@Nullable` ---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198489897 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java --- @@ -619,7 +619,11 @@ public void taskTerminated(TaskMonitor.TaskTerminated message) { startNewWorker(launched.profile()); } - closeTaskManagerConnection(id, new Exception(status.getMessage())); + final Exception terminatedCause = new Exception(status.getMessage()); --- End diff -- let's call it `terminationCause` ---
[jira] [Created] (FLINK-9689) Flink consumer deserialization example
Satheesh created FLINK-9689: --- Summary: Flink consumer deserialization example Key: FLINK-9689 URL: https://issues.apache.org/jira/browse/FLINK-9689 Project: Flink Issue Type: Improvement Reporter: Satheesh It's hard to find a relevant custom deserialization example for the Flink Kafka consumer. It would be very useful to add a sample program implementing custom deserialization in the flink-examples folder. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
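The kind of example FLINK-9689 asks for can be sketched as follows. Flink's Kafka consumer accepts a `DeserializationSchema<T>`; to keep this sketch self-contained it uses a minimal local interface of the same shape rather than Flink's real one. The `Event` type and the pipe-separated wire format are illustrative assumptions, not anything from the ticket.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a custom deserialization schema for a Kafka consumer.
final class CustomDeserializationSketch {

    static final class Event {
        final String key;
        final long value;

        Event(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Minimal stand-in for Flink's DeserializationSchema<T> interface.
    interface DeserializationSchema<T> {
        T deserialize(byte[] message);
    }

    // Parses "key|value" records into Event objects.
    static final DeserializationSchema<Event> EVENT_SCHEMA = message -> {
        String[] parts = new String(message, StandardCharsets.UTF_8).split("\\|", 2);
        return new Event(parts[0], Long.parseLong(parts[1]));
    };

    public static void main(String[] args) {
        Event e = EVENT_SCHEMA.deserialize("sensor-1|42".getBytes(StandardCharsets.UTF_8));
        System.out.println(e.key + ":" + e.value); // sensor-1:42
    }
}
```

With the real API, an instance of such a schema would be passed to the Kafka consumer's constructor in place of a built-in schema like `SimpleStringSchema`.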
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526533#comment-16526533 ] ASF GitHub Bot commented on FLINK-9456: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198910651 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java --- @@ -53,4 +56,13 @@ * @param cause of the allocation failure */ void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause); + + /** +* Notifies that the task manager has been terminated. +* @param jobId to be notified +* @param resourceID identifying the terminated task manager +* @param allocationIDs of the job held that belong to this task manager +* @param cause of the task manager termination. +*/ + void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause); --- End diff -- I think the notification about a terminated `TaskManager` should not come from the `SlotManager` but from the `ResourceManager`. Thus, we should not need this method. > Let ResourceManager notify JobManager about failed/killed TaskManagers > -- > > Key: FLINK-9456 > URL: https://issues.apache.org/jira/browse/FLINK-9456 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Often, the {{ResourceManager}} learns faster about TaskManager > failures/killings because it directly communicates with the underlying > resource management framework. Instead of only relying on the > {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we > should additionally send a signal from the {{ResourceManager}} to the > {{JobManager}} if a {{TaskManager}} has died. 
That way, we can react faster > to {{TaskManager}} failures and recover our running job/s. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198491333 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java --- @@ -278,4 +279,13 @@ void heartbeatFromTaskManager( * not available (yet). */ CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexId); + + /** +* Notifies that the task manager has terminated. +* +* @param resourceID identifying the task manager +* @param allocationIDs held by this job that belong to the task manager +* @param cause of the task manager termination +*/ + void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause); --- End diff -- methods should usually be a verb. What about `notifyTaskManagerTermination`? ---
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526539#comment-16526539 ] ASF GitHub Bot commented on FLINK-9456: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r198493145 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java --- @@ -1202,6 +1209,111 @@ public void testSlotRequestFailure() throws Exception { } } + /** +* Tests notify the job manager when the task manager is failed/killed. +*/ + @Test + public void testNotifyTaskManagerFailed() throws Exception { + + final List<Tuple4<JobID, ResourceID, Set<AllocationID>, Exception>> notifiedTaskManagerInfos = new ArrayList<>(); + + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() { + @Override + public void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause) { + notifiedTaskManagerInfos.add(new Tuple4<>(jobId, resourceID, allocationIDs, cause)); + } + })) { --- End diff -- Indentation looks a bit off here > Let ResourceManager notify JobManager about failed/killed TaskManagers > -- > > Key: FLINK-9456 > URL: https://issues.apache.org/jira/browse/FLINK-9456 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Often, the {{ResourceManager}} learns faster about TaskManager > failures/killings because it directly communicates with the underlying > resource management framework. Instead of only relying on the > {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we > should additionally send a signal from the {{ResourceManager}} to the > {{JobManager}} if a {{TaskManager}} has died. 
That way, we can react faster > to {{TaskManager}} failures and recover our running job/s. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9672) Fail fatally if we cannot submit job on added JobGraph signal
[ https://issues.apache.org/jira/browse/FLINK-9672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526528#comment-16526528 ] ASF GitHub Bot commented on FLINK-9672: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6213#discussion_r198912529 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -322,9 +322,6 @@ private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) throws Except @Override public CompletableFuture> listJobs(Time timeout) { - if (jobManagerRunners.isEmpty()) { --- End diff -- nice > Fail fatally if we cannot submit job on added JobGraph signal > - > > Key: FLINK-9672 > URL: https://issues.apache.org/jira/browse/FLINK-9672 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > The {{SubmittedJobGraphStore}} signals when new {{JobGraphs}} are added. If > this happens, then the leader should recover this job and submit it. If the > recovery/submission should fail for some reason, then we should fail fatally > to restart the process which will then try to recover the jobs again. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6083 ---
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526502#comment-16526502 ] ASF GitHub Bot commented on FLINK-8983: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6083 > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0 > > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8983. Resolution: Fixed Fix Version/s: 1.6.0 Added via a36b56999240d1ead0793be7acb4ad13cd0559f2 9b366045697f80534b8eb2e8b559f02d1452f0cf > End-to-end test: Confluent schema registry > -- > > Key: FLINK-8983 > URL: https://issues.apache.org/jira/browse/FLINK-8983 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Reporter: Till Rohrmann >Assignee: Yazdan Shirvany >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0 > > > It would be good to add an end-to-end test which verifies that Flink is able > to work together with the Confluent schema registry. In order to do that we > have to setup a Kafka cluster and write a Flink job which reads from the > Confluent schema registry producing an Avro type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8745) Reduce travis usage
[ https://issues.apache.org/jira/browse/FLINK-8745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-8745: Affects Version/s: (was: 1.5.0) (was: 1.4.0) 1.6.0 > Reduce travis usage > --- > > Key: FLINK-8745 > URL: https://issues.apache.org/jira/browse/FLINK-8745 > Project: Flink > Issue Type: Improvement > Components: Build System, Travis >Affects Versions: 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > > We've been notified by INFRA that our travis usage is exceedingly high. > There are various things we could look into short- and long term: > h2. Short-term > h3. Reduce number of jobs > We currently run 12 job for each pr/push. > The first 10 jobs belong to 2 groups, with each group representing one test > run of Flink against a specific hadoop version. > Given that the majority of changes made to Flink do not impact our > compatibility with hadoop we could drop one of the groups and instead rely on > daily cron jobs. This alone would cut our travis usage by 40%. > Once the migration to flip6 is done we can drop the remaining 2 jobs, > increasing the reduction to 60%. > h3. Reduce number of builds > Travis is run for every PR, regardless of what change was made, even if it > was something trivial as removing a trailing space in a documentation file. > From time to time it also happens that new commits are pushed in a PR solely > to trigger a new build to get that perfect green build. > Instead we could look into manually triggering travis for pull requests, that > is with a bot. > h2. Long-term > h3. Incremental builds > Making the build dependent on the changes made has been brought up a few > times now. This would in particular benefit cases where connectors/libraries > are modified as they generally have few dependents. We would still have to > run everything though if changes are made to the core modules. > h3. 
Repository split > The most painful of them all, but in my opinion also the most promising. With > separate repositories for the core flink modules (flink-runtime etc), > flink-connectors and flink-libraries we could downright skip the compilation > for a large number of modules. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9687) Delay the state fetch only when the triggerResult is fire
[ https://issues.apache.org/jira/browse/FLINK-9687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526454#comment-16526454 ] Hequn Cheng commented on FLINK-9687: Good catch. When isFire=false, there is no need to get contents from window state. > Delay the state fetch only when the triggerResult is fire > - > > Key: FLINK-9687 > URL: https://issues.apache.org/jira/browse/FLINK-9687 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > > When the window operator is fired by the event timer or processing timer, it > fetch the state content first. I think it only need to fetch the content from > windowState when the triggerResult is Fire. So we have to change the order to > avoid this cost ( the cost of fetch content from state is more than judge the > triggerResult). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9684) HistoryServerArchiveFetcher not working properly with secure hdfs cluster
[ https://issues.apache.org/jira/browse/FLINK-9684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526449#comment-16526449 ] Ethan Li commented on FLINK-9684: - PR: https://github.com/apache/flink/pull/6225 > HistoryServerArchiveFetcher not working properly with secure hdfs cluster > - > > Key: FLINK-9684 > URL: https://issues.apache.org/jira/browse/FLINK-9684 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.2 >Reporter: Ethan Li >Priority: Major > > With my current setup, jobmanager and taskmanager are able to talk to hdfs > cluster (with kerberos setup). However, running history server gets: > > > {code:java} > 2018-06-27 19:03:32,080 WARN org.apache.hadoop.ipc.Client - Exception > encountered while connecting to the server : > java.lang.IllegalArgumentException: Failed to specify server's Kerberos > principal name > 2018-06-27 19:03:32,085 ERROR > org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher - > Failed to access job archive location for path > hdfs://openqe11blue-n2.blue.ygrid.yahoo.com/tmp/flink/openstorm10-blue/jmarchive. 
> java.io.IOException: Failed on local exception: java.io.IOException: > java.lang.IllegalArgumentException: Failed to specify server's Kerberos > principal name; Host Details : local host is: > "openstorm10blue-n2.blue.ygrid.yahoo.com/10.215.79.35"; destination host is: > "openqe11blue-n2.blue.ygri > d.yahoo.com":8020; > at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764) > at org.apache.hadoop.ipc.Client.call(Client.java:1414) > at org.apache.hadoop.ipc.Client.call(Client.java:1363) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy9.getListing(Unknown Source) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103) > at com.sun.proxy.$Proxy9.getListing(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:515) > at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1743) > at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1726) > at > org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:650) > at > org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:102) > at > org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712) > at > org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:708) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:146) > at > org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher$JobArchiveFetcherTask.run(HistoryServerArchiveFetcher.java:139) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.IOException: java.lang.IllegalArgumentException: Failed to > specify server's Kerberos principal name > at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:677) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) > at > org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:640) > at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:724) > at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:367) > at org.apache.hadoop.ipc.Client.getConnection(Client.java:1462) > at org.apache.hadoop.ipc.Client.call(Client.java:1381) > ...
[jira] [Commented] (FLINK-9687) Delay the state fetch only when the triggerResult is fire
[ https://issues.apache.org/jira/browse/FLINK-9687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526330#comment-16526330 ] aitozi commented on FLINK-9687: --- Hi, [~kkl0u] I have added my thought/description on this issue, can you help review it ? > Delay the state fetch only when the triggerResult is fire > - > > Key: FLINK-9687 > URL: https://issues.apache.org/jira/browse/FLINK-9687 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > > When the window operator is fired by the event timer or processing timer, it > fetch the state content first. I think it only need to fetch the content from > windowState when the triggerResult is Fire. So we have to change the order to > avoid this cost ( the cost of fetch content from state is more than judge the > triggerResult). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9687) Delay the state fetch only when the triggerResult is fire
[ https://issues.apache.org/jira/browse/FLINK-9687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] aitozi updated FLINK-9687: -- Description: When the window operator is fired by the event timer or processing timer, it fetches the state content first. I think it only needs to fetch the content from windowState when the triggerResult is Fire. So we should change the order to avoid this cost (fetching content from state costs more than checking the triggerResult). > Delay the state fetch only when the triggerResult is fire > - > > Key: FLINK-9687 > URL: https://issues.apache.org/jira/browse/FLINK-9687 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > > When the window operator is fired by the event timer or processing timer, it > fetches the state content first. I think it only needs to fetch the content from > windowState when the triggerResult is Fire. So we should change the order to > avoid this cost (fetching content from state costs more than checking the > triggerResult). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9687) Delay the state fetch only when the triggerResult is fire
[ https://issues.apache.org/jira/browse/FLINK-9687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526286#comment-16526286 ] Kostas Kloudas commented on FLINK-9687: --- Hi [~aitozi]. Can you provide a description here on what do you mean and why this is interesting? JIRAs should include the discussion on an issue, and not PRs. This is not only an issue of "taste" but it is required, as Jira is Apache, github is not. > Delay the state fetch only when the triggerResult is fire > - > > Key: FLINK-9687 > URL: https://issues.apache.org/jira/browse/FLINK-9687 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
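The reordering FLINK-9687 proposes can be illustrated with a simplified sketch. `WindowState` and `TriggerResult` here are minimal local stand-ins, not Flink's real window-operator classes; the point is only that the trigger decision is evaluated before any state access, so a `CONTINUE` timer never pays the state-fetch cost.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: consult the trigger first, fetch window state only on FIRE.
final class LazyStateFetchSketch {

    static final class WindowState {
        int fetches = 0;
        final List<String> contents = Arrays.asList("a", "b");

        List<String> get() {
            fetches++; // count how often state is actually read
            return contents;
        }
    }

    enum TriggerResult { FIRE, CONTINUE }

    // Before the change: state would be fetched unconditionally, then the
    // trigger consulted. After (sketched here): the trigger result gates the fetch.
    static List<String> onTimer(TriggerResult result, WindowState state) {
        if (result == TriggerResult.FIRE) {
            return state.get(); // state is read only when we actually emit
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        WindowState state = new WindowState();
        onTimer(TriggerResult.CONTINUE, state); // no state access
        onTimer(TriggerResult.FIRE, state);     // one state access
        System.out.println(state.fetches); // 1
    }
}
```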
[jira] [Commented] (FLINK-9662) Task manager isolation for jobs
[ https://issues.apache.org/jira/browse/FLINK-9662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526247#comment-16526247 ] Renjie Liu commented on FLINK-9662: --- Would it be better if we reuse the existing job id field in SlotStatus class? Till Rohrmann (JIRA) 于 2018年6月28日周四 下午7:29写道: -- Liu, Renjie Software Engineer, MVAD > Task manager isolation for jobs > --- > > Key: FLINK-9662 > URL: https://issues.apache.org/jira/browse/FLINK-9662 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Fix For: 1.6.0 > > Attachments: job isolation sequence.jpg > > > Disable task manager sharing for different jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9666) short-circuit logic should be used in boolean contexts
[ https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9666. -- Resolution: Fixed Fixed via da37daa8ba387435abf6a3bc5629ce7d21a6b017 > short-circuit logic should be used in boolean contexts > -- > > Key: FLINK-9666 > URL: https://issues.apache.org/jira/browse/FLINK-9666 > Project: Flink > Issue Type: Improvement > Components: Core, DataStream API >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.6.0 > > > short-circuit logic should be used in boolean contexts -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9666) short-circuit logic should be used in boolean contexts
[ https://issues.apache.org/jira/browse/FLINK-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-9666: Assignee: lamber-ken > short-circuit logic should be used in boolean contexts > -- > > Key: FLINK-9666 > URL: https://issues.apache.org/jira/browse/FLINK-9666 > Project: Flink > Issue Type: Improvement > Components: Core, DataStream API >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Minor > Labels: pull-request-available > Fix For: 1.6.0 > > > short-circuit logic should be used in boolean contexts -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9662) Task manager isolation for jobs
[ https://issues.apache.org/jira/browse/FLINK-9662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526224#comment-16526224 ] Till Rohrmann commented on FLINK-9662: -- I think you can still solve the problem of task manager isolation with the tags. You simply need to specify a predicate that strictly requires the job id tag to be present. > Task manager isolation for jobs > --- > > Key: FLINK-9662 > URL: https://issues.apache.org/jira/browse/FLINK-9662 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Fix For: 1.6.0 > > Attachments: job isolation sequence.jpg > > > Disable task manager sharing for different jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
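The tag-predicate idea from the comment above can be sketched as follows. The types here (a plain tag map and `Predicate`) are illustrative stand-ins, not Flink's actual slot-matching classes:

```java
import java.util.Map;
import java.util.function.Predicate;

// Sketch: a matcher that strictly requires a task manager slot to carry
// a matching "jobId" tag, which effectively isolates jobs from each other.
public class JobTagPredicateSketch {
    static Predicate<Map<String, String>> requiresJobId(String jobId) {
        // Slots without the tag (tags.get("jobId") == null) never match.
        return tags -> jobId.equals(tags.get("jobId"));
    }

    public static void main(String[] args) {
        Predicate<Map<String, String>> p = requiresJobId("job-42");
        System.out.println(p.test(Map.of("jobId", "job-42"))); // true
        System.out.println(p.test(Map.of()));                  // false
    }
}
```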
[jira] [Closed] (FLINK-9624) Move jar/artifact upload logic out of JobGraph
[ https://issues.apache.org/jira/browse/FLINK-9624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9624. --- Resolution: Fixed master: dd4c8469b11184b633d2b9514b9910622734270f > Move jar/artifact upload logic out of JobGraph > -- > > Key: FLINK-9624 > URL: https://issues.apache.org/jira/browse/FLINK-9624 > Project: Flink > Issue Type: Improvement > Components: Job-Submission >Affects Versions: 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The {{JobGraph}} offers utility methods for uploading jars and artifacts to > the BlobService. > However, how these files are uploaded isn't a concern of the {{JobGraph}} but > the submission-method, like the {{RestClusterClient}}. > These methods should be moved into a utility class. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526175#comment-16526175 ] Fabian Hueske commented on FLINK-9688: -- We of course aim for a large set of built-in functions, so such new features are highly welcome. Thanks for creating this issue! > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526166#comment-16526166 ] Sergey Nuyanzin commented on FLINK-9688: ok I see, thank you for clarification > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526161#comment-16526161 ] Fabian Hueske commented on FLINK-9688: -- This issue requests a new feature. {{ATAN2}} is not supported yet; adding it is a new feature. It would be a bug if the function were already supported but computed an incorrect result. > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-9688: - Issue Type: New Feature (was: Bug) > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-9688: --- Issue Type: Bug (was: New Feature) > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-9688: - Priority: Minor (was: Major) > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-9688: - Issue Type: New Feature (was: Bug) > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9682) Add setDescription to execution environment and display it in the UI
[ https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9682: --- Assignee: vinoyang > Add setDescription to execution environment and display it in the UI > > > Key: FLINK-9682 > URL: https://issues.apache.org/jira/browse/FLINK-9682 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Webfrontend >Affects Versions: 1.5.0 >Reporter: Elias Levy >Assignee: vinoyang >Priority: Major > > Currently you can provide a job name to {{execute}} in the execution > environment. In an environment where many versions of a job may be executing, > such as a development or test environment, identifying which running job is > of a specific version via the UI can be difficult unless the version is > embedded into the job name given to {{execute}}. But the job name is used > for other purposes, such as for namespacing metrics. Thus, it is not ideal > to modify the job name, as that could require modifying metric dashboards and > monitors each time versions change. > I propose that a new method be added to the execution environment, > {{setDescription}}, that would allow a user to pass in an arbitrary > description that would be displayed in the dashboard, allowing users to > distinguish jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-9688: --- Description: simple query fails {code} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); DataSet> ds = CollectionDataSets.get3TupleDataSet(env); tableEnv.registerDataSet("t1", ds, "x, y, z"); String sqlQuery = "SELECT atan2(1,2)"; Table result = tableEnv.sqlQuery(sqlQuery); {code} while at the same time Calcite supports it and in Calcite's sqlline it works like {noformat} 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); +-+ | EXPR$0 | +-+ | 0.4636476090008061 | +-+ 1 row selected (0.173 seconds) {noformat} was: simple query fails {code} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); DataSet> ds = CollectionDataSets.get3TupleDataSet(env); tableEnv.registerDataSet("t1", ds, "x, y, z"); String sqlQuery = "SELECT atan2(1,2)"; Table result = tableEnv.sqlQuery(sqlQuery); {code} while at the same time Calcite supports it and in Calcite's sqlline it works like {noformat} 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); +-+ | EXPR$0 | +-+ | 0.4636476090008061 | +-+ 1 row selected (0.173 seconds) {noformat} > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery 
= "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9688) ATAN2 Sql Function support
Sergey Nuyanzin created FLINK-9688: -- Summary: ATAN2 Sql Function support Key: FLINK-9688 URL: https://issues.apache.org/jira/browse/FLINK-9688 Project: Flink Issue Type: Bug Components: Table API SQL Reporter: Sergey Nuyanzin Assignee: Sergey Nuyanzin simple query fails {code} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); DataSet> ds = CollectionDataSets.get3TupleDataSet(env); tableEnv.registerDataSet("t1", ds, "x, y, z"); String sqlQuery = "SELECT atan2(1,2)"; Table result = tableEnv.sqlQuery(sqlQuery); {code} while at the same time Calcite supports it and in Calcite's sqlline it works like {noformat} 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); +-+ | EXPR$0 | +-+ | 0.4636476090008061 | +-+ 1 row selected (0.173 seconds) {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
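As a quick sanity check of the expected result in the FLINK-9688 ticket above: `java.lang.Math.atan2` (which Calcite ultimately relies on for this function on the JVM) yields exactly the value shown in the sqlline output. This only confirms the target semantics; the ticket's work is exposing the function through the Flink Table API/SQL:

```java
// atan2(y, x) returns the angle of the point (x, y) in radians.
public class Atan2Check {
    public static void main(String[] args) {
        // Same arguments as the Calcite query quoted in the ticket: atan2(1, 2).
        double v = Math.atan2(1, 2);
        System.out.println(v); // prints 0.4636476090008061
    }
}
```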
[jira] [Closed] (FLINK-9580) Potentially unclosed ByteBufInputStream in RestClient#readRawResponse
[ https://issues.apache.org/jira/browse/FLINK-9580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9580. --- Resolution: Fixed Fix Version/s: 1.5.1 1.6.0 master: b4574c9bb5398713dd5501baf596f284ea19817f 1.5: 1bdc7194dff23b3ce3181c687be53f8a66a31bfe > Potentially unclosed ByteBufInputStream in RestClient#readRawResponse > - > > Key: FLINK-9580 > URL: https://issues.apache.org/jira/browse/FLINK-9580 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0, 1.6.0 >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Here is related code: > {code} > ByteBufInputStream in = new ByteBufInputStream(content); > byte[] data = new byte[in.available()]; > in.readFully(data); > {code} > In the catch block, ByteBufInputStream is not closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
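The standard fix for the leak described in FLINK-9580 above is try-with-resources, which closes the stream on both the success and the exception path. The sketch below uses `ByteArrayInputStream`/`DataInputStream` from the JDK as stand-ins for Netty's `ByteBufInputStream`, so it is an illustration of the pattern rather than the actual patch:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Sketch: reading a raw response body with a guaranteed close().
public class ReadRawResponseSketch {
    static byte[] readRaw(byte[] content) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(content))) {
            byte[] data = new byte[in.available()];
            in.readFully(data);
            return data;
        } // the stream is closed here even if readFully throws
    }

    public static void main(String[] args) throws IOException {
        byte[] out = readRaw(new byte[]{1, 2, 3});
        System.out.println(out.length); // prints 3
    }
}
```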
[jira] [Closed] (FLINK-9629) Datadog metrics reporter does not have shaded dependencies
[ https://issues.apache.org/jira/browse/FLINK-9629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9629. --- Resolution: Fixed Fix Version/s: 1.5.1 1.6.0 master: afcb513e21afaeab7289c0e51222c261d5d0150a 1.5: 0dae5a1aee771f17b086d6dbd54bf0b95bb436f2 > Datadog metrics reporter does not have shaded dependencies > -- > > Key: FLINK-9629 > URL: https://issues.apache.org/jira/browse/FLINK-9629 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.0, 1.6.0 >Reporter: Georgii Gobozov >Assignee: Georgii Gobozov >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > flink-metrics-datadog-1.5.0.jar does not contain shaded dependencies for > okhttp3 and okio -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9629) Datadog metrics reporter does not have shaded dependencies
[ https://issues.apache.org/jira/browse/FLINK-9629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-9629: Affects Version/s: (was: 1.5.1) > Datadog metrics reporter does not have shaded dependencies > -- > > Key: FLINK-9629 > URL: https://issues.apache.org/jira/browse/FLINK-9629 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.0, 1.6.0 >Reporter: Georgii Gobozov >Assignee: Georgii Gobozov >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > flink-metrics-datadog-1.5.0.jar does not contain shaded dependencies for > okhttp3 and okio -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9580) Potentially unclosed ByteBufInputStream in RestClient#readRawResponse
[ https://issues.apache.org/jira/browse/FLINK-9580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-9580: Affects Version/s: 1.6.0 1.5.0 > Potentially unclosed ByteBufInputStream in RestClient#readRawResponse > - > > Key: FLINK-9580 > URL: https://issues.apache.org/jira/browse/FLINK-9580 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0, 1.6.0 >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > Here is related code: > {code} > ByteBufInputStream in = new ByteBufInputStream(content); > byte[] data = new byte[in.available()]; > in.readFully(data); > {code} > In the catch block, ByteBufInputStream is not closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions
[ https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-8785: -- Labels: flip-6 pull-request-available (was: flip-6) > JobSubmitHandler does not handle JobSubmissionExceptions > > > Key: FLINK-8785 > URL: https://issues.apache.org/jira/browse/FLINK-8785 > Project: Flink > Issue Type: Bug > Components: Job-Submission, JobManager, REST >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Labels: flip-6, pull-request-available > > If the job submission, i.e. {{DispatcherGateway#submitJob}} fails with a > {{JobSubmissionException}} the {{JobSubmissionHandler}} returns "Internal > server error" instead of signaling the failed job submission. > This can for example occur if the transmitted execution graph is faulty, as > tested by the \{{JobSubmissionFailsITCase}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions
[ https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526130#comment-16526130 ] ASF GitHub Bot commented on FLINK-8785: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6222 [FLINK-8785][rest] Handle JobSubmissionExceptions ## What is the purpose of the change This PR modifies the `JobSubmitHandler` to handle exceptions contained in the future returned by `DispatcherGateway#submitJob`. An exception handler was added via `CompletableFuture#exceptionally` to return a proper `ErrorResponseBody` signaling that the job submission has failed. This PR is pretty much the bare-bones solution; in the JIRA I advocated for including error messages from exceptions since there are various reasons why the submission could fail, but I can't find a satisfying solution. ## Verifying this change * see new test in `JobSubmitHandlerTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8785_basic Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6222.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6222 commit 32fe49270596cdcf2f91f822c3a6504a14ba40eb Author: zentol Date: 2018-06-28T08:57:01Z [FLINK-8785][rest] Handle JobSubmissionExceptions > JobSubmitHandler does not handle JobSubmissionExceptions > > > Key: FLINK-8785 > URL: https://issues.apache.org/jira/browse/FLINK-8785 > Project: Flink > Issue Type: Bug > Components: Job-Submission, JobManager, REST >Affects Versions: 1.5.0, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Labels: flip-6, pull-request-available > > If the job submission, i.e. {{DispatcherGateway#submitJob}} fails with a > {{JobSubmissionException}} the {{JobSubmissionHandler}} returns "Internal > server error" instead of signaling the failed job submission. > This can for example occur if the transmitted execution graph is faulty, as > tested by the \{{JobSubmissionFailsITCase}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6222: [FLINK-8785][rest] Handle JobSubmissionExceptions
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6222 [FLINK-8785][rest] Handle JobSubmissionExceptions ## What is the purpose of the change This PR modifies the `JobSubmitHandler` to handle exceptions contained in the future returned by `DispatcherGateway#submitJob`. An exception handler was added via `CompletableFuture#exceptionally` to return a proper `ErrorResponseBody` signaling that the job submission has failed. This PR is pretty much the bare-bones solution; in the JIRA I advocated for including error messages from exceptions since there are various reasons why the submission could fail, but I can't find a satisfying solution. ## Verifying this change * see new test in `JobSubmitHandlerTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8785_basic Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6222.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6222 commit 32fe49270596cdcf2f91f822c3a6504a14ba40eb Author: zentol Date: 2018-06-28T08:57:01Z [FLINK-8785][rest] Handle JobSubmissionExceptions ---
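The handler change described in the FLINK-8785 pull request above can be sketched with plain `CompletableFuture` machinery: instead of letting a failed submission future surface as a generic "Internal server error", `exceptionally` maps the failure to an error payload. The JSON strings and the `toResponse` helper here are illustrative stand-ins, not Flink's actual `ErrorResponseBody` plumbing:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Sketch: turning a failed job-submission future into an error response.
public class SubmitHandlerSketch {
    static CompletableFuture<String> toResponse(CompletableFuture<String> submission) {
        return submission
            .thenApply(jobId -> "{\"jobId\":\"" + jobId + "\"}")
            .exceptionally(t -> {
                // Upstream failures arrive wrapped in a CompletionException; unwrap it.
                Throwable cause = t instanceof CompletionException && t.getCause() != null
                        ? t.getCause() : t;
                return "{\"error\":\"" + cause.getMessage() + "\"}";
            });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("JobSubmissionException: bad graph"));
        System.out.println(toResponse(failed).join());
        // prints {"error":"JobSubmissionException: bad graph"}
    }
}
```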
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526115#comment-16526115 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198757244 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.functions.FoldFunction; + +/** + * This class wraps folding function with TTL logic. 
+ * + * @param Type of the values folded into the state + * @param Type of the value in the state + * + * @deprecated use {@link TtlAggregateFunction} instead + */ +@Deprecated +class TtlFoldFunction + extends AbstractTtlDecorator> + implements FoldFunction> { + TtlFoldFunction(FoldFunction original, TtlConfig config, TtlTimeProvider timeProvider) { + super(original, config, timeProvider); + } + + @Override + public TtlValue fold(TtlValue accumulator, T value) throws Exception { + return wrapWithTs(original.fold(getUnexpried(accumulator), value)); --- End diff -- As I understand, the wrapped states should already provide the default values. My idea was to wrap the original default value [in TTL factory](https://github.com/apache/flink/pull/6196/commits/4dedb9a20244a2addd337617778b17fe8349#diff-13011fbe7c28b56b994783572b461aaeR174) with expiration timestamp `Long.MAX_VALUE`, basically never expiring. Good point about test cases for it, I will add them for appending states. > Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > TTL state decorator uses original state with packed TTL and add TTL logic > using time provider: > {code:java} > TtlValueState implements ValueState { > ValueState> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are apply to state produced by normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198757244 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.functions.FoldFunction; + +/** + * This class wraps folding function with TTL logic. + * + * @param Type of the values folded into the state + * @param Type of the value in the state + * + * @deprecated use {@link TtlAggregateFunction} instead + */ +@Deprecated +class TtlFoldFunction + extends AbstractTtlDecorator> + implements FoldFunction> { + TtlFoldFunction(FoldFunction original, TtlConfig config, TtlTimeProvider timeProvider) { + super(original, config, timeProvider); + } + + @Override + public TtlValue fold(TtlValue accumulator, T value) throws Exception { + return wrapWithTs(original.fold(getUnexpried(accumulator), value)); --- End diff -- As I understand, the wrapped states should already provide the default values. 
My idea was to wrap the original default value [in TTL factory](https://github.com/apache/flink/pull/6196/commits/4dedb9a20244a2addd337617778b17fe8349#diff-13011fbe7c28b56b994783572b461aaeR174) with expiration timestamp `Long.MAX_VALUE`, basically never expiring. Good point about test cases for it, I will add them for appending states. ---
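The default-value idea discussed in the review comment above — wrap the original default with the expiration timestamp `Long.MAX_VALUE` so it never expires — can be sketched in isolation. The names `TtlValue` and `wrapDefault` are illustrative stand-ins, not Flink's actual TTL state classes:

```java
// Sketch: a value tagged with an expiration timestamp, where the state's
// default value is wrapped with Long.MAX_VALUE so it can never expire.
public class TtlValueSketch {
    static final class TtlValue<V> {
        final V value;
        final long expirationTs;

        TtlValue(V value, long expirationTs) {
            this.value = value;
            this.expirationTs = expirationTs;
        }

        boolean isExpired(long now) {
            return now >= expirationTs;
        }
    }

    static <V> TtlValue<V> wrapDefault(V defaultValue) {
        return new TtlValue<>(defaultValue, Long.MAX_VALUE); // effectively never expires
    }

    public static void main(String[] args) {
        TtlValue<String> d = wrapDefault("none");
        System.out.println(d.isExpired(System.currentTimeMillis())); // false
    }
}
```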
[jira] [Created] (FLINK-9687) Delay the state fetch only when the triggerResult is fire
aitozi created FLINK-9687: - Summary: Delay the state fetch only when the triggerResult is fire Key: FLINK-9687 URL: https://issues.apache.org/jira/browse/FLINK-9687 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 1.5.0 Reporter: aitozi Assignee: aitozi -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16526096#comment-16526096 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r198751723

  --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
  @@ -0,0 +1,132 @@
  +/*
  + * Licensed to the Apache Software Foundation (ASF) under one
  + * or more contributor license agreements.  See the NOTICE file
  + * distributed with this work for additional information
  + * regarding copyright ownership.  The ASF licenses this file
  + * to you under the Apache License, Version 2.0 (the
  + * "License"); you may not use this file except in compliance
  + * with the License.  You may obtain a copy of the License at
  + *
  + *     http://www.apache.org/licenses/LICENSE-2.0
  + *
  + * Unless required by applicable law or agreed to in writing, software
  + * distributed under the License is distributed on an "AS IS" BASIS,
  + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  + * See the License for the specific language governing permissions and
  + * limitations under the License.
  + */
  +
  +package org.apache.flink.runtime.state.ttl;
  +
  +import org.apache.flink.api.common.typeutils.TypeSerializer;
  +import org.apache.flink.runtime.state.internal.InternalMapState;
  +import org.apache.flink.util.FlinkRuntimeException;
  +
  +import java.util.AbstractMap;
  +import java.util.Collections;
  +import java.util.Iterator;
  +import java.util.Map;
  +import java.util.stream.Collectors;
  +import java.util.stream.Stream;
  +import java.util.stream.StreamSupport;
  +
  +/**
  + * This class wraps map state with TTL logic.
  + *
  + * @param <K> The type of key the state is associated to
  + * @param <N> The type of the namespace
  + * @param <UK> Type of the user entry key of state with TTL
  + * @param <UV> Type of the user entry value of state with TTL
  + */
  +class TtlMapState<K, N, UK, UV>
  +	extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
  +	implements InternalMapState<K, N, UK, UV> {
  +	TtlMapState(
  +		InternalMapState<K, N, UK, TtlValue<UV>> original,
  +		TtlConfig config,
  +		TtlTimeProvider timeProvider,
  +		TypeSerializer<Map<UK, TtlValue<UV>>> valueSerializer) {
  +		super(original, config, timeProvider, valueSerializer);
  +	}
  +
  +	@Override
  +	public UV get(UK key) throws Exception {
  +		return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
  +	}
  +
  +	@Override
  +	public void put(UK key, UV value) throws Exception {
  +		original.put(key, wrapWithTs(value));
  +	}
  +
  +	@Override
  +	public void putAll(Map<UK, UV> map) throws Exception {
  +		if (map == null) {
  +			return;
  +		}
  +		Map<UK, TtlValue<UV>> ttlMap = map.entrySet().stream()
  +			.collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue())));
  +		original.putAll(ttlMap);
  +	}
  +
  +	@Override
  +	public void remove(UK key) throws Exception {
  +		original.remove(key);
  +	}
  +
  +	@Override
  +	public boolean contains(UK key) throws Exception {
  +		return get(key) != null;
  +	}
  +
  +	@Override
  +	public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
  +		return entriesStream()::iterator;
  +	}
  +
  +	private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
  +		Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
  +		withTs = withTs == null ? Collections.emptyList() : withTs;
  +		return StreamSupport
  +			.stream(withTs.spliterator(), false)
  --- End diff --

  As I understand, it depends on the use case. For parallelizable, lazy operations over a big collection (like filter and map over lists), a stream will give a boost over loops, but for short collections or non-parallelizable spliterators the overhead kills the performance. Though it might be hard to predict the type of the spliterator used. I agree that real benchmarking should be done to make sure.
> Create wrapper with TTL logic for value state
> ---------------------------------------------
>
>                 Key: FLINK-9514
>                 URL: https://issues.apache.org/jira/browse/FLINK-9514
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
> TTL
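The review thread above weighs stream overhead against plain loops for wrapping map entries. As a rough illustration of the trade-off being discussed (this is not Flink code; `TtlValue` here is a simplified stand-in for Flink's internal class, and the method names are hypothetical), the stream-based wrapping in `putAll` and a loop-based alternative could be sketched as:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class StreamVsLoop {
    // Simplified stand-in for Flink's TtlValue: a user value plus its timestamp.
    static final class TtlValue<V> {
        final V userValue;
        final long lastAccessTimestamp;
        TtlValue(V userValue, long ts) { this.userValue = userValue; this.lastAccessTimestamp = ts; }
    }

    // Stream-based wrapping, mirroring the shape of putAll in the diff above.
    static <UK, UV> Map<UK, TtlValue<UV>> wrapWithStream(Map<UK, UV> map, long ts) {
        return map.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> new TtlValue<>(e.getValue(), ts)));
    }

    // Loop-based alternative: avoids spliterator/collector setup, which the
    // review suggests may matter for short, non-parallelizable collections.
    static <UK, UV> Map<UK, TtlValue<UV>> wrapWithLoop(Map<UK, UV> map, long ts) {
        Map<UK, TtlValue<UV>> result = new HashMap<>(map.size());
        for (Map.Entry<UK, UV> e : map.entrySet()) {
            result.put(e.getKey(), new TtlValue<>(e.getValue(), ts));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> in = new HashMap<>();
        in.put("a", 1);
        in.put("b", 2);
        long now = System.currentTimeMillis();
        System.out.println(wrapWithStream(in, now).size() + " " + wrapWithLoop(in, now).size());
    }
}
```

Both variants produce the same wrapped map; as the reviewer notes, which one is faster for state-sized collections is exactly what benchmarking would have to establish.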
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
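In the `TtlMapState` diff under review, `get` delegates to `getWithTtlCheckAndUpdate`, whose body is not shown in this excerpt. A minimal sketch of that check-and-update pattern, using a plain `HashMap` and a simplified `TtlValue` as hypothetical stand-ins (not Flink's actual implementation), might look like:

```java
import java.util.HashMap;
import java.util.Map;

public class TtlGetSketch {
    // Simplified stand-in for Flink's TtlValue.
    static final class TtlValue<V> {
        final V userValue;
        final long lastAccessTimestamp;
        TtlValue(V v, long ts) { this.userValue = v; this.lastAccessTimestamp = ts; }
    }

    // Sketch of the check-and-update idea: return null and eagerly drop the
    // entry if it has expired; otherwise return the unwrapped user value.
    static <V> V getWithTtlCheck(Map<String, TtlValue<V>> store, String key, long now, long ttlMillis) {
        TtlValue<V> stored = store.get(key);
        if (stored == null) {
            return null;
        }
        if (now - stored.lastAccessTimestamp >= ttlMillis) {
            store.remove(key); // expired: clean up on access
            return null;
        }
        return stored.userValue;
    }

    public static void main(String[] args) {
        Map<String, TtlValue<Integer>> store = new HashMap<>();
        store.put("k", new TtlValue<>(7, 1000L));
        System.out.println(getWithTtlCheck(store, "k", 1500L, 1000L));
        System.out.println(getWithTtlCheck(store, "k", 2500L, 1000L));
    }
}
```

The real Flink wrapper additionally supports configurable update semantics (e.g. refreshing the timestamp on read), which this sketch omits.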