[GitHub] [flink] TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread
TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread URL: https://github.com/apache/flink/pull/10143#discussion_r344592398 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -322,11 +323,7 @@ Resource getContainerResource() { public boolean stopWorker(final YarnWorkerNode workerNode) { final Container container = workerNode.getContainer(); log.info("Stopping container {}.", container.getId()); - try { - nodeManagerClient.stopContainer(container.getId(), container.getNodeId()); - } catch (final Exception e) { - log.warn("Error while calling YARN Node Manager to stop container", e); - } + nodeManagerClient.stopContainerAsync(container.getId(), container.getNodeId()); Review comment: Well I notice `onXXX` methods below. Would you explain why we don't ```java try { ... } catch (Throwable t) { onStopContainerError(...) } ``` here as we do when start container? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread
TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread URL: https://github.com/apache/flink/pull/10143#discussion_r344592017 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -407,15 +404,9 @@ private void startTaskExecutorInContainer(Container container) { containerIdStr, container.getNodeId().getHost()); - nodeManagerClient.startContainer(container, taskExecutorLaunchContext); + nodeManagerClient.startContainerAsync(container, taskExecutorLaunchContext); } catch (Throwable t) { - log.error("Could not start TaskManager in container {}.", container.getId(), t); - - // release the failed container - workerNodeMap.remove(resourceId); - resourceManagerClient.releaseAssignedContainer(container.getId()); - // and ask for a new one - requestYarnContainerIfRequired(); + onStartContainerError(container.getId(), t); Review comment: The same here. How can we react to start container failure? I don't think it handles the future result of an async method. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread
TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread URL: https://github.com/apache/flink/pull/10143#discussion_r344592017 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -407,15 +404,9 @@ private void startTaskExecutorInContainer(Container container) { containerIdStr, container.getNodeId().getHost()); - nodeManagerClient.startContainer(container, taskExecutorLaunchContext); + nodeManagerClient.startContainerAsync(container, taskExecutorLaunchContext); } catch (Throwable t) { - log.error("Could not start TaskManager in container {}.", container.getId(), t); - - // release the failed container - workerNodeMap.remove(resourceId); - resourceManagerClient.releaseAssignedContainer(container.getId()); - // and ask for a new one - requestYarnContainerIfRequired(); + onStartContainerError(container.getId(), t); Review comment: The same here. How can we react to start container failure? I don't think it handles the future result of an async method. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344592080 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/BackPressureSampleServiceTest.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Tests for {@link BackPressureSampleService}. + */ +public class BackPressureSampleServiceTest extends TestLogger { + + private ScheduledExecutorService scheduledExecutorService; + + private BackPressureSampleService backPressureSampleService; + + @Before Review comment: use @BeforeClass, we can have a class level service instead of creating instance for every test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it
flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it URL: https://github.com/apache/flink/pull/10106#issuecomment-550309075 ## CI report: * 9817ef9c8be75fd02ccef3825e706497f3e6c1b6 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135261385) * 48033e52f0aa852d03c351319d464dedab3c8088 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135397835) * 75bf191da0a22986a3877236a0f4be9dbe2e8606 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/135874115) * 676cae9dbfa13d89e1618655da7b67cba8b647d4 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135875904) * 3f67bbfda975cad5bc1f3d3e7e5ee28fb7326e9f : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread
TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread URL: https://github.com/apache/flink/pull/10143#discussion_r344591626 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -322,11 +323,7 @@ Resource getContainerResource() { public boolean stopWorker(final YarnWorkerNode workerNode) { final Container container = workerNode.getContainer(); log.info("Stopping container {}.", container.getId()); - try { - nodeManagerClient.stopContainer(container.getId(), container.getNodeId()); - } catch (final Exception e) { - log.warn("Error while calling YARN Node Manager to stop container", e); - } + nodeManagerClient.stopContainerAsync(container.getId(), container.getNodeId()); Review comment: How can we react to stop container failure? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344591459 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/BackPressureSampleServiceTest.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Tests for {@link BackPressureSampleService}. + */ +public class BackPressureSampleServiceTest extends TestLogger { + + private ScheduledExecutorService scheduledExecutorService; + + private BackPressureSampleService backPressureSampleService; + + @Before + public void setUp() throws Exception { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + final ScheduledExecutor scheduledExecutor = new ScheduledExecutorServiceAdapter(scheduledExecutorService); + + backPressureSampleService = new BackPressureSampleService( 10, Time.milliseconds(10), scheduledExecutor); Review comment: remove whitespace before `10` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344590458 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureTrackerTestUtils.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.TestingSlotProvider; +import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.SerializedValue; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +/** + * Utility methods for {@link BackPressureStatsTrackerImplTest} and {@link BackPressureRequestCoordinatorTest}. + */ +public class BackPressureTrackerTestUtils { + + public static ExecutionJobVertex createExecutionJobVertex() throws Exception { + return new ExecutionJobVertex( + createExecutionGraph(), + new JobVertex("TestingJobVertex", new JobVertexID()), + 4, + JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue(), + Time.milliseconds(1), + 1L, + System.currentTimeMillis()); + } + + public static ExecutionGraph createExecutionGraph() throws IOException { Review comment: We could create a more simple `ExecutionGraph` like below: ``` ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), new JobID(), "test", new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy(), new TestingSlotProvider(ignored -> new CompletableFuture<>())); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344589965 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureTrackerTestUtils.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.TestingSlotProvider; +import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.SerializedValue; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +/** + * Utility methods for {@link BackPressureStatsTrackerImplTest} and {@link BackPressureRequestCoordinatorTest}. + */ +public class BackPressureTrackerTestUtils { + + public static ExecutionJobVertex createExecutionJobVertex() throws Exception { Review comment: Actually we could use a simpler constructor for `ExecutionJobVertex`, even it is better to use `ExecutionJobVertexTest#createExecutionJobVertex` directly, because `BackPressureTrackerTestUtils` actually did not create back pressure specific components. Although the execution graph is mocked inside `ExecutionJobVertexTest#createExecutionJobVertex`, it is not touched by this PR, so we can keep it that or make it as a real graph if you like (not mandatory). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344589965 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureTrackerTestUtils.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.TestingSlotProvider; +import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.SerializedValue; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +/** + * Utility methods for {@link BackPressureStatsTrackerImplTest} and {@link BackPressureRequestCoordinatorTest}. + */ +public class BackPressureTrackerTestUtils { + + public static ExecutionJobVertex createExecutionJobVertex() throws Exception { Review comment: Actually we could use a simpler constructor for `ExecutionJobVertex`, even it is better to use `ExecutionJobVertexTest#createExecutionJobVertex` directly, because `BackPressureTrackerTestUtils` actually did not create back pressure specific components. Although the execution graph was mocked inside `ExecutionJobVertexTest#createExecutionJobVertex`, it is not touched by this PR, so we can keep it that or make it as a real graph if you like (not mandatory). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-14702) Rewrite the tpch ( test_tpch.sh ) e2e tests by using the newly introduced java framework
Zheng Hu created FLINK-14702: Summary: Rewrite the tpch ( test_tpch.sh ) e2e tests by using the newly introduced java framework Key: FLINK-14702 URL: https://issues.apache.org/jira/browse/FLINK-14702 Project: Flink Issue Type: Sub-task Reporter: Zheng Hu Will rewrite the https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/test_tpch.sh e2e tests by java e2e framework. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14674) some tpc-ds query hang in scheduled stage for long time
[ https://issues.apache.org/jira/browse/FLINK-14674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16971363#comment-16971363 ] Zhu Zhu commented on FLINK-14674: - The job hangs due to FLINK-14701, which happen if {{SharedSlotOversubscribedException}} happens (A.T.M only if a shared slot cannot fulfill the managed memory requirements from all tasks in it). Insufficient managed memory for one task is also a problem for q28 and q77, however, jobs will fail but not hang in this case if FLINK-14701 is resolved. > some tpc-ds query hang in scheduled stage for long time > --- > > Key: FLINK-14674 > URL: https://issues.apache.org/jira/browse/FLINK-14674 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.9.1 >Reporter: Leonard Xu >Priority: Major > Fix For: 1.10.0 > > > When run tpc-ds query in standalone mode, some query(4、11、28、31、77、88) hang > in scheduled stage for long time, > BTW,I use blink planner,bath mode shuffle. > Can reproduce this issue in > [https://github.com/leonardBang/flink/tree/FLINK-11491] > > cc [~zhuzh] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344587521 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java ## @@ -18,166 +18,218 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + /** - * Tests for the BackPressureStatsTrackerImpl. + * Tests for the {@link BackPressureStatsTrackerImpl}. */ public class BackPressureStatsTrackerImplTest extends TestLogger { - /** Tests simple statistics with fake stack traces. */ @Test - @SuppressWarnings("unchecked") - public void testTriggerStackTraceSample() throws Exception { - CompletableFuture sampleFuture = new CompletableFuture<>(); - - StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class); - Mockito.when(sampleCoordinator.triggerStackTraceSample( - Matchers.any(ExecutionVertex[].class), - Matchers.anyInt(), - Matchers.any(Time.class), - Matchers.anyInt())).thenReturn(sampleFuture); - - ExecutionGraph graph = Mockito.mock(ExecutionGraph.class); - Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING); - - // Same Thread execution context - Mockito.when(graph.getFutureExecutor()).thenReturn(new Executor() { - - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - }); - - ExecutionVertex[] taskVertices = new ExecutionVertex[4]; - - ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class); - Mockito.when(jobVertex.getJobId()).thenReturn(new JobID()); - Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); - Mockito.when(jobVertex.getGraph()).thenReturn(graph); - Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices); - - taskVertices[0] = mockExecutionVertex(jobVertex, 0); - taskVertices[1] = mockExecutionVertex(jobVertex, 1); - taskVertices[2] = mockExecutionVertex(jobVertex, 2); - taskVertices[3] = mockExecutionVertex(jobVertex, 3); - - int numSamples = 100; - Time delayBetweenSamples = Time.milliseconds(100L); - - BackPressureStatsTrackerImpl tracker = new BackPressureStatsTrackerImpl( - sampleCoordinator, , numSamples, Integer.MAX_VALUE, delayBetweenSamples); - - // getOperatorBackPressureStats triggers stack trace sampling - Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), - Matchers.eq(delayBetweenSamples), - Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH)); - - // Request back pressure stats again. This should not trigger another sample request - Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), -
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344586695 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java ## @@ -18,166 +18,218 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + /** - * Tests for the BackPressureStatsTrackerImpl. + * Tests for the {@link BackPressureStatsTrackerImpl}. */ public class BackPressureStatsTrackerImplTest extends TestLogger { - /** Tests simple statistics with fake stack traces. */ @Test - @SuppressWarnings("unchecked") - public void testTriggerStackTraceSample() throws Exception { - CompletableFuture sampleFuture = new CompletableFuture<>(); - - StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class); - Mockito.when(sampleCoordinator.triggerStackTraceSample( - Matchers.any(ExecutionVertex[].class), - Matchers.anyInt(), - Matchers.any(Time.class), - Matchers.anyInt())).thenReturn(sampleFuture); - - ExecutionGraph graph = Mockito.mock(ExecutionGraph.class); - Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING); - - // Same Thread execution context - Mockito.when(graph.getFutureExecutor()).thenReturn(new Executor() { - - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - }); - - ExecutionVertex[] taskVertices = new ExecutionVertex[4]; - - ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class); - Mockito.when(jobVertex.getJobId()).thenReturn(new JobID()); - Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); - Mockito.when(jobVertex.getGraph()).thenReturn(graph); - Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices); - - taskVertices[0] = mockExecutionVertex(jobVertex, 0); - taskVertices[1] = mockExecutionVertex(jobVertex, 1); - taskVertices[2] = mockExecutionVertex(jobVertex, 2); - taskVertices[3] = mockExecutionVertex(jobVertex, 3); - - int numSamples = 100; - Time delayBetweenSamples = Time.milliseconds(100L); - - BackPressureStatsTrackerImpl tracker = new BackPressureStatsTrackerImpl( - sampleCoordinator, , numSamples, Integer.MAX_VALUE, delayBetweenSamples); - - // getOperatorBackPressureStats triggers stack trace sampling - Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), - Matchers.eq(delayBetweenSamples), - Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH)); - - // Request back pressure stats again. This should not trigger another sample request - Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), -
[jira] [Commented] (FLINK-14701) Slot leaks if SharedSlotOversubscribedException happens
[ https://issues.apache.org/jira/browse/FLINK-14701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16971359#comment-16971359 ] Zhu Zhu commented on FLINK-14701: - [~chesnay], what do you think of the issue and the proposed solution? This issue also happens in 1.9 so I think we also need the fix there. > Slot leaks if SharedSlotOversubscribedException happens > --- > > Key: FLINK-14701 > URL: https://issues.apache.org/jira/browse/FLINK-14701 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.10.0, 1.9.2 >Reporter: Zhu Zhu >Priority: Blocker > Fix For: 1.10.0, 1.9.2 > > > If a {{SharedSlotOversubscribedException}} happens, the {{MultiTaskSlot}} > will release some of its child {{SingleTaskSlot}}. The triggered releasing > will trigger a re-allocation of the task slot right inside > {{SingleTaskSlot#release(...)}}. So that a previous allocation > in {{SloSharingManager#allTaskSlots}} will be replaced by the new allocation > because they share the same {{slotRequestId}}. > However, the {{SingleTaskSlot#release(...)}} will then invoke > {{MultiTaskSlot#releaseChild}} to release the previous allocation with the > {{slotRequestId}}, which will unexpectedly remove the new allocation from the > {{SloSharingManager}}. > In this way, slot leak happens because the pending slot request is not > tracked by the {{SloSharingManager}} and cannot be released when its payload > terminates. > A test case {{testNoSlotLeakOnSharedSlotOversubscribedException}} which > exhibits this issue can be found in this > [commit|https://github.com/zhuzhurk/flink/commit/9024e2e9eb4bd17f371896d6dbc745bc9e585e14]. > The slot leak blocks the TPC-DS queries on flink 1.10, see FLINK-14674. > To solve it, I'd propose to strengthen the {{MultiTaskSlot#releaseChild}} to > only remove its true child task slot from the {{SloSharingManager}}, i.e. add > a check {{if (child == allTaskSlots.get(child.getSlotRequestId()))}} before > invoking {{allTaskSlots.remove(child.getSlotRequestId())}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344586291 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java ## @@ -18,166 +18,218 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + /** - * Tests for the BackPressureStatsTrackerImpl. + * Tests for the {@link BackPressureStatsTrackerImpl}. */ public class BackPressureStatsTrackerImplTest extends TestLogger { - /** Tests simple statistics with fake stack traces. */ @Test - @SuppressWarnings("unchecked") - public void testTriggerStackTraceSample() throws Exception { - CompletableFuture sampleFuture = new CompletableFuture<>(); - - StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class); - Mockito.when(sampleCoordinator.triggerStackTraceSample( - Matchers.any(ExecutionVertex[].class), - Matchers.anyInt(), - Matchers.any(Time.class), - Matchers.anyInt())).thenReturn(sampleFuture); - - ExecutionGraph graph = Mockito.mock(ExecutionGraph.class); - Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING); - - // Same Thread execution context - Mockito.when(graph.getFutureExecutor()).thenReturn(new Executor() { - - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - }); - - ExecutionVertex[] taskVertices = new ExecutionVertex[4]; - - ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class); - Mockito.when(jobVertex.getJobId()).thenReturn(new JobID()); - Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); - Mockito.when(jobVertex.getGraph()).thenReturn(graph); - Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices); - - taskVertices[0] = mockExecutionVertex(jobVertex, 0); - taskVertices[1] = mockExecutionVertex(jobVertex, 1); - taskVertices[2] = mockExecutionVertex(jobVertex, 2); - taskVertices[3] = mockExecutionVertex(jobVertex, 3); - - int numSamples = 100; - Time delayBetweenSamples = Time.milliseconds(100L); - - BackPressureStatsTrackerImpl tracker = new BackPressureStatsTrackerImpl( - sampleCoordinator, , numSamples, Integer.MAX_VALUE, delayBetweenSamples); - - // getOperatorBackPressureStats triggers stack trace sampling - Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), - Matchers.eq(delayBetweenSamples), - Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH)); - - // Request back pressure stats again. This should not trigger another sample request - Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), -
[GitHub] [flink] flinkbot edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile
flinkbot edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile URL: https://github.com/apache/flink/pull/10146#issuecomment-552307115 ## CI report: * 25f9e4b87846e5a736aa329c834f82962e1f50c4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135875925) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-14701) Slot leaks if SharedSlotOversubscribedException happens
Zhu Zhu created FLINK-14701: --- Summary: Slot leaks if SharedSlotOversubscribedException happens Key: FLINK-14701 URL: https://issues.apache.org/jira/browse/FLINK-14701 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.10.0, 1.9.2 Reporter: Zhu Zhu Fix For: 1.10.0, 1.9.2 If a {{SharedSlotOversubscribedException}} happens, the {{MultiTaskSlot}} will release some of its child {{SingleTaskSlot}}. The triggered releasing will trigger a re-allocation of the task slot right inside {{SingleTaskSlot#release(...)}}. So that a previous allocation in {{SloSharingManager#allTaskSlots}} will be replaced by the new allocation because they share the same {{slotRequestId}}. However, the {{SingleTaskSlot#release(...)}} will then invoke {{MultiTaskSlot#releaseChild}} to release the previous allocation with the {{slotRequestId}}, which will unexpectedly remove the new allocation from the {{SloSharingManager}}. In this way, slot leak happens because the pending slot request is not tracked by the {{SloSharingManager}} and cannot be released when its payload terminates. A test case {{testNoSlotLeakOnSharedSlotOversubscribedException}} which exhibits this issue can be found in this [commit|https://github.com/zhuzhurk/flink/commit/9024e2e9eb4bd17f371896d6dbc745bc9e585e14]. The slot leak blocks the TPC-DS queries on flink 1.10, see FLINK-14674. To solve it, I'd propose to strengthen the {{MultiTaskSlot#releaseChild}} to only remove its true child task slot from the {{SloSharingManager}}, i.e. add a check {{if (child == allTaskSlots.get(child.getSlotRequestId()))}} before invoking {{allTaskSlots.remove(child.getSlotRequestId())}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it
flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it URL: https://github.com/apache/flink/pull/10106#issuecomment-550309075 ## CI report: * 9817ef9c8be75fd02ccef3825e706497f3e6c1b6 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135261385) * 48033e52f0aa852d03c351319d464dedab3c8088 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135397835) * 75bf191da0a22986a3877236a0f4be9dbe2e8606 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/135874115) * 676cae9dbfa13d89e1618655da7b67cba8b647d4 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135875904) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344584416 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java ## @@ -18,166 +18,218 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + /** - * Tests for the BackPressureStatsTrackerImpl. + * Tests for the {@link BackPressureStatsTrackerImpl}. */ public class BackPressureStatsTrackerImplTest extends TestLogger { - /** Tests simple statistics with fake stack traces. */ @Test - @SuppressWarnings("unchecked") - public void testTriggerStackTraceSample() throws Exception { - CompletableFuture sampleFuture = new CompletableFuture<>(); - - StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class); - Mockito.when(sampleCoordinator.triggerStackTraceSample( - Matchers.any(ExecutionVertex[].class), - Matchers.anyInt(), - Matchers.any(Time.class), - Matchers.anyInt())).thenReturn(sampleFuture); - - ExecutionGraph graph = Mockito.mock(ExecutionGraph.class); - Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING); - - // Same Thread execution context - Mockito.when(graph.getFutureExecutor()).thenReturn(new Executor() { - - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - }); - - ExecutionVertex[] taskVertices = new ExecutionVertex[4]; - - ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class); - Mockito.when(jobVertex.getJobId()).thenReturn(new JobID()); - Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); - Mockito.when(jobVertex.getGraph()).thenReturn(graph); - Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices); - - taskVertices[0] = mockExecutionVertex(jobVertex, 0); - taskVertices[1] = mockExecutionVertex(jobVertex, 1); - taskVertices[2] = mockExecutionVertex(jobVertex, 2); - taskVertices[3] = mockExecutionVertex(jobVertex, 3); - - int numSamples = 100; - Time delayBetweenSamples = Time.milliseconds(100L); - - BackPressureStatsTrackerImpl tracker = new BackPressureStatsTrackerImpl( - sampleCoordinator, , numSamples, Integer.MAX_VALUE, delayBetweenSamples); - - // getOperatorBackPressureStats triggers stack trace sampling - Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), - Matchers.eq(delayBetweenSamples), - Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH)); - - // Request back pressure stats again. This should not trigger another sample request - Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), -
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344583750 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java ## @@ -18,166 +18,218 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + /** - * Tests for the BackPressureStatsTrackerImpl. + * Tests for the {@link BackPressureStatsTrackerImpl}. */ public class BackPressureStatsTrackerImplTest extends TestLogger { - /** Tests simple statistics with fake stack traces. */ @Test - @SuppressWarnings("unchecked") - public void testTriggerStackTraceSample() throws Exception { - CompletableFuture sampleFuture = new CompletableFuture<>(); - - StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class); - Mockito.when(sampleCoordinator.triggerStackTraceSample( - Matchers.any(ExecutionVertex[].class), - Matchers.anyInt(), - Matchers.any(Time.class), - Matchers.anyInt())).thenReturn(sampleFuture); - - ExecutionGraph graph = Mockito.mock(ExecutionGraph.class); - Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING); - - // Same Thread execution context - Mockito.when(graph.getFutureExecutor()).thenReturn(new Executor() { - - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - }); - - ExecutionVertex[] taskVertices = new ExecutionVertex[4]; - - ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class); - Mockito.when(jobVertex.getJobId()).thenReturn(new JobID()); - Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); - Mockito.when(jobVertex.getGraph()).thenReturn(graph); - Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices); - - taskVertices[0] = mockExecutionVertex(jobVertex, 0); - taskVertices[1] = mockExecutionVertex(jobVertex, 1); - taskVertices[2] = mockExecutionVertex(jobVertex, 2); - taskVertices[3] = mockExecutionVertex(jobVertex, 3); - - int numSamples = 100; - Time delayBetweenSamples = Time.milliseconds(100L); - - BackPressureStatsTrackerImpl tracker = new BackPressureStatsTrackerImpl( - sampleCoordinator, , numSamples, Integer.MAX_VALUE, delayBetweenSamples); - - // getOperatorBackPressureStats triggers stack trace sampling - Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), - Matchers.eq(delayBetweenSamples), - Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH)); - - // Request back pressure stats again. This should not trigger another sample request - Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), -
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344583125 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java ## @@ -18,166 +18,218 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + /** - * Tests for the BackPressureStatsTrackerImpl. + * Tests for the {@link BackPressureStatsTrackerImpl}. */ public class BackPressureStatsTrackerImplTest extends TestLogger { - /** Tests simple statistics with fake stack traces. */ @Test - @SuppressWarnings("unchecked") - public void testTriggerStackTraceSample() throws Exception { - CompletableFuture sampleFuture = new CompletableFuture<>(); - - StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class); - Mockito.when(sampleCoordinator.triggerStackTraceSample( - Matchers.any(ExecutionVertex[].class), - Matchers.anyInt(), - Matchers.any(Time.class), - Matchers.anyInt())).thenReturn(sampleFuture); - - ExecutionGraph graph = Mockito.mock(ExecutionGraph.class); - Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING); - - // Same Thread execution context - Mockito.when(graph.getFutureExecutor()).thenReturn(new Executor() { - - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - }); - - ExecutionVertex[] taskVertices = new ExecutionVertex[4]; - - ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class); - Mockito.when(jobVertex.getJobId()).thenReturn(new JobID()); - Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); - Mockito.when(jobVertex.getGraph()).thenReturn(graph); - Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices); - - taskVertices[0] = mockExecutionVertex(jobVertex, 0); - taskVertices[1] = mockExecutionVertex(jobVertex, 1); - taskVertices[2] = mockExecutionVertex(jobVertex, 2); - taskVertices[3] = mockExecutionVertex(jobVertex, 3); - - int numSamples = 100; - Time delayBetweenSamples = Time.milliseconds(100L); - - BackPressureStatsTrackerImpl tracker = new BackPressureStatsTrackerImpl( - sampleCoordinator, , numSamples, Integer.MAX_VALUE, delayBetweenSamples); - - // getOperatorBackPressureStats triggers stack trace sampling - Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), - Matchers.eq(delayBetweenSamples), - Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH)); - - // Request back pressure stats again. This should not trigger another sample request - Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), -
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344582415 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java ## @@ -18,166 +18,218 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + /** - * Tests for the BackPressureStatsTrackerImpl. + * Tests for the {@link BackPressureStatsTrackerImpl}. */ public class BackPressureStatsTrackerImplTest extends TestLogger { - /** Tests simple statistics with fake stack traces. */ @Test - @SuppressWarnings("unchecked") - public void testTriggerStackTraceSample() throws Exception { - CompletableFuture sampleFuture = new CompletableFuture<>(); - - StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class); - Mockito.when(sampleCoordinator.triggerStackTraceSample( - Matchers.any(ExecutionVertex[].class), - Matchers.anyInt(), - Matchers.any(Time.class), - Matchers.anyInt())).thenReturn(sampleFuture); - - ExecutionGraph graph = Mockito.mock(ExecutionGraph.class); - Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING); - - // Same Thread execution context - Mockito.when(graph.getFutureExecutor()).thenReturn(new Executor() { - - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - }); - - ExecutionVertex[] taskVertices = new ExecutionVertex[4]; - - ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class); - Mockito.when(jobVertex.getJobId()).thenReturn(new JobID()); - Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); - Mockito.when(jobVertex.getGraph()).thenReturn(graph); - Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices); - - taskVertices[0] = mockExecutionVertex(jobVertex, 0); - taskVertices[1] = mockExecutionVertex(jobVertex, 1); - taskVertices[2] = mockExecutionVertex(jobVertex, 2); - taskVertices[3] = mockExecutionVertex(jobVertex, 3); - - int numSamples = 100; - Time delayBetweenSamples = Time.milliseconds(100L); - - BackPressureStatsTrackerImpl tracker = new BackPressureStatsTrackerImpl( - sampleCoordinator, , numSamples, Integer.MAX_VALUE, delayBetweenSamples); - - // getOperatorBackPressureStats triggers stack trace sampling - Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), - Matchers.eq(delayBetweenSamples), - Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH)); - - // Request back pressure stats again. This should not trigger another sample request - Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), -
[GitHub] [flink] openinx edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it
openinx edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it URL: https://github.com/apache/flink/pull/10106#issuecomment-552298571 Update the pull request: 1. use the Files.walkFileTree to copy the directory of flink dist dir to testing directory; 2. Made the kafka resources and tests into a separate sub-module to eliminate the dependency; 3. Remove the getRestPort ( which parse rest.port in flink-conf.yaml) . Will file separate issue to address the random port binding (https://issues.apache.org/jira/browse/FLINK-14700). FYI @zentol , Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-14700) Consider to set the random listen port in e2e flink cluster.
Zheng Hu created FLINK-14700: Summary: Consider to set the random listen port in e2e flink cluster. Key: FLINK-14700 URL: https://issues.apache.org/jira/browse/FLINK-14700 Project: Flink Issue Type: Sub-task Reporter: Zheng Hu Have a discussion with [~chesnay] in PR https://github.com/apache/flink/pull/10106 , we plan to set the ports of e2e testing flink cluster to be 0, which means it will select the port randomly to bind & listen. That would be helpful to avoid port conflicts when running parallel e2e tests. But that need some way to find the listen port because the tests will wait the port to be available ( by requesting to the port repeatably). we discussed the three ways to find the port: 1. Sweep the listening ports; 2. MiniCluster pre-check the available ports and choose one to listen , then persist the port to flink-conf.xml; 3. A regular parser to extract the binded port in log messages; ... Anyway, filed the issue to address the things here, will try to fix it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344581857 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java ## @@ -18,166 +18,218 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + /** - * Tests for the BackPressureStatsTrackerImpl. + * Tests for the {@link BackPressureStatsTrackerImpl}. */ public class BackPressureStatsTrackerImplTest extends TestLogger { - /** Tests simple statistics with fake stack traces. */ @Test - @SuppressWarnings("unchecked") - public void testTriggerStackTraceSample() throws Exception { - CompletableFuture sampleFuture = new CompletableFuture<>(); - - StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class); - Mockito.when(sampleCoordinator.triggerStackTraceSample( - Matchers.any(ExecutionVertex[].class), - Matchers.anyInt(), - Matchers.any(Time.class), - Matchers.anyInt())).thenReturn(sampleFuture); - - ExecutionGraph graph = Mockito.mock(ExecutionGraph.class); - Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING); - - // Same Thread execution context - Mockito.when(graph.getFutureExecutor()).thenReturn(new Executor() { - - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - }); - - ExecutionVertex[] taskVertices = new ExecutionVertex[4]; - - ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class); - Mockito.when(jobVertex.getJobId()).thenReturn(new JobID()); - Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); - Mockito.when(jobVertex.getGraph()).thenReturn(graph); - Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices); - - taskVertices[0] = mockExecutionVertex(jobVertex, 0); - taskVertices[1] = mockExecutionVertex(jobVertex, 1); - taskVertices[2] = mockExecutionVertex(jobVertex, 2); - taskVertices[3] = mockExecutionVertex(jobVertex, 3); - - int numSamples = 100; - Time delayBetweenSamples = Time.milliseconds(100L); - - BackPressureStatsTrackerImpl tracker = new BackPressureStatsTrackerImpl( - sampleCoordinator, , numSamples, Integer.MAX_VALUE, delayBetweenSamples); - - // getOperatorBackPressureStats triggers stack trace sampling - Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), - Matchers.eq(delayBetweenSamples), - Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH)); - - // Request back pressure stats again. This should not trigger another sample request - Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), -
[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344581143 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala ## @@ -370,8 +373,56 @@ object GenerateUtils { generateNonNullLiteral(literalType, literalValue.toString, literalValue) case TIMESTAMP_WITHOUT_TIME_ZONE => -val millis = literalValue.asInstanceOf[Long] -generateNonNullLiteral(literalType, millis + "L", millis) +def getNanoOfMillisSinceEpoch(timestampString: TimestampString): Int = { + val v = timestampString.toString() + val length = v.length + val nanoOfSeconds = length match { +case 19 | 20 => 0 +case _ => + JInteger.valueOf(v.substring(20)) * pow(10, 9 - (length - 20)).intValue() + } + nanoOfSeconds % 100 +} + +// TODO: we copied the logical of TimestampString::getMillisSinceEpoch since the copied +// DateTimeUtils.ymdToJulian is wrong. Review comment: [FLINK-11935](https://issues.apache.org/jira/browse/FLINK-11935) should do this, and after that this copied code could be removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344581031 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java ## @@ -18,166 +18,218 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + /** - * Tests for the BackPressureStatsTrackerImpl. + * Tests for the {@link BackPressureStatsTrackerImpl}. */ public class BackPressureStatsTrackerImplTest extends TestLogger { - /** Tests simple statistics with fake stack traces. */ @Test - @SuppressWarnings("unchecked") - public void testTriggerStackTraceSample() throws Exception { - CompletableFuture sampleFuture = new CompletableFuture<>(); - - StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class); - Mockito.when(sampleCoordinator.triggerStackTraceSample( - Matchers.any(ExecutionVertex[].class), - Matchers.anyInt(), - Matchers.any(Time.class), - Matchers.anyInt())).thenReturn(sampleFuture); - - ExecutionGraph graph = Mockito.mock(ExecutionGraph.class); - Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING); - - // Same Thread execution context - Mockito.when(graph.getFutureExecutor()).thenReturn(new Executor() { - - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - }); - - ExecutionVertex[] taskVertices = new ExecutionVertex[4]; - - ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class); - Mockito.when(jobVertex.getJobId()).thenReturn(new JobID()); - Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); - Mockito.when(jobVertex.getGraph()).thenReturn(graph); - Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices); - - taskVertices[0] = mockExecutionVertex(jobVertex, 0); - taskVertices[1] = mockExecutionVertex(jobVertex, 1); - taskVertices[2] = mockExecutionVertex(jobVertex, 2); - taskVertices[3] = mockExecutionVertex(jobVertex, 3); - - int numSamples = 100; - Time delayBetweenSamples = Time.milliseconds(100L); - - BackPressureStatsTrackerImpl tracker = new BackPressureStatsTrackerImpl( - sampleCoordinator, , numSamples, Integer.MAX_VALUE, delayBetweenSamples); - - // getOperatorBackPressureStats triggers stack trace sampling - Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), - Matchers.eq(delayBetweenSamples), - Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH)); - - // Request back pressure stats again. This should not trigger another sample request - Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), -
[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344580399 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala ## @@ -370,8 +373,56 @@ object GenerateUtils { generateNonNullLiteral(literalType, literalValue.toString, literalValue) case TIMESTAMP_WITHOUT_TIME_ZONE => -val millis = literalValue.asInstanceOf[Long] -generateNonNullLiteral(literalType, millis + "L", millis) +def getNanoOfMillisSinceEpoch(timestampString: TimestampString): Int = { + val v = timestampString.toString() + val length = v.length + val nanoOfSeconds = length match { +case 19 | 20 => 0 +case _ => + JInteger.valueOf(v.substring(20)) * pow(10, 9 - (length - 20)).intValue() + } + nanoOfSeconds % 100 +} + +// TODO: we copied the logical of TimestampString::getMillisSinceEpoch since the copied +// DateTimeUtils.ymdToJulian is wrong. Review comment: Two reasons to not fixing our copied `DateTimeUtils` in this PR 1) two copies of `DateTimeUtils` should remain the same in legacy planner and blink planner 2) and the impact should be evaluated for both planner This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344580223 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java ## @@ -18,166 +18,218 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + /** - * Tests for the BackPressureStatsTrackerImpl. + * Tests for the {@link BackPressureStatsTrackerImpl}. */ public class BackPressureStatsTrackerImplTest extends TestLogger { - /** Tests simple statistics with fake stack traces. */ @Test - @SuppressWarnings("unchecked") - public void testTriggerStackTraceSample() throws Exception { - CompletableFuture sampleFuture = new CompletableFuture<>(); - - StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class); - Mockito.when(sampleCoordinator.triggerStackTraceSample( - Matchers.any(ExecutionVertex[].class), - Matchers.anyInt(), - Matchers.any(Time.class), - Matchers.anyInt())).thenReturn(sampleFuture); - - ExecutionGraph graph = Mockito.mock(ExecutionGraph.class); - Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING); - - // Same Thread execution context - Mockito.when(graph.getFutureExecutor()).thenReturn(new Executor() { - - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - }); - - ExecutionVertex[] taskVertices = new ExecutionVertex[4]; - - ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class); - Mockito.when(jobVertex.getJobId()).thenReturn(new JobID()); - Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); - Mockito.when(jobVertex.getGraph()).thenReturn(graph); - Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices); - - taskVertices[0] = mockExecutionVertex(jobVertex, 0); - taskVertices[1] = mockExecutionVertex(jobVertex, 1); - taskVertices[2] = mockExecutionVertex(jobVertex, 2); - taskVertices[3] = mockExecutionVertex(jobVertex, 3); - - int numSamples = 100; - Time delayBetweenSamples = Time.milliseconds(100L); - - BackPressureStatsTrackerImpl tracker = new BackPressureStatsTrackerImpl( - sampleCoordinator, , numSamples, Integer.MAX_VALUE, delayBetweenSamples); - - // getOperatorBackPressureStats triggers stack trace sampling - Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), - Matchers.eq(delayBetweenSamples), - Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH)); - - // Request back pressure stats again. This should not trigger another sample request - Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), -
[jira] [Created] (FLINK-14699) Move ClosureCleaner to flink-core
Zili Chen created FLINK-14699: - Summary: Move ClosureCleaner to flink-core Key: FLINK-14699 URL: https://issues.apache.org/jira/browse/FLINK-14699 Project: Flink Issue Type: Improvement Reporter: Zili Chen Fix For: 1.10.0 {{ClosureCleaner}} is currently under {{flink-java}}. However, it doesn't stick to {{flink-java}} and used in {{flink-streaming-java}}. IMHO {{flink-streaming-java}} should not base on {{flink-java}}( {{flink-batch-java}} in fact ). Thus, I propose to move {{ClosureCleaner}} to {{flink-core}}. CC [~chesnay] [~aljoscha] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread
TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread URL: https://github.com/apache/flink/pull/10143#discussion_r344577356 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ## @@ -641,4 +610,32 @@ static void require(boolean condition, String message, Object... values) { } } + /** +* Get dynamic properties based on two Flink configuration. If base config does not contain and target config +* contains the key or the value is different, it should be added to results. Otherwise, if the base config contains +* and target config does not contain the key, it will be ignored. +* @param baseConfig The base configuration. +* @param targetConfig The target configuration. +* @return Dynamic properties as string, separated by space. +*/ + static String getDynamicProperties( + org.apache.flink.configuration.Configuration baseConfig, + org.apache.flink.configuration.Configuration targetConfig) { + + String[] newAddedConfigs = targetConfig.keySet().stream().flatMap( + (String key) -> { + final String baseValue = baseConfig.getString(ConfigOptions.key(key).stringType().noDefaultValue()); + final String targetValue = targetConfig.getString(ConfigOptions.key(key).stringType().noDefaultValue()); + + if (!baseConfig.keySet().contains(key) || !baseValue.equals(targetValue)) { + return Stream.of("-" + CommandLineOptions.DYNAMIC_PROPERTY_OPTION.getOpt() + key + + CommandLineOptions.DYNAMIC_PROPERTY_OPTION.getValueSeparator() + targetValue); + } else { + return Stream.empty(); + } + }) + .toArray(String[]::new); + return org.apache.commons.lang3.StringUtils.join(newAddedConfigs, " "); Review comment: I'm not sure whether `java.lang.String.join` has the same semantic of `o.a.c.lang3.StringUtils.join`, but if it is not so different, I tend to use `java.lang.String.join` :-) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile
flinkbot edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile URL: https://github.com/apache/flink/pull/10146#issuecomment-552307115 ## CI report: * 25f9e4b87846e5a736aa329c834f82962e1f50c4 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/135875925) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344573790 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala ## @@ -37,6 +37,17 @@ import java.util.{Locale, TimeZone} class TemporalTypesTest extends ExpressionTestBase { Review comment: That would be great. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344573698 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ArrayTypeTest.scala ## @@ -94,6 +96,48 @@ class ArrayTypeTest extends ArrayTypeTestBase { "ARRAY[TIMESTAMP '1985-04-11 14:15:16', TIMESTAMP '2018-07-26 17:18:19']", "[1985-04-11 14:15:16.000, 2018-07-26 17:18:19.000]") +// localDateTime use DateTimeUtils.timestampStringToUnixDate to parse a time string, +// which only support millisecond's precision. +testTableApi( + Array( +JLocalDateTime.of(1985, 4, 11, 14, 15, 16, 123456789), +JLocalDateTime.of(2018, 7, 26, 17, 18, 19, 123456789)), +"[1985-04-11T14:15:16.123456789, 2018-07-26T17:18:19.123456789]") + +testTableApi( + Array( +JLocalDateTime.of(1985, 4, 11, 14, 15, 16, 123456700), +JLocalDateTime.of(2018, 7, 26, 17, 18, 19, 123456700)), + "[1985-04-11T14:15:16.123456700, 2018-07-26T17:18:19.123456700]") + +testTableApi( + Array( +JLocalDateTime.of(1985, 4, 11, 14, 15, 16, 123456000), +JLocalDateTime.of(2018, 7, 26, 17, 18, 19, 123456000)), + "[1985-04-11T14:15:16.123456, 2018-07-26T17:18:19.123456]") + +testTableApi( + Array( +JLocalDateTime.of(1985, 4, 11, 14, 15, 16, 12340), +JLocalDateTime.of(2018, 7, 26, 17, 18, 19, 12340)), + "[1985-04-11T14:15:16.123400, 2018-07-26T17:18:19.123400]") + +testSqlApi( + "ARRAY[TIMESTAMP '1985-04-11 14:15:16.123456789', TIMESTAMP '2018-07-26 17:18:19.123456789']", + "[1985-04-11T14:15:16.123456789, 2018-07-26T17:18:19.123456789]") + +testSqlApi( + "ARRAY[TIMESTAMP '1985-04-11 14:15:16.1234567', TIMESTAMP '2018-07-26 17:18:19.1234567']", + "[1985-04-11T14:15:16.123456700, 2018-07-26T17:18:19.123456700]") Review comment: Can you check whether this fits the sql standard? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344573331 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala ## @@ -174,8 +174,8 @@ object ScalarOperatorGens { (l, r) => s"$l $op ((int) ($r / ${MILLIS_PER_DAY}L))" } case TIMESTAMP_WITHOUT_TIME_ZONE => -generateOperatorIfNotNull(ctx, new TimestampType(), left, right) { - (l, r) => s"($l * ${MILLIS_PER_DAY}L) $op $r" +generateOperatorIfNotNull(ctx, new TimestampType(3), left, right) { Review comment: I'm not sure this is the right way to go. We either don't change it and leave it to next issue, or we make it right in this PR. We don't encourage to do some wrong modification and say you will fix it in following jira. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it
flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it URL: https://github.com/apache/flink/pull/10106#issuecomment-550309075 ## CI report: * 9817ef9c8be75fd02ccef3825e706497f3e6c1b6 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135261385) * 48033e52f0aa852d03c351319d464dedab3c8088 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135397835) * 75bf191da0a22986a3877236a0f4be9dbe2e8606 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/135874115) * 676cae9dbfa13d89e1618655da7b67cba8b647d4 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/135875904) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344572884 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala ## @@ -370,8 +373,56 @@ object GenerateUtils { generateNonNullLiteral(literalType, literalValue.toString, literalValue) case TIMESTAMP_WITHOUT_TIME_ZONE => -val millis = literalValue.asInstanceOf[Long] -generateNonNullLiteral(literalType, millis + "L", millis) +def getNanoOfMillisSinceEpoch(timestampString: TimestampString): Int = { + val v = timestampString.toString() + val length = v.length + val nanoOfSeconds = length match { +case 19 | 20 => 0 +case _ => + JInteger.valueOf(v.substring(20)) * pow(10, 9 - (length - 20)).intValue() + } + nanoOfSeconds % 100 +} + +// TODO: we copied the logical of TimestampString::getMillisSinceEpoch since the copied +// DateTimeUtils.ymdToJulian is wrong. Review comment: So how about fixing it in our copied `DateTimeUtils`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344572768 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeSystem.scala ## @@ -53,6 +53,9 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl { case SqlTypeName.VARCHAR | SqlTypeName.CHAR | SqlTypeName.VARBINARY | SqlTypeName.BINARY => Int.MaxValue +// The maximal precision of TIMESTAMP is 3, change it to 9 to support nanoseconds precision +case SqlTypeName.TIMESTAMP => 9 Review comment: What is the impact of changing it to 9? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-14698) Refactor the SQL CLI parser to reuse flink-sql-parser
[ https://issues.apache.org/jira/browse/FLINK-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-14698. -- Fix Version/s: (was: 1.10.0) Resolution: Duplicate > Refactor the SQL CLI parser to reuse flink-sql-parser > - > > Key: FLINK-14698 > URL: https://issues.apache.org/jira/browse/FLINK-14698 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.9.1 >Reporter: Danny Chen >Priority: Major > > We should reuse the SQL CLI for commands parsing, especially for those > statements that are sql queries. There are at lease 2 benefits i can see: > # To reduce the bugs because the parsing work by regex expression now is very > easy to encounter that for complex queries > # To reduce the redundant parse work, we only need to maintain the > flink-sql-parser -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14698) Refactor the SQL CLI parser to reuse flink-sql-parser
[ https://issues.apache.org/jira/browse/FLINK-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16971335#comment-16971335 ] Kurt Young commented on FLINK-14698: I've already create this one: https://issues.apache.org/jira/browse/FLINK-14671 > Refactor the SQL CLI parser to reuse flink-sql-parser > - > > Key: FLINK-14698 > URL: https://issues.apache.org/jira/browse/FLINK-14698 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.9.1 >Reporter: Danny Chen >Priority: Major > Fix For: 1.10.0 > > > We should reuse the SQL CLI for commands parsing, especially for those > statements that are sql queries. There are at lease 2 benefits i can see: > # To reduce the bugs because the parsing work by regex expression now is very > easy to encounter that for complex queries > # To reduce the redundant parse work, we only need to maintain the > flink-sql-parser -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-14666) support multiple catalog in flink table sql
[ https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-14666. -- Fix Version/s: 1.10.0 Resolution: Not A Problem > support multiple catalog in flink table sql > --- > > Key: FLINK-14666 > URL: https://issues.apache.org/jira/browse/FLINK-14666 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1 >Reporter: yuemeng >Priority: Critical > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > currently, calcite will only use the current catalog as schema path to > validate sql node, > maybe this is not reasonable > {code} > tableEnvironment.useCatalog("user_catalog"); > tableEnvironment.useDatabase("user_db"); > Table table = tableEnvironment.sqlQuery("SELECT action, os,count(*) as cnt > from music_queue_3 group by action, os,tumble(proctime, INTERVAL '10' > SECOND)"); tableEnvironment.registerTable("v1", table); > Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1"); > tableEnvironment.registerTable("v2", t2); > tableEnvironment.sqlUpdate("INSERT INTO database2.kafka_table_test1 SELECT > action, os,cast (cnt as BIGINT) as cnt from v2"); > {code} > suppose source table music_queue_3 and sink table kafka_table_test1 both in > user_catalog > catalog > but some temp table or view such as v1, v2,v3 will register in default > catalog. > when we select temp table v2 and insert it into our own catalog table > database2.kafka_table_test1 > it always failed with sql node validate, because of schema path in > catalog reader is the current catalog without default catalog,the temp table > or view will never be Identified -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (FLINK-14666) support multiple catalog in flink table sql
[ https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young reopened FLINK-14666: > support multiple catalog in flink table sql > --- > > Key: FLINK-14666 > URL: https://issues.apache.org/jira/browse/FLINK-14666 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1 >Reporter: yuemeng >Priority: Critical > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > currently, calcite will only use the current catalog as schema path to > validate sql node, > maybe this is not reasonable > {code} > tableEnvironment.useCatalog("user_catalog"); > tableEnvironment.useDatabase("user_db"); > Table table = tableEnvironment.sqlQuery("SELECT action, os,count(*) as cnt > from music_queue_3 group by action, os,tumble(proctime, INTERVAL '10' > SECOND)"); tableEnvironment.registerTable("v1", table); > Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1"); > tableEnvironment.registerTable("v2", t2); > tableEnvironment.sqlUpdate("INSERT INTO database2.kafka_table_test1 SELECT > action, os,cast (cnt as BIGINT) as cnt from v2"); > {code} > suppose source table music_queue_3 and sink table kafka_table_test1 both in > user_catalog > catalog > but some temp table or view such as v1, v2,v3 will register in default > catalog. > when we select temp table v2 and insert it into our own catalog table > database2.kafka_table_test1 > it always failed with sql node validate, because of schema path in > catalog reader is the current catalog without default catalog,the temp table > or view will never be Identified -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread
TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread URL: https://github.com/apache/flink/pull/10143#discussion_r344571496 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ## @@ -381,7 +381,8 @@ public static String getTaskManagerShellCommand( boolean hasLogback, boolean hasLog4j, boolean hasKrb5, - Class mainClass) { + Class mainClass, Review comment: I think BootstrapTools is an internal class so that we don't stick to it, but it makes sense to add an override method for preventing some compile issue. IIRC it was reported in classes like CloseClosure and CheckpointCoordinator. Also an override makes it smooth for forks to pick think commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread
TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread URL: https://github.com/apache/flink/pull/10143#discussion_r344571496 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ## @@ -381,7 +381,8 @@ public static String getTaskManagerShellCommand( boolean hasLogback, boolean hasLog4j, boolean hasKrb5, - Class mainClass) { + Class mainClass, Review comment: I think BootstrapTools is an internal class so that we don't stick to it, but it makes sense to add an override method for preventing some compile issue. IIRC it was reported in classes like CloseClosure and CheckpointCoordinator. Also an override makes it smooth for forks to pick this commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile
flinkbot commented on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile URL: https://github.com/apache/flink/pull/10146#issuecomment-552307115 ## CI report: * 25f9e4b87846e5a736aa329c834f82962e1f50c4 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it
flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it URL: https://github.com/apache/flink/pull/10106#issuecomment-550309075 ## CI report: * 9817ef9c8be75fd02ccef3825e706497f3e6c1b6 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135261385) * 48033e52f0aa852d03c351319d464dedab3c8088 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135397835) * 75bf191da0a22986a3877236a0f4be9dbe2e8606 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/135874115) * 676cae9dbfa13d89e1618655da7b67cba8b647d4 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] yanghua commented on issue #10112: [FLINK-14640] Change Type of Field currentExecutions from ConcurrentHashMap to HashMap
yanghua commented on issue #10112: [FLINK-14640] Change Type of Field currentExecutions from ConcurrentHashMap to HashMap URL: https://github.com/apache/flink/pull/10112#issuecomment-552303110 @GJL I have modified the test based on your suggestion. WDYT about the new changes? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile
flinkbot commented on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile URL: https://github.com/apache/flink/pull/10146#issuecomment-552302004 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 25f9e4b87846e5a736aa329c834f82962e1f50c4 (Mon Nov 11 05:46:03 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-14188) TaskExecutor derive and register with default slot resource profile
[ https://issues.apache.org/jira/browse/FLINK-14188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-14188: --- Labels: pull-request-available (was: ) > TaskExecutor derive and register with default slot resource profile > --- > > Key: FLINK-14188 > URL: https://issues.apache.org/jira/browse/FLINK-14188 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Xintong Song >Assignee: Xintong Song >Priority: Major > Labels: pull-request-available > > * Introduce config option for defaultSlotFraction > * Derive default slot resource profile from the new config option, or the > legacy config option "taskmanager.numberOfTaskSlots". > * Register task executor with the default slot resource profile. > This step should not introduce any behavior changes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] xintongsong opened a new pull request #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile
xintongsong opened a new pull request #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile URL: https://github.com/apache/flink/pull/10146 ## What is the purpose of the change This PR is a subtask of FLIP-56. It makes task executor to derive the default slot resource profile, and register it to the resource manager. This PR is based on #9910 and #10141. ## Brief change log - 8f633ca0096ea14ff885025ab6e62115e5529992..88f393794eb6a39c05bc520589fc4ddc2b894d6b: Commits of previous PRs. - e661a3e4bb0dafc3d5c96850e6005460851a1ca2: Introduce config option for default slot resource fraction. - fd1f894e5356184d8371bb9b4512b6d8fd6b7a68: Set container cpu cores into `TaskExecutorResourceSpec`. This will be used for deriving cpu cores of the default slot resource profiles. - 96cb75d13001c6d53861853dc6a1cc18b4d6bb31: Task executor derive default slot resource profile. - 25f9e4b87846e5a736aa329c834f82962e1f50c4: Task executor register default slot resource profile to RM. ## Verifying this change This change added tests and can be verified as follows: - `TaskExecutorResourceUtilsTest#testConfigCpuCores` - `TaskExecutorResourceUtilsTest#testConfigNoCpuCores` - `TaskExecutorResourceUtilsTest#testConfigDefaultSlotFraction` - `TaskExecutorResourceUtilsTest#testConfigDefaultSlotFractionLegacyNumSlots` - `TaskExecutorResourceUtilsTest#testConfigDefaultSlotFractionFailure` - `TaskExecutorTest#testRegisterWithDefaultSlotResourceProfile` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject…
flinkbot edited a comment on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject… URL: https://github.com/apache/flink/pull/10145#issuecomment-552287128 ## CI report: * fa9222c68fa3dd623180c52f5b769ed7e733676e : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135869245) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-14698) Refactor the SQL CLI parser to reuse flink-sql-parser
Danny Chen created FLINK-14698: -- Summary: Refactor the SQL CLI parser to reuse flink-sql-parser Key: FLINK-14698 URL: https://issues.apache.org/jira/browse/FLINK-14698 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.9.1 Reporter: Danny Chen Fix For: 1.10.0 We should reuse the SQL CLI for commands parsing, especially for those statements that are sql queries. There are at lease 2 benefits i can see: # To reduce the bugs because the parsing work by regex expression now is very easy to encounter that for complex queries # To reduce the redundant parse work, we only need to maintain the flink-sql-parser -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it
flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it URL: https://github.com/apache/flink/pull/10106#issuecomment-550309075 ## CI report: * 9817ef9c8be75fd02ccef3825e706497f3e6c1b6 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135261385) * 48033e52f0aa852d03c351319d464dedab3c8088 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135397835) * 75bf191da0a22986a3877236a0f4be9dbe2e8606 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lirui-apache commented on issue #10140: [FLINK-14660][sql cli] add 'SHOW MODULES' to SQL CLI
lirui-apache commented on issue #10140: [FLINK-14660][sql cli] add 'SHOW MODULES' to SQL CLI URL: https://github.com/apache/flink/pull/10140#issuecomment-552298911 LGTM but the compile failure seems related. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] openinx commented on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it
openinx commented on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it URL: https://github.com/apache/flink/pull/10106#issuecomment-552298571 Update the pull request: 1. use the Files.walkFileTree to copy the directory of flink dist dir to testing directory; 2. Made the kafka resources and tests into a separate sub-module to eliminate the dependency; 3. Remove the getRestPort ( which parse rest.port in flink-conf.yaml) . Will file separate issue to address the random port binding . FYI @zentol , Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-14697) Connection timeout occurs while checkpointing
[ https://issues.apache.org/jira/browse/FLINK-14697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dmgkeke updated FLINK-14697: Environment: Flink version : 1.8.0 Yarn cluster mode 8 nodes 16 yarn containers checkpoint path : hdfs was: Flink version : 1.8.0 Yarn cluster mode 8 nodes 16 yarn containers > Connection timeout occurs while checkpointing > - > > Key: FLINK-14697 > URL: https://issues.apache.org/jira/browse/FLINK-14697 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.8.0 > Environment: Flink version : 1.8.0 > Yarn cluster mode > 8 nodes > 16 yarn containers > checkpoint path : hdfs >Reporter: dmgkeke >Priority: Major > > I am currently running a flink streaming application. > It generally works well. > But I have one issue. > Intermittently connection timeout occurs on netty during a checkpoint. > And restarting app by failover strategy > All nodes are not busy, Network traffic is also normal. > But I can't find a solution for this situation. > Please let me know how to solve the problem. > I have written the log trace below. > > > > 2019-11-09 13:59:35,426 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 1412 @ 1573275575348 for job b83d95ff31d96c081f28b4d31b00c000. > 2019-11-09 14:00:23,460 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - session-window > (130/512) (bfeb017b6a81a0b1c2951a39141e6d9d) switched from RUNNING to FAILED. > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: > 연결 시간 초과 (connection to ':') > at > org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256) > at > org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256) > at > org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:87) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1401) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:953) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) > at >
[jira] [Created] (FLINK-14697) Connection timeout occurs while checkpointing
dmgkeke created FLINK-14697: --- Summary: Connection timeout occurs while checkpointing Key: FLINK-14697 URL: https://issues.apache.org/jira/browse/FLINK-14697 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.8.0 Environment: Flink version : 1.8.0 Yarn cluster mode 8 nodes 16 yarn containers Reporter: dmgkeke I am currently running a flink streaming application. It generally works well. But I have one issue. Intermittently connection timeout occurs on netty during a checkpoint. And restarting app by failover strategy All nodes are not busy, Network traffic is also normal. But I can't find a solution for this situation. Please let me know how to solve the problem. I have written the log trace below. 2019-11-09 13:59:35,426 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1412 @ 1573275575348 for job b83d95ff31d96c081f28b4d31b00c000. 2019-11-09 14:00:23,460 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - session-window (130/512) (bfeb017b6a81a0b1c2951a39141e6d9d) switched from RUNNING to FAILED. org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 연결 시간 초과 (connection to ':') at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:87) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1401) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:953) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: 연결 시간 초과 at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) at
[jira] [Commented] (FLINK-14693) python tox checks fails on travis
[ https://issues.apache.org/jira/browse/FLINK-14693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16971309#comment-16971309 ] Dian Fu commented on FLINK-14693: - Great thanks for reporting this issue. We have also found this issue at the last weekend and it seems that it's the problem of the tox. We have found some solutions (but not pretty sure) and will discuss with the tox community and then fix it. > python tox checks fails on travis > - > > Key: FLINK-14693 > URL: https://issues.apache.org/jira/browse/FLINK-14693 > Project: Flink > Issue Type: Improvement > Components: API / Python >Reporter: Kurt Young >Priority: Major > > ImportError: cannot import name 'ensure_is_path' from > 'importlib_metadata._compat' > (/home/travis/build/apache/flink/flink-python/dev/.conda/lib/python3.7/site-packages/importlib_metadata/_compat.py) > tox checks... [FAILED] > see: [https://api.travis-ci.org/v3/job/609614353/log.txt] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject…
flinkbot edited a comment on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject… URL: https://github.com/apache/flink/pull/10145#issuecomment-552287128 ## CI report: * fa9222c68fa3dd623180c52f5b769ed7e733676e : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/135869245) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10112: [FLINK-14640] Change Type of Field currentExecutions from ConcurrentHashMap to HashMap
flinkbot edited a comment on issue #10112: [FLINK-14640] Change Type of Field currentExecutions from ConcurrentHashMap to HashMap URL: https://github.com/apache/flink/pull/10112#issuecomment-550964937 ## CI report: * 13118777453ff58976b17b9aa1b7a4590ce86d5c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135400128) * 3e4caf67766a23a2d53c4fef790fa95d5ba40914 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135865214) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] TisonKun commented on issue #934: Framesize fix
TisonKun commented on issue #934: Framesize fix URL: https://github.com/apache/flink/pull/934#issuecomment-552290311 @thorntree https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#akka-framesize This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] openinx commented on a change in pull request #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it
openinx commented on a change in pull request #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it URL: https://github.com/apache/flink/pull/10106#discussion_r344554821 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -123,20 +126,28 @@ public void afterTestSuccess() { @Override public void afterTestFailure() { - logBackupDir.ifPresent(backupLocation -> { + if (logBackupDir != null) { final UUID id = UUID.randomUUID(); - LOG.info("Backing up logs to {}/{}.", backupLocation, id); + LOG.info("Backing up logs to {}/{}.", logBackupDir, id); try { - Files.createDirectories(backupLocation); - FileUtils.copyDirectory(log.toFile(), backupLocation.resolve(id.toString()).toFile()); + Files.createDirectories(logBackupDir); + FileUtils.copyDirectory(log.toFile(), logBackupDir.resolve(id.toString()).toFile()); } catch (IOException e) { LOG.warn("An error occurred while backing up logs.", e); } - }); - + } afterTestSuccess(); } + /** +* Read the value of `rest.port` part in FLINK_DIST_DIR/conf/flink-conf.yaml. +* +* @return the rest port which standalone Flink cluster will listen. +*/ + public int getRestPort() { + return defaultConfig.getInteger("rest.port", 8081); Review comment: Fine, let me revert to use 8081 firstly. Will file a separate issue to improve the port management. Thanks much. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject…
flinkbot commented on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject… URL: https://github.com/apache/flink/pull/10145#issuecomment-552287128 ## CI report: * fa9222c68fa3dd623180c52f5b769ed7e733676e : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344551534 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ + @Test(timeout = 1L) + public void testSuccessfulBackPressureRequest() throws Exception { + ExecutionVertex[] vertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY); + + CompletableFuture requestFuture = coordinator.triggerBackPressureRequest(vertices); + BackPressureStats backPressureStats = requestFuture.get(); + + // verify the request result + assertEquals(0, backPressureStats.getRequestId()); + Map backPressureRatios = backPressureStats.getBackPressureRatios(); + for (ExecutionVertex executionVertex : vertices) { + ExecutionAttemptID executionId = executionVertex.getCurrentExecutionAttempt().getAttemptId(); + assertEquals(backPressureRatio, backPressureRatios.get(executionId), 0.0); + } + + // verify no more pending request + assertEquals(0, coordinator.getNumberOfPendingRequests()); + } + + /** Tests back pressure request of non-running tasks
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344550447 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ + @Test(timeout = 1L) + public void testSuccessfulBackPressureRequest() throws Exception { + ExecutionVertex[] vertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY); + + CompletableFuture requestFuture = coordinator.triggerBackPressureRequest(vertices); + BackPressureStats backPressureStats = requestFuture.get(); + + // verify the request result + assertEquals(0, backPressureStats.getRequestId()); + Map backPressureRatios = backPressureStats.getBackPressureRatios(); + for (ExecutionVertex executionVertex : vertices) { + ExecutionAttemptID executionId = executionVertex.getCurrentExecutionAttempt().getAttemptId(); + assertEquals(backPressureRatio, backPressureRatios.get(executionId), 0.0); + } + + // verify no more pending request + assertEquals(0, coordinator.getNumberOfPendingRequests()); + } + + /** Tests back pressure request of non-running tasks
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344550465 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ + @Test(timeout = 1L) + public void testSuccessfulBackPressureRequest() throws Exception { + ExecutionVertex[] vertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY); + + CompletableFuture requestFuture = coordinator.triggerBackPressureRequest(vertices); + BackPressureStats backPressureStats = requestFuture.get(); + + // verify the request result + assertEquals(0, backPressureStats.getRequestId()); + Map backPressureRatios = backPressureStats.getBackPressureRatios(); + for (ExecutionVertex executionVertex : vertices) { + ExecutionAttemptID executionId = executionVertex.getCurrentExecutionAttempt().getAttemptId(); + assertEquals(backPressureRatio, backPressureRatios.get(executionId), 0.0); + } + + // verify no more pending request + assertEquals(0, coordinator.getNumberOfPendingRequests()); + } + + /** Tests back pressure request of non-running tasks
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344550166 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ + @Test(timeout = 1L) Review comment: If it is possible to make timeout as class level, no need to tag it for every test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject…
flinkbot commented on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject… URL: https://github.com/apache/flink/pull/10145#issuecomment-552283313 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit fa9222c68fa3dd623180c52f5b769ed7e733676e (Mon Nov 11 03:53:55 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lirui-apache commented on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject…
lirui-apache commented on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject… URL: https://github.com/apache/flink/pull/10145#issuecomment-552283323 @xuefuz @bowenli86 This is to port the fix of FLINK-14673 to 1.9.x. Please take a look. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344549947 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java ## @@ -191,6 +191,22 @@ public Decimal getDecimal(int pos, int precision, int scale) { return Decimal.readDecimalFieldFromSegments(segments, offset, offsetAndSize, precision, scale); } + @Override + public SqlTimestamp getTimestamp(int pos, int precision) { + assertIndexIsValid(pos); + + if (SqlTimestamp.isCompact(precision)) { + return SqlTimestamp.fromEpochMillis(segments[0].getLong(getElementOffset(pos, 8))); Review comment: Good catch, it should be `SegmentsUtil.getLong(segments, getElementOffset(pos, 8)` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344549642 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala ## @@ -37,6 +37,17 @@ import java.util.{Locale, TimeZone} class TemporalTypesTest extends ExpressionTestBase { Review comment: So divide it to TimestampTypeTest/DateTypeTest/TimeTypeTest ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lirui-apache opened a new pull request #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject…
lirui-apache opened a new pull request #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject… URL: https://github.com/apache/flink/pull/10145 …Exception for non-existing function ## What is the purpose of the change Always to check MetaException when getting function with HMS client. ## Brief change log - Removed shim method getFunction. - Moved the logic to HiveMetastoreClientWrapper. ## Verifying this change Existing test cases. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? NA This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344549396 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala ## @@ -335,7 +336,7 @@ class RexNodeToExpressionConverter( case TIMESTAMP_WITHOUT_TIME_ZONE => val v = literal.getValueAs(classOf[java.lang.Long]) Review comment: Good catch. It should be TimestampString and preserve the nanosecond precision. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10112: [FLINK-14640] Change Type of Field currentExecutions from ConcurrentHashMap to HashMap
flinkbot edited a comment on issue #10112: [FLINK-14640] Change Type of Field currentExecutions from ConcurrentHashMap to HashMap URL: https://github.com/apache/flink/pull/10112#issuecomment-550964937 ## CI report: * 13118777453ff58976b17b9aa1b7a4590ce86d5c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135400128) * 3e4caf67766a23a2d53c4fef790fa95d5ba40914 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/135865214) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] TisonKun commented on a change in pull request #10113: [FLINK-13749][client] Make PackagedProgram respect classloading policy
TisonKun commented on a change in pull request #10113: [FLINK-13749][client] Make PackagedProgram respect classloading policy URL: https://github.com/apache/flink/pull/10113#discussion_r344548623 ## File path: flink-tests/src/test/java/org/apache/flink/test/classloading/jar/ClassLoadingPolicyProgram.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.classloading.jar; + +import java.io.File; +import java.net.URL; + +/** + * A simple program that verifies the classloading policy by ensuring the resource loaded is under the specified + * directory. + **/ +public class ClassLoadingPolicyProgram { + + public static void main(String[] args) throws Exception { + if (args.length < 2) { + throw new IllegalArgumentException("Missing parameters"); + } + String resourceName = args[0]; + String expectedResourceDir = args[1]; + URL url = Thread.currentThread().getContextClassLoader().getResource(resourceName); + if (url == null) { Review comment: `checkNotNull` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344548191 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ + @Test(timeout = 1L) + public void testSuccessfulBackPressureRequest() throws Exception { + ExecutionVertex[] vertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY); + + CompletableFuture requestFuture = coordinator.triggerBackPressureRequest(vertices); + BackPressureStats backPressureStats = requestFuture.get(); + + // verify the request result + assertEquals(0, backPressureStats.getRequestId()); + Map backPressureRatios = backPressureStats.getBackPressureRatios(); + for (ExecutionVertex executionVertex : vertices) { + ExecutionAttemptID executionId = executionVertex.getCurrentExecutionAttempt().getAttemptId(); + assertEquals(backPressureRatio, backPressureRatios.get(executionId), 0.0); + } + + // verify no more pending request + assertEquals(0, coordinator.getNumberOfPendingRequests()); + } + + /** Tests back pressure request of non-running tasks
[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344548144 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala ## @@ -2184,7 +2236,20 @@ object ScalarOperatorGens { case TIME_WITHOUT_TIME_ZONE => s"${qualifyMethod(BuiltInMethods.UNIX_TIME_TO_STRING)}($operandTerm)" case TIMESTAMP_WITHOUT_TIME_ZONE => // including rowtime indicator -s"${qualifyMethod(BuiltInMethods.TIMESTAMP_TO_STRING)}($operandTerm, 3)" +// casting TimestampType to VARCHAR, if precision <= 3, keep the string representation Review comment: Just keep consistent with the original design. Or the behavior is changed from the users' side. Should we change this in this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344548105 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ + @Test(timeout = 1L) + public void testSuccessfulBackPressureRequest() throws Exception { + ExecutionVertex[] vertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY); + + CompletableFuture requestFuture = coordinator.triggerBackPressureRequest(vertices); + BackPressureStats backPressureStats = requestFuture.get(); + + // verify the request result + assertEquals(0, backPressureStats.getRequestId()); + Map backPressureRatios = backPressureStats.getBackPressureRatios(); + for (ExecutionVertex executionVertex : vertices) { + ExecutionAttemptID executionId = executionVertex.getCurrentExecutionAttempt().getAttemptId(); + assertEquals(backPressureRatio, backPressureRatios.get(executionId), 0.0); + } + + // verify no more pending request + assertEquals(0, coordinator.getNumberOfPendingRequests()); + } + + /** Tests back pressure request of non-running tasks
[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344547747 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala ## @@ -174,8 +174,8 @@ object ScalarOperatorGens { (l, r) => s"$l $op ((int) ($r / ${MILLIS_PER_DAY}L))" } case TIMESTAMP_WITHOUT_TIME_ZONE => -generateOperatorIfNotNull(ctx, new TimestampType(), left, right) { - (l, r) => s"($l * ${MILLIS_PER_DAY}L) $op $r" +generateOperatorIfNotNull(ctx, new TimestampType(3), left, right) { Review comment: The original `new TimestampType()` just returns `TimestampType(3)`. And just keep consistent with the original implementation. I will confirm the right behavior with Jack and Danny Chan and Fix it in the next PR [FLINK-14696](https://issues.apache.org/jira/browse/FLINK-14696) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344547802 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala ## @@ -1073,51 +1124,51 @@ object ScalarOperatorGens { (INTEGER, TIMESTAMP_WITHOUT_TIME_ZONE) | (BIGINT, TIMESTAMP_WITHOUT_TIME_ZONE) => generateUnaryOperatorIfNotNull(ctx, targetType, operand) { -operandTerm => s"(((long) $operandTerm) * 1000)" +operandTerm => s"$SQL_TIMESTAMP.fromEpochMillis(((long) $operandTerm) * 1000)" } // Float -> Timestamp // Double -> Timestamp case (FLOAT, TIMESTAMP_WITHOUT_TIME_ZONE) | (DOUBLE, TIMESTAMP_WITHOUT_TIME_ZONE) => generateUnaryOperatorIfNotNull(ctx, targetType, operand) { -operandTerm => s"((long) ($operandTerm * 1000))" +operandTerm => s"$SQL_TIMESTAMP.fromEpochMillis((long) ($operandTerm * 1000))" } // Timestamp -> Tinyint case (TIMESTAMP_WITHOUT_TIME_ZONE, TINYINT) => generateUnaryOperatorIfNotNull(ctx, targetType, operand) { -operandTerm => s"((byte) ($operandTerm / 1000))" +operandTerm => s"((byte) ($operandTerm.getMillisecond() / 1000))" } // Timestamp -> Smallint case (TIMESTAMP_WITHOUT_TIME_ZONE, SMALLINT) => generateUnaryOperatorIfNotNull(ctx, targetType, operand) { -operandTerm => s"((short) ($operandTerm / 1000))" +operandTerm => s"((short) ($operandTerm.getMillisecond() / 1000))" } // Timestamp -> Int case (TIMESTAMP_WITHOUT_TIME_ZONE, INTEGER) => generateUnaryOperatorIfNotNull(ctx, targetType, operand) { -operandTerm => s"((int) ($operandTerm / 1000))" +operandTerm => s"((int) ($operandTerm.getMillisecond() / 1000))" } // Timestamp -> BigInt case (TIMESTAMP_WITHOUT_TIME_ZONE, BIGINT) => generateUnaryOperatorIfNotNull(ctx, targetType, operand) { -operandTerm => s"((long) ($operandTerm / 1000))" +operandTerm => s"((long) ($operandTerm.getMillisecond() / 1000))" Review comment: ditto This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-14696) Support precision of TimestampType in built-in SQL functions and operators
Zhenghua Gao created FLINK-14696: Summary: Support precision of TimestampType in built-in SQL functions and operators Key: FLINK-14696 URL: https://issues.apache.org/jira/browse/FLINK-14696 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Zhenghua Gao Many built-in SQL functions and operators use long as internal representation of Timestamp type and only support millisecond precision. This ticket will check fix it and let them support nanosecond precision. The related SQL functions and operators are: (To Be Confirmed) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344546978 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ + @Test(timeout = 1L) + public void testSuccessfulBackPressureRequest() throws Exception { + ExecutionVertex[] vertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY); + + CompletableFuture requestFuture = coordinator.triggerBackPressureRequest(vertices); + BackPressureStats backPressureStats = requestFuture.get(); + + // verify the request result + assertEquals(0, backPressureStats.getRequestId()); + Map backPressureRatios = backPressureStats.getBackPressureRatios(); + for (ExecutionVertex executionVertex : vertices) { + ExecutionAttemptID executionId = executionVertex.getCurrentExecutionAttempt().getAttemptId(); + assertEquals(backPressureRatio, backPressureRatios.get(executionId), 0.0); + } + + // verify no more pending request + assertEquals(0, coordinator.getNumberOfPendingRequests()); + } + + /** Tests back pressure request of non-running tasks
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344546570 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ + @Test(timeout = 1L) + public void testSuccessfulBackPressureRequest() throws Exception { + ExecutionVertex[] vertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY); + + CompletableFuture requestFuture = coordinator.triggerBackPressureRequest(vertices); + BackPressureStats backPressureStats = requestFuture.get(); + + // verify the request result + assertEquals(0, backPressureStats.getRequestId()); + Map backPressureRatios = backPressureStats.getBackPressureRatios(); + for (ExecutionVertex executionVertex : vertices) { + ExecutionAttemptID executionId = executionVertex.getCurrentExecutionAttempt().getAttemptId(); + assertEquals(backPressureRatio, backPressureRatios.get(executionId), 0.0); + } + + // verify no more pending request + assertEquals(0, coordinator.getNumberOfPendingRequests()); Review comment: Migrate this assert into @Aftter
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344546173 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ + @Test(timeout = 1L) + public void testSuccessfulBackPressureRequest() throws Exception { + ExecutionVertex[] vertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY); + + CompletableFuture requestFuture = coordinator.triggerBackPressureRequest(vertices); + BackPressureStats backPressureStats = requestFuture.get(); + + // verify the request result + assertEquals(0, backPressureStats.getRequestId()); + Map backPressureRatios = backPressureStats.getBackPressureRatios(); + for (ExecutionVertex executionVertex : vertices) { + ExecutionAttemptID executionId = executionVertex.getCurrentExecutionAttempt().getAttemptId(); + assertEquals(backPressureRatio, backPressureRatios.get(executionId), 0.0); + } + + // verify no more pending request + assertEquals(0, coordinator.getNumberOfPendingRequests()); + } + + /** Tests back pressure request of non-running tasks
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344546013 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ + @Test(timeout = 1L) + public void testSuccessfulBackPressureRequest() throws Exception { + ExecutionVertex[] vertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY); + + CompletableFuture requestFuture = coordinator.triggerBackPressureRequest(vertices); + BackPressureStats backPressureStats = requestFuture.get(); + + // verify the request result + assertEquals(0, backPressureStats.getRequestId()); + Map backPressureRatios = backPressureStats.getBackPressureRatios(); + for (ExecutionVertex executionVertex : vertices) { + ExecutionAttemptID executionId = executionVertex.getCurrentExecutionAttempt().getAttemptId(); + assertEquals(backPressureRatio, backPressureRatios.get(executionId), 0.0); + } + + // verify no more pending request + assertEquals(0, coordinator.getNumberOfPendingRequests()); + } + + /** Tests back pressure request of non-running tasks
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344545951 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ + @Test(timeout = 1L) + public void testSuccessfulBackPressureRequest() throws Exception { + ExecutionVertex[] vertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY); + + CompletableFuture requestFuture = coordinator.triggerBackPressureRequest(vertices); + BackPressureStats backPressureStats = requestFuture.get(); + + // verify the request result + assertEquals(0, backPressureStats.getRequestId()); + Map backPressureRatios = backPressureStats.getBackPressureRatios(); + for (ExecutionVertex executionVertex : vertices) { + ExecutionAttemptID executionId = executionVertex.getCurrentExecutionAttempt().getAttemptId(); + assertEquals(backPressureRatio, backPressureRatios.get(executionId), 0.0); + } + + // verify no more pending request + assertEquals(0, coordinator.getNumberOfPendingRequests()); + } + + /** Tests back pressure request of non-running tasks
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344545905 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ + @Test(timeout = 1L) + public void testSuccessfulBackPressureRequest() throws Exception { + ExecutionVertex[] vertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY); + + CompletableFuture requestFuture = coordinator.triggerBackPressureRequest(vertices); + BackPressureStats backPressureStats = requestFuture.get(); + + // verify the request result + assertEquals(0, backPressureStats.getRequestId()); + Map backPressureRatios = backPressureStats.getBackPressureRatios(); + for (ExecutionVertex executionVertex : vertices) { + ExecutionAttemptID executionId = executionVertex.getCurrentExecutionAttempt().getAttemptId(); + assertEquals(backPressureRatio, backPressureRatios.get(executionId), 0.0); + } + + // verify no more pending request + assertEquals(0, coordinator.getNumberOfPendingRequests()); + } + + /** Tests back pressure request of non-running tasks
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344545581 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ Review comment: Refactor the format of javadoc for all the methods. Should be /** * */ Also this description is not very accurate, what is "simple request" mean. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344545581 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.messages.TaskBackPressureResponse; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BackPressureRequestCoordinator}. + */ +public class BackPressureRequestCoordinatorTest extends TestLogger { + + private static final long requestTimeout = 1; + private static final double backPressureRatio = 0.5; + + private static ScheduledExecutorService executorService; + private BackPressureRequestCoordinator coordinator; + + @BeforeClass + public static void setUp() throws Exception { + executorService = new ScheduledThreadPoolExecutor(1); + } + + @AfterClass + public static void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } + } + + @Before + public void initCoordinator() throws Exception { + coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout); + } + + @After + public void shutdownCoordinator() throws Exception { + if (coordinator != null) { + coordinator.shutDown(); + } + } + + /** Tests simple request of task back pressure stats. */ Review comment: Refactor the format of javadoc for all the methods. Should be /** * */ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10112: [FLINK-14640] Change Type of Field currentExecutions from ConcurrentHashMap to HashMap
flinkbot edited a comment on issue #10112: [FLINK-14640] Change Type of Field currentExecutions from ConcurrentHashMap to HashMap URL: https://github.com/apache/flink/pull/10112#issuecomment-550964937 ## CI report: * 13118777453ff58976b17b9aa1b7a4590ce86d5c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135400128) * 3e4caf67766a23a2d53c4fef790fa95d5ba40914 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] qiuxiafei commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations
qiuxiafei commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations URL: https://github.com/apache/flink/pull/9373#discussion_r344544335 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/DataSetConversionUtil.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.utils; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.SingleInputUdfOperator; +import org.apache.flink.api.java.operators.TwoInputUdfOperator; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.ml.common.MLEnvironment; +import org.apache.flink.ml.common.MLEnvironmentFactory; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +/** + * Provide functions of conversions between DataSet and Table. + */ +public class DataSetConversionUtil { + /** +* Convert the given Table to {@link DataSet}<{@link Row}>. +* +* @param sessionId the sessionId of {@link MLEnvironmentFactory} +* @param table the Table to convert. +* @return the converted DataSet. +*/ + public static DataSet fromTable(Long sessionId, Table table) { + return MLEnvironmentFactory + .get(sessionId) + .getBatchTableEnvironment() + .toDataSet(table, Row.class); + } + + /** +* Convert the given DataSet into a Table with specified TableSchema. +* +* @param sessionId the sessionId of {@link MLEnvironmentFactory} +* @param data the DataSet to convert. +* @param schema the specified TableSchema. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, TableSchema schema) { + return toTable(sessionId, data, schema.getFieldNames(), schema.getFieldTypes()); + } + + /** +* Convert the given DataSet into a Table with specified colNames and colTypes. +* +* @param sessionId sessionId the sessionId of {@link MLEnvironmentFactory}. +* @param data the DataSet to convert. +* @param colNames the specified colNames. +* @param colTypes the specified colTypes. This variable is used only when the +* DataSet is produced by a function and Flink cannot determine +* automatically what the produced type is. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, String[] colNames, TypeInformation [] colTypes) { + return toTable(MLEnvironmentFactory.get(sessionId), data, colNames, colTypes); + } + + /** +* Convert the given DataSet into a Table with specified colNames. +* +* @param sessionId sessionId the sessionId of {@link MLEnvironmentFactory}. +* @param data the DataSet to convert. +* @param colNames the specified colNames. +* @return the converted Table. +*/ + public static Table toTable(Long sessionId, DataSet data, String[] colNames) { + return toTable(MLEnvironmentFactory.get(sessionId), data, colNames); + } + + /** +* Convert the given DataSet into a Table with specified colNames and colTypes. +* +* @param session the MLEnvironment using to convert DataSet to Table. +* @param data the DataSet to convert. +* @param colNames the specified colNames. +* @param colTypes the specified colTypes. This variable is used only when the +* DataSet is produced by a function and Flink cannot determine +* automatically what the produced type is. +* @return the converted Table. +*/ + public static Table toTable(MLEnvironment session, DataSet
[GitHub] [flink] thorntree commented on issue #934: Framesize fix
thorntree commented on issue #934: Framesize fix URL: https://github.com/apache/flink/pull/934#issuecomment-552276083 @kl0u ,hi kl0u.can you tell me how increase akka.framesize? I don't know how config akka.framesize.Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-14674) some tpc-ds query hang in scheduled stage for long time
[ https://issues.apache.org/jira/browse/FLINK-14674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16971288#comment-16971288 ] Leonard Xu commented on FLINK-14674: [~lzljs3620320] I think [~ykt836] 's analysis is right, in local machine,I set taskmanager.numberOfTaskSlots as 4, taskmanager.heap.size as 2g, parallelism.default: 2 so that TM have about 1.26g managed memory available, but operator apply "managedMemoryInMB=768" which can not fillfull in this parallelism setting. {code:java} org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: bfed28d939e924b4b66030bc70c38185) at org.apache.flink.client.ClientUtils.submitJobAndWaitForResult(ClientUtils.java:146) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:64) at org.apache.flink.table.planner.delegation.BatchExecutor.execute(BatchExecutor.java:55) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:511) at org.apache.flink.table.tpcds.TpcdsTestProgram.main(TpcdsTestProgram.java:116) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:403) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:284) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:177) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:753) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:282) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1016) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1089) at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1089) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) at org.apache.flink.client.ClientUtils.submitJobAndWaitForResult(ClientUtils.java:144) ... 18 more Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request 16d054a481a09a766e4f5c452e8a24b7. Requested resource profile (ResourceProfile{cpuCores=0.0, heapMemoryInMB=0, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0, managedMemoryInMB=768}) is unfulfillable. at org.apache.flink.runtime.resourcemanager.sloat org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.setFailUnfulfillableRequest(SlotManagerImpl.java:491) at org.apache.flink.runtime.resourcemanager.ResourceManager.setFailUnfulfillableRequest(ResourceManager.java:1070) at org.apache.flink.runtime.resourcemanager.StandaloneResourceManager.lambda$startStartupPeriod$0(StandaloneResourceManager.java:116) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at
[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344543636 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala ## @@ -66,7 +69,15 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) extends CallGenerato boxedTypeTermForType(returnType) } val resultTerm = ctx.addReusableLocalVariable(resultTypeTerm, "result") -val evalResult = s"$functionReference.eval(${parameters.map(_.resultTerm).mkString(", ")})" +val evalResult = + if (returnType.getTypeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) { Review comment: Currently many function use Long as the internal representation of Timestamp. And the document recommend this usage, see [Types.TIMESTAMP can be represented as long](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/udfs.html#best-practices-for-implementing-udfs). We should cover this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344542341 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala ## @@ -370,8 +373,56 @@ object GenerateUtils { generateNonNullLiteral(literalType, literalValue.toString, literalValue) case TIMESTAMP_WITHOUT_TIME_ZONE => -val millis = literalValue.asInstanceOf[Long] -generateNonNullLiteral(literalType, millis + "L", millis) +def getNanoOfMillisSinceEpoch(timestampString: TimestampString): Int = { + val v = timestampString.toString() + val length = v.length + val nanoOfSeconds = length match { +case 19 | 20 => 0 +case _ => + JInteger.valueOf(v.substring(20)) * pow(10, 9 - (length - 20)).intValue() + } + nanoOfSeconds % 100 +} + +// TODO: we copied the logical of TimestampString::getMillisSinceEpoch since the copied +// DateTimeUtils.ymdToJulian is wrong. +// SEE CALCITE-1884 +def getMillisInSecond(timestampString: TimestampString): Int = { + val v = timestampString.toString() + val length = v.length + val milliOfSeconds = length match { +case 19 => 0 +case 21 => JInteger.valueOf(v.substring(20)).intValue() * 100 +case 22 => JInteger.valueOf(v.substring(20)).intValue() * 10 +case 20 | 23 | _ => JInteger.valueOf(v.substring(20, 23)).intValue() + } + milliOfSeconds +} + +def getMillisSinceEpoch(timestampString: TimestampString): Long = { + val v = timestampString.toString() + val year = JInteger.valueOf(v.substring(0, 4)) + val month = JInteger.valueOf(v.substring(5, 7)) + val day = JInteger.valueOf(v.substring(8, 10)) + val h = JInteger.valueOf(v.substring(11, 13)) + val m = JInteger.valueOf(v.substring(14, 16)) + val s = JInteger.valueOf(v.substring(17, 19)) + val ms = getMillisInSecond(timestampString) + val d = SqlDateTimeUtils.ymdToJulian(year, month, day) + d * 8640L + h * 360L + m * 6L + s * 1000L + ms.toLong +} + +val fieldTerm = newName("timestamp") +val millis = literalValue.asInstanceOf[TimestampString].getMillisSinceEpoch Review comment: The timestamp literals in SQL text has passed to a TimestampString instant of a Long(for preserving precision information). See ExprCodeGenerator.scala::visitLiteral (line 389): ` override def visitLiteral(literal: RexLiteral): GeneratedExpression = { val resultType = FlinkTypeFactory.toLogicalType(literal.getType) val value = resultType.getTypeRoot match { case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE => literal.getValueAs(classOf[TimestampString]) case _ => literal.getValue3 } generateLiteral(ctx, resultType, value) } ` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344541629 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ## @@ -387,7 +389,17 @@ public static TaskExecutor startTaskManager( metricQueryServiceAddress, blobCacheService, fatalErrorHandler, - new TaskExecutorPartitionTrackerImpl()); + new TaskExecutorPartitionTrackerImpl(), + createBackPressureSampleService(configuration, rpcService.getScheduledExecutor())); + } + + static BackPressureSampleService createBackPressureSampleService( Review comment: @VisibleForTesting? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344541629 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ## @@ -387,7 +389,17 @@ public static TaskExecutor startTaskManager( metricQueryServiceAddress, blobCacheService, fatalErrorHandler, - new TaskExecutorPartitionTrackerImpl()); + new TaskExecutorPartitionTrackerImpl(), + createBackPressureSampleService(configuration, rpcService.getScheduledExecutor())); + } + + static BackPressureSampleService createBackPressureSampleService( Review comment: @VisibleForTesting? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344541099 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/BackPressureSampleService.java ## @@ -53,64 +53,58 @@ this.numSamples = numSamples; this.delayBetweenSamples = checkNotNull(delayBetweenSamples); - this.scheduledExecutor = checkNotNull(scheduledExecutor, "The scheduledExecutor must not be null."); + this.scheduledExecutor = checkNotNull(scheduledExecutor); } /** -* Returns a future that completes with the back pressure ratio of a task. +* Schedules to sample the task back pressure and returns a future that completes +* with the back pressure ratio. * * @param task The task to be sampled. -* @return A future of the task back pressure ratio. +* @return A future containing the task back pressure ratio. */ public CompletableFuture sampleTaskBackPressure(BackPressureSampleableTask task) { + if (!task.isRunning()) { + throw new IllegalStateException("Cannot sample task. Because it is not running."); Review comment: It is better to give some debug info here. E.g we could give `task.toString()` in the message, then in the specific task implementation, it can provide `executionId` info as before. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner
docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner URL: https://github.com/apache/flink/pull/10105#discussion_r344540487 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala ## @@ -370,8 +373,56 @@ object GenerateUtils { generateNonNullLiteral(literalType, literalValue.toString, literalValue) case TIMESTAMP_WITHOUT_TIME_ZONE => -val millis = literalValue.asInstanceOf[Long] -generateNonNullLiteral(literalType, millis + "L", millis) +def getNanoOfMillisSinceEpoch(timestampString: TimestampString): Int = { + val v = timestampString.toString() + val length = v.length + val nanoOfSeconds = length match { +case 19 | 20 => 0 +case _ => + JInteger.valueOf(v.substring(20)) * pow(10, 9 - (length - 20)).intValue() + } + nanoOfSeconds % 100 +} + +// TODO: we copied the logical of TimestampString::getMillisSinceEpoch since the copied +// DateTimeUtils.ymdToJulian is wrong. Review comment: Yes. CALCITE-1884 is not fixed in our copied DateTimeUtils. And it's the root cause of some delicate cases, such as: ` SELECT TIMESTAMP '1500-04-30 00:00:00.123456789' FROM docs; SELECT CAST('1500-04-30 00:00:00.123456789' AS DATETIME(9)) FROM docs; ` should returns ` 1500-05-10T00:00:00.123456789 ` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services