[GitHub] [flink] TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread

2019-11-10 Thread GitBox
TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a 
TaskExecutor blocks the YarnResourceManager's main thread
URL: https://github.com/apache/flink/pull/10143#discussion_r344592398
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -322,11 +323,7 @@ Resource getContainerResource() {
public boolean stopWorker(final YarnWorkerNode workerNode) {
final Container container = workerNode.getContainer();
log.info("Stopping container {}.", container.getId());
-	try {
-		nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
-	} catch (final Exception e) {
-		log.warn("Error while calling YARN Node Manager to stop container", e);
-	}
+	nodeManagerClient.stopContainerAsync(container.getId(), container.getNodeId());
 
 Review comment:
  Well, I notice the `onXXX` methods below. Would you explain why we don't do
   
   ```java
   try {
 ...
   } catch (Throwable t) {
 onStopContainerError(...)
   }
   ```
   
  here, as we do when starting the container?
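
For context on why the distinction matters: a try/catch around an async call only observes errors thrown during submission; a failure inside the async operation surfaces through the returned future (or a callback handler such as the `onXXX` methods mentioned above), not at the call site. A self-contained sketch using plain `java.util.concurrent` (the YARN client types are deliberately not used here):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sketch: a try/catch around an async submission only catches failures thrown
 * while submitting, not failures that happen later inside the async task.
 * stopContainerAsync below is a stand-in, not the real YARN client method.
 */
public class TryCatchAsyncSketch {

    static final AtomicBoolean caughtBySubmitter = new AtomicBoolean(false);

    static CompletableFuture<Void> stopContainerAsync() {
        // The failure happens on another thread, after submission returns.
        return CompletableFuture.runAsync(() -> {
            throw new RuntimeException("node manager unreachable");
        });
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> f = null;
        try {
            f = stopContainerAsync();        // returns normally; nothing thrown here
        } catch (Throwable t) {
            caughtBySubmitter.set(true);     // never reached
        }
        if (f != null) {
            // The failure is only observable via the future (or a callback handler).
            f.exceptionally(t -> {
                System.out.println("observed async failure");
                return null;
            }).get(5, TimeUnit.SECONDS);
        }
        System.out.println("caught by submitter? " + caughtBySubmitter.get()); // false
    }
}
```

This is why the try/catch proposed above would only route submission-time errors to the handler; completion-time errors still need the async client's callback mechanism.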


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread

2019-11-10 Thread GitBox
TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a 
TaskExecutor blocks the YarnResourceManager's main thread
URL: https://github.com/apache/flink/pull/10143#discussion_r344592017
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -407,15 +404,9 @@ private void startTaskExecutorInContainer(Container container) {
 			containerIdStr,
 			container.getNodeId().getHost());
 
-		nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+		nodeManagerClient.startContainerAsync(container, taskExecutorLaunchContext);
 	} catch (Throwable t) {
-		log.error("Could not start TaskManager in container {}.", container.getId(), t);
-
-		// release the failed container
-		workerNodeMap.remove(resourceId);
-		resourceManagerClient.releaseAssignedContainer(container.getId());
-		// and ask for a new one
-		requestYarnContainerIfRequired();
+		onStartContainerError(container.getId(), t);
 
 Review comment:
  The same here. How can we react to a start-container failure? I don't think this handles the future result of the async method.
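
The pattern the reviewer alludes to, an async client reporting start failures through a registered handler rather than a thrown exception, can be sketched with hypothetical stand-in types (`ContainerClient` and `Handler` below are illustrations of the pattern, not Hadoop's `NMClientAsync` API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Minimal sketch of the async-client + callback-handler pattern. The failure
 * path is delivered via onStartContainerError, never via a thrown exception.
 */
public class AsyncStartSketch {

    interface Handler {
        void onContainerStarted(String containerId);
        void onStartContainerError(String containerId, Throwable t);
    }

    static class ContainerClient {
        private final Handler handler;
        ContainerClient(Handler handler) { this.handler = handler; }

        /** Returns immediately; outcome is reported later through the handler. */
        void startContainerAsync(String containerId, boolean failForDemo) {
            CompletableFuture.runAsync(() -> {
                if (failForDemo) {
                    handler.onStartContainerError(containerId,
                        new RuntimeException("start failed"));
                } else {
                    handler.onContainerStarted(containerId);
                }
            }).join(); // joined here only so the demo is deterministic
        }
    }

    static String demo(boolean fail) {
        AtomicReference<String> result = new AtomicReference<>();
        Handler handler = new Handler() {
            public void onContainerStarted(String id) {
                result.set("started:" + id);
            }
            public void onStartContainerError(String id, Throwable t) {
                // here the failed container would be released and a new one requested
                result.set("error:" + id);
            }
        };
        new ContainerClient(handler).startContainerAsync("c1", fail);
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // started:c1
        System.out.println(demo(true));  // error:c1
    }
}
```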


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services



[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344592080
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/BackPressureSampleServiceTest.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link BackPressureSampleService}.
+ */
+public class BackPressureSampleServiceTest extends TestLogger {
+
+   private ScheduledExecutorService scheduledExecutorService;
+
+   private BackPressureSampleService backPressureSampleService;
+
+   @Before
 
 Review comment:
  Use `@BeforeClass`; then we can have a class-level service instead of creating a new instance for every test.
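
The trade-off behind this suggestion: in JUnit 4, `@BeforeClass` runs once per test class (on a static field), while `@Before` runs before every test method. A plain-Java sketch, with no JUnit dependency, where `beforeClass`/`before` are analogues of the annotations, shows how often setup runs in each case:

```java
/**
 * Sketch of what @BeforeClass vs @Before buys: a class-level (static) service
 * is created once, a per-test one is re-created before every test method.
 */
public class SetupCostSketch {

    static int classLevelInits = 0; // incremented by the @BeforeClass analogue
    static int perTestInits = 0;    // incremented by the @Before analogue

    static void beforeClass() { classLevelInits++; } // @BeforeClass analogue
    static void before()      { perTestInits++; }    // @Before analogue

    static void runSuite(int numTests) {
        beforeClass();                   // JUnit invokes this once per class
        for (int i = 0; i < numTests; i++) {
            before();                    // ...and this before every @Test
        }
    }

    public static void main(String[] args) {
        runSuite(5);
        System.out.println(classLevelInits + " vs " + perTestInits); // 1 vs 5
    }
}
```

Whether this is worth it depends on the service being stateless across tests; a stateful service shared via `@BeforeClass` can leak state between tests.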


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it

2019-11-10 Thread GitBox
flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e 
java framework so that at least the Kafka streaming tests can run on it
URL: https://github.com/apache/flink/pull/10106#issuecomment-550309075
 
 
   
   ## CI report:
   
   * 9817ef9c8be75fd02ccef3825e706497f3e6c1b6 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135261385)
   * 48033e52f0aa852d03c351319d464dedab3c8088 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135397835)
   * 75bf191da0a22986a3877236a0f4be9dbe2e8606 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/135874115)
   * 676cae9dbfa13d89e1618655da7b67cba8b647d4 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135875904)
   * 3f67bbfda975cad5bc1f3d3e7e5ee28fb7326e9f : UNKNOWN
   
   
   Bot commands:
   The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread

2019-11-10 Thread GitBox
TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a 
TaskExecutor blocks the YarnResourceManager's main thread
URL: https://github.com/apache/flink/pull/10143#discussion_r344591626
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -322,11 +323,7 @@ Resource getContainerResource() {
public boolean stopWorker(final YarnWorkerNode workerNode) {
final Container container = workerNode.getContainer();
log.info("Stopping container {}.", container.getId());
-	try {
-		nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
-	} catch (final Exception e) {
-		log.warn("Error while calling YARN Node Manager to stop container", e);
-	}
+	nodeManagerClient.stopContainerAsync(container.getId(), container.getNodeId());
 
 Review comment:
  How can we react to a stop-container failure?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344591459
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/BackPressureSampleServiceTest.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link BackPressureSampleService}.
+ */
+public class BackPressureSampleServiceTest extends TestLogger {
+
+   private ScheduledExecutorService scheduledExecutorService;
+
+   private BackPressureSampleService backPressureSampleService;
+
+   @Before
+	public void setUp() throws Exception {
+		scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+		final ScheduledExecutor scheduledExecutor = new ScheduledExecutorServiceAdapter(scheduledExecutorService);
+
+		backPressureSampleService = new BackPressureSampleService( 10, Time.milliseconds(10), scheduledExecutor);
 
 Review comment:
   remove whitespace before `10`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344590458
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureTrackerTestUtils.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Utility methods for {@link BackPressureStatsTrackerImplTest} and {@link 
BackPressureRequestCoordinatorTest}.
+ */
+public class BackPressureTrackerTestUtils {
+
+	public static ExecutionJobVertex createExecutionJobVertex() throws Exception {
+		return new ExecutionJobVertex(
+			createExecutionGraph(),
+			new JobVertex("TestingJobVertex", new JobVertexID()),
+			4,
+			JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue(),
+			Time.milliseconds(1),
+			1L,
+			System.currentTimeMillis());
+	}
+
+   public static ExecutionGraph createExecutionGraph() throws IOException {
 
 Review comment:
   We could create a simpler `ExecutionGraph` like below:
   
   ```
   ExecutionGraph eg = new ExecutionGraph(
   	TestingUtils.defaultExecutor(),
   	TestingUtils.defaultExecutor(),
   	new JobID(),
   	"test",
   	new Configuration(),
   	new SerializedValue<>(new ExecutionConfig()),
   	AkkaUtils.getDefaultTimeout(),
   	new NoRestartStrategy(),
   	new TestingSlotProvider(ignored -> new CompletableFuture<>()));
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344589965
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureTrackerTestUtils.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Utility methods for {@link BackPressureStatsTrackerImplTest} and {@link 
BackPressureRequestCoordinatorTest}.
+ */
+public class BackPressureTrackerTestUtils {
+
+	public static ExecutionJobVertex createExecutionJobVertex() throws Exception {
 
 Review comment:
   Actually, we could use a simpler constructor for `ExecutionJobVertex`; better yet, use `ExecutionJobVertexTest#createExecutionJobVertex` directly, because `BackPressureTrackerTestUtils` does not actually create any back-pressure-specific components.
   
   Although the execution graph is mocked inside `ExecutionJobVertexTest#createExecutionJobVertex`, that code is not touched by this PR, so we can keep it as is, or make it a real graph if you like (not mandatory).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services



[jira] [Created] (FLINK-14702) Rewrite the tpch ( test_tpch.sh ) e2e tests by using the newly introduced java framework

2019-11-10 Thread Zheng Hu (Jira)
Zheng Hu created FLINK-14702:


 Summary: Rewrite the tpch ( test_tpch.sh ) e2e tests by using the newly introduced java framework
 Key: FLINK-14702
 URL: https://issues.apache.org/jira/browse/FLINK-14702
 Project: Flink
  Issue Type: Sub-task
Reporter: Zheng Hu


Will rewrite the 
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/test_tpch.sh
e2e test using the Java e2e framework.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14674) some tpc-ds query hang in scheduled stage for long time

2019-11-10 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16971363#comment-16971363
 ] 

Zhu Zhu commented on FLINK-14674:
-

The job hangs due to FLINK-14701, which happens when a 
{{SharedSlotOversubscribedException}} occurs (at the moment, only when a shared slot 
cannot fulfill the managed memory requirements of all tasks in it).
Insufficient managed memory for one task is also a problem for q28 and q77; 
however, once FLINK-14701 is resolved, jobs will fail rather than hang in this case.

> some tpc-ds query hang in scheduled stage for long time
> ---
>
> Key: FLINK-14674
> URL: https://issues.apache.org/jira/browse/FLINK-14674
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.9.1
>Reporter: Leonard Xu
>Priority: Major
> Fix For: 1.10.0
>
>
> When running tpc-ds queries in standalone mode, some queries (4, 11, 28, 31, 77, 88) hang 
> in the scheduled stage for a long time.
> BTW, I use the blink planner with batch-mode shuffle.
> Can reproduce this issue in 
> [https://github.com/leonardBang/flink/tree/FLINK-11491]
>  
> cc [~zhuzh]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344587521
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 ##
 @@ -18,166 +18,218 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
- * Tests for the BackPressureStatsTrackerImpl.
+ * Tests for the {@link BackPressureStatsTrackerImpl}.
  */
 public class BackPressureStatsTrackerImplTest extends TestLogger {
 
-	/** Tests simple statistics with fake stack traces. */
	@Test
-	@SuppressWarnings("unchecked")
-	public void testTriggerStackTraceSample() throws Exception {
-		CompletableFuture sampleFuture = new CompletableFuture<>();
-
-		StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class);
-		Mockito.when(sampleCoordinator.triggerStackTraceSample(
-			Matchers.any(ExecutionVertex[].class),
-			Matchers.anyInt(),
-			Matchers.any(Time.class),
-			Matchers.anyInt())).thenReturn(sampleFuture);
-
-		ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-		Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-		// Same Thread execution context
-		Mockito.when(graph.getFutureExecutor()).thenReturn(new Executor() {
-
-			@Override
-			public void execute(Runnable runnable) {
-				runnable.run();
-			}
-		});
-
-		ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-		ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class);
-		Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-		Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
-		Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-		Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-		taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-		taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-		taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-		taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-		int numSamples = 100;
-		Time delayBetweenSamples = Time.milliseconds(100L);
-
-		BackPressureStatsTrackerImpl tracker = new BackPressureStatsTrackerImpl(
-			sampleCoordinator, , numSamples, Integer.MAX_VALUE, delayBetweenSamples);
-
-		// getOperatorBackPressureStats triggers stack trace sampling
-		Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-		Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample(
-			Matchers.eq(taskVertices),
-			Matchers.eq(numSamples),
-			Matchers.eq(delayBetweenSamples),
-			Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-		// Request back pressure stats again. This should not trigger another sample request
-		Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-		Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample(
-			Matchers.eq(taskVertices),
-			Matchers.eq(numSamples),

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344586695
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 ##
 @@ -18,166 +18,218 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
- * Tests for the BackPressureStatsTrackerImpl.
+ * Tests for the {@link BackPressureStatsTrackerImpl}.
  */
 public class BackPressureStatsTrackerImplTest extends TestLogger {
 
-   /** Tests simple statistics with fake stack traces. */
@Test
-   @SuppressWarnings("unchecked")
-   public void testTriggerStackTraceSample() throws Exception {
-   CompletableFuture sampleFuture = new 
CompletableFuture<>();
-
-   StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
-   Mockito.when(sampleCoordinator.triggerStackTraceSample(
-   Matchers.any(ExecutionVertex[].class),
-   Matchers.anyInt(),
-   Matchers.any(Time.class),
-   Matchers.anyInt())).thenReturn(sampleFuture);
-
-   ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-   Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-   // Same Thread execution context
-   Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
-
-   @Override
-   public void execute(Runnable runnable) {
-   runnable.run();
-   }
-   });
-
-   ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-   ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
-   Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-   Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
-   Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-   
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-   taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-   taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-   taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-   taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-   int numSamples = 100;
-   Time delayBetweenSamples = Time.milliseconds(100L);
-
-   BackPressureStatsTrackerImpl tracker = new 
BackPressureStatsTrackerImpl(
-   sampleCoordinator, , numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
-
-   // getOperatorBackPressureStats triggers stack trace sampling
-   
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
-   Matchers.eq(delayBetweenSamples),
-   
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-   // Request back pressure stats again. This should not trigger 
another sample request
-   
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
- 

[jira] [Commented] (FLINK-14701) Slot leaks if SharedSlotOversubscribedException happens

2019-11-10 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16971359#comment-16971359
 ] 

Zhu Zhu commented on FLINK-14701:
-

[~chesnay], what do you think of the issue and the proposed solution?
This issue also happens in 1.9 so I think we also need the fix there.

> Slot leaks if SharedSlotOversubscribedException happens
> ---
>
> Key: FLINK-14701
> URL: https://issues.apache.org/jira/browse/FLINK-14701
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.9.2
>Reporter: Zhu Zhu
>Priority: Blocker
> Fix For: 1.10.0, 1.9.2
>
>
> If a {{SharedSlotOversubscribedException}} happens, the {{MultiTaskSlot}} 
> will release some of its child {{SingleTaskSlot}}s. The triggered releasing 
> will trigger a re-allocation of the task slot right inside 
> {{SingleTaskSlot#release(...)}}, so a previous allocation 
> in {{SlotSharingManager#allTaskSlots}} will be replaced by the new allocation 
> because they share the same {{slotRequestId}}.
> However, {{SingleTaskSlot#release(...)}} will then invoke 
> {{MultiTaskSlot#releaseChild}} to release the previous allocation with that 
> {{slotRequestId}}, which will unexpectedly remove the new allocation from the 
> {{SlotSharingManager}}.
> In this way, a slot leak happens because the pending slot request is no longer 
> tracked by the {{SlotSharingManager}} and cannot be released when its payload 
> terminates.
> A test case {{testNoSlotLeakOnSharedSlotOversubscribedException}} which 
> exhibits this issue can be found in this 
> [commit|https://github.com/zhuzhurk/flink/commit/9024e2e9eb4bd17f371896d6dbc745bc9e585e14].
> The slot leak blocks the TPC-DS queries on Flink 1.10; see FLINK-14674.
> To solve it, I'd propose to strengthen {{MultiTaskSlot#releaseChild}} to 
> only remove its true child task slot from the {{SlotSharingManager}}, i.e. add 
> a check {{if (child == allTaskSlots.get(child.getSlotRequestId()))}} before 
> invoking {{allTaskSlots.remove(child.getSlotRequestId())}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344586291
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 ##
 @@ -18,166 +18,218 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
- * Tests for the BackPressureStatsTrackerImpl.
+ * Tests for the {@link BackPressureStatsTrackerImpl}.
  */
 public class BackPressureStatsTrackerImplTest extends TestLogger {
 
-   /** Tests simple statistics with fake stack traces. */
@Test
-   @SuppressWarnings("unchecked")
-   public void testTriggerStackTraceSample() throws Exception {
-   CompletableFuture sampleFuture = new 
CompletableFuture<>();
-
-   StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
-   Mockito.when(sampleCoordinator.triggerStackTraceSample(
-   Matchers.any(ExecutionVertex[].class),
-   Matchers.anyInt(),
-   Matchers.any(Time.class),
-   Matchers.anyInt())).thenReturn(sampleFuture);
-
-   ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-   Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-   // Same Thread execution context
-   Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
-
-   @Override
-   public void execute(Runnable runnable) {
-   runnable.run();
-   }
-   });
-
-   ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-   ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
-   Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-   Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
-   Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-   
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-   taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-   taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-   taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-   taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-   int numSamples = 100;
-   Time delayBetweenSamples = Time.milliseconds(100L);
-
-   BackPressureStatsTrackerImpl tracker = new 
BackPressureStatsTrackerImpl(
-   sampleCoordinator, , numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
-
-   // getOperatorBackPressureStats triggers stack trace sampling
-   
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
-   Matchers.eq(delayBetweenSamples),
-   
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-   // Request back pressure stats again. This should not trigger 
another sample request
-   
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
- 

[GitHub] [flink] flinkbot edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile

2019-11-10 Thread GitBox
flinkbot edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor 
derive and register with default slot resource profile
URL: https://github.com/apache/flink/pull/10146#issuecomment-552307115
 
 
   
   ## CI report:
   
   * 25f9e4b87846e5a736aa329c834f82962e1f50c4 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135875925)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14701) Slot leaks if SharedSlotOversubscribedException happens

2019-11-10 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-14701:
---

 Summary: Slot leaks if SharedSlotOversubscribedException happens
 Key: FLINK-14701
 URL: https://issues.apache.org/jira/browse/FLINK-14701
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.10.0, 1.9.2
Reporter: Zhu Zhu
 Fix For: 1.10.0, 1.9.2


If a {{SharedSlotOversubscribedException}} happens, the {{MultiTaskSlot}} will 
release some of its child {{SingleTaskSlot}}s. The triggered releasing will 
trigger a re-allocation of the task slot right inside 
{{SingleTaskSlot#release(...)}}, so a previous allocation 
 in {{SlotSharingManager#allTaskSlots}} will be replaced by the new allocation 
because they share the same {{slotRequestId}}.
However, {{SingleTaskSlot#release(...)}} will then invoke 
{{MultiTaskSlot#releaseChild}} to release the previous allocation with that 
{{slotRequestId}}, which will unexpectedly remove the new allocation from the 
{{SlotSharingManager}}.
In this way, a slot leak happens because the pending slot request is no longer 
tracked by the {{SlotSharingManager}} and cannot be released when its payload 
terminates.

A test case {{testNoSlotLeakOnSharedSlotOversubscribedException}} which 
exhibits this issue can be found in this 
[commit|https://github.com/zhuzhurk/flink/commit/9024e2e9eb4bd17f371896d6dbc745bc9e585e14].

The slot leak blocks the TPC-DS queries on Flink 1.10; see FLINK-14674.

To solve it, I'd propose to strengthen {{MultiTaskSlot#releaseChild}} to only 
remove its true child task slot from the {{SlotSharingManager}}, i.e. add a 
check {{if (child == allTaskSlots.get(child.getSlotRequestId()))}} before 
invoking {{allTaskSlots.remove(child.getSlotRequestId())}}.
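
The identity check proposed above can be sketched in isolation. This is a 
minimal illustration of the idea, not Flink's actual {{MultiTaskSlot}} code: a 
plain map stands in for {{allTaskSlots}}, and plain objects stand in for the 
task slots keyed by their slot request id.

```java
import java.util.HashMap;
import java.util.Map;

public class ReleaseChildSketch {
    // Hypothetical stand-in for SlotSharingManager#allTaskSlots:
    // slot request id -> currently tracked task slot.
    static final Map<String, Object> allTaskSlots = new HashMap<>();

    // Strengthened releaseChild: only remove the map entry if it still
    // points at this exact child, so a re-allocation that reused the same
    // slotRequestId is not evicted by the stale child's release.
    static void releaseChild(String slotRequestId, Object child) {
        if (child == allTaskSlots.get(slotRequestId)) {
            allTaskSlots.remove(slotRequestId);
        }
    }

    public static void main(String[] args) {
        Object oldSlot = new Object();
        Object newSlot = new Object();
        allTaskSlots.put("req-1", oldSlot);
        // A re-allocation replaces the old slot under the same request id.
        allTaskSlots.put("req-1", newSlot);
        // Releasing the stale child must NOT remove the new allocation.
        releaseChild("req-1", oldSlot);
        System.out.println(allTaskSlots.containsKey("req-1")); // true
    }
}
```

Without the identity check, releasing the stale child would drop the new 
allocation from tracking, which is exactly the leak described above.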




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it

2019-11-10 Thread GitBox
flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e 
java framework so that at least the Kafka streaming tests can run on it
URL: https://github.com/apache/flink/pull/10106#issuecomment-550309075
 
 
   
   ## CI report:
   
   * 9817ef9c8be75fd02ccef3825e706497f3e6c1b6 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135261385)
   * 48033e52f0aa852d03c351319d464dedab3c8088 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135397835)
   * 75bf191da0a22986a3877236a0f4be9dbe2e8606 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/135874115)
   * 676cae9dbfa13d89e1618655da7b67cba8b647d4 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135875904)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344584416
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 ##
 @@ -18,166 +18,218 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
- * Tests for the BackPressureStatsTrackerImpl.
+ * Tests for the {@link BackPressureStatsTrackerImpl}.
  */
 public class BackPressureStatsTrackerImplTest extends TestLogger {
 
-   /** Tests simple statistics with fake stack traces. */
@Test
-   @SuppressWarnings("unchecked")
-   public void testTriggerStackTraceSample() throws Exception {
-   CompletableFuture sampleFuture = new 
CompletableFuture<>();
-
-   StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
-   Mockito.when(sampleCoordinator.triggerStackTraceSample(
-   Matchers.any(ExecutionVertex[].class),
-   Matchers.anyInt(),
-   Matchers.any(Time.class),
-   Matchers.anyInt())).thenReturn(sampleFuture);
-
-   ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-   Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-   // Same Thread execution context
-   Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
-
-   @Override
-   public void execute(Runnable runnable) {
-   runnable.run();
-   }
-   });
-
-   ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-   ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
-   Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-   Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
-   Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-   
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-   taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-   taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-   taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-   taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-   int numSamples = 100;
-   Time delayBetweenSamples = Time.milliseconds(100L);
-
-   BackPressureStatsTrackerImpl tracker = new 
BackPressureStatsTrackerImpl(
-   sampleCoordinator, , numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
-
-   // getOperatorBackPressureStats triggers stack trace sampling
-   
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
-   Matchers.eq(delayBetweenSamples),
-   
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-   // Request back pressure stats again. This should not trigger 
another sample request
-   
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
- 

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344583750
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 ##
 @@ -18,166 +18,218 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
- * Tests for the BackPressureStatsTrackerImpl.
+ * Tests for the {@link BackPressureStatsTrackerImpl}.
  */
 public class BackPressureStatsTrackerImplTest extends TestLogger {
 
-   /** Tests simple statistics with fake stack traces. */
@Test
-   @SuppressWarnings("unchecked")
-   public void testTriggerStackTraceSample() throws Exception {
-   CompletableFuture sampleFuture = new 
CompletableFuture<>();
-
-   StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
-   Mockito.when(sampleCoordinator.triggerStackTraceSample(
-   Matchers.any(ExecutionVertex[].class),
-   Matchers.anyInt(),
-   Matchers.any(Time.class),
-   Matchers.anyInt())).thenReturn(sampleFuture);
-
-   ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-   Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-   // Same Thread execution context
-   Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
-
-   @Override
-   public void execute(Runnable runnable) {
-   runnable.run();
-   }
-   });
-
-   ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-   ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
-   Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-   Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
-   Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-   
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-   taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-   taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-   taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-   taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-   int numSamples = 100;
-   Time delayBetweenSamples = Time.milliseconds(100L);
-
-   BackPressureStatsTrackerImpl tracker = new 
BackPressureStatsTrackerImpl(
-   sampleCoordinator, , numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
-
-   // getOperatorBackPressureStats triggers stack trace sampling
-   
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
-   Matchers.eq(delayBetweenSamples),
-   
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-   // Request back pressure stats again. This should not trigger 
another sample request
-   
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
- 

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344583125
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 ##
 @@ -18,166 +18,218 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
- * Tests for the BackPressureStatsTrackerImpl.
+ * Tests for the {@link BackPressureStatsTrackerImpl}.
  */
 public class BackPressureStatsTrackerImplTest extends TestLogger {
 
-   /** Tests simple statistics with fake stack traces. */
@Test
-   @SuppressWarnings("unchecked")
-   public void testTriggerStackTraceSample() throws Exception {
-   CompletableFuture sampleFuture = new 
CompletableFuture<>();
-
-   StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
-   Mockito.when(sampleCoordinator.triggerStackTraceSample(
-   Matchers.any(ExecutionVertex[].class),
-   Matchers.anyInt(),
-   Matchers.any(Time.class),
-   Matchers.anyInt())).thenReturn(sampleFuture);
-
-   ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-   Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-   // Same Thread execution context
-   Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
-
-   @Override
-   public void execute(Runnable runnable) {
-   runnable.run();
-   }
-   });
-
-   ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-   ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
-   Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-   Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
-   Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-   
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-   taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-   taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-   taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-   taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-   int numSamples = 100;
-   Time delayBetweenSamples = Time.milliseconds(100L);
-
-   BackPressureStatsTrackerImpl tracker = new 
BackPressureStatsTrackerImpl(
-   sampleCoordinator, , numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
-
-   // getOperatorBackPressureStats triggers stack trace sampling
-   
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
-   Matchers.eq(delayBetweenSamples),
-   
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-   // Request back pressure stats again. This should not trigger 
another sample request
-   
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
- 

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344582415
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 ##
 @@ -18,166 +18,218 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
- * Tests for the BackPressureStatsTrackerImpl.
+ * Tests for the {@link BackPressureStatsTrackerImpl}.
  */
 public class BackPressureStatsTrackerImplTest extends TestLogger {
 
-   /** Tests simple statistics with fake stack traces. */
@Test
-   @SuppressWarnings("unchecked")
-   public void testTriggerStackTraceSample() throws Exception {
-   CompletableFuture sampleFuture = new 
CompletableFuture<>();
-
-   StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
-   Mockito.when(sampleCoordinator.triggerStackTraceSample(
-   Matchers.any(ExecutionVertex[].class),
-   Matchers.anyInt(),
-   Matchers.any(Time.class),
-   Matchers.anyInt())).thenReturn(sampleFuture);
-
-   ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-   Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-   // Same Thread execution context
-   Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
-
-   @Override
-   public void execute(Runnable runnable) {
-   runnable.run();
-   }
-   });
-
-   ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-   ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
-   Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-   Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
-   Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-   
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-   taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-   taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-   taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-   taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-   int numSamples = 100;
-   Time delayBetweenSamples = Time.milliseconds(100L);
-
-   BackPressureStatsTrackerImpl tracker = new 
BackPressureStatsTrackerImpl(
-   sampleCoordinator, , numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
-
-   // getOperatorBackPressureStats triggers stack trace sampling
-   
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
-   Matchers.eq(delayBetweenSamples),
-   
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-   // Request back pressure stats again. This should not trigger 
another sample request
-   
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
- 

[GitHub] [flink] openinx edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it

2019-11-10 Thread GitBox
openinx edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e 
java framework so that at least the Kafka streaming tests can run on it
URL: https://github.com/apache/flink/pull/10106#issuecomment-552298571
 
 
   Updated the pull request: 
   1. Use Files.walkFileTree to copy the Flink dist directory to the testing 
directory;
   2. Made the Kafka resources and tests a separate sub-module to eliminate 
the dependency; 
   3. Removed getRestPort (which parsed rest.port in flink-conf.yaml). Will 
file a separate issue to address the random port binding 
(https://issues.apache.org/jira/browse/FLINK-14700). 
   FYI @zentol, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14700) Consider to set the random listen port in e2e flink cluster.

2019-11-10 Thread Zheng Hu (Jira)
Zheng Hu created FLINK-14700:


 Summary: Consider to set the random listen port in e2e flink 
cluster.
 Key: FLINK-14700
 URL: https://issues.apache.org/jira/browse/FLINK-14700
 Project: Flink
  Issue Type: Sub-task
Reporter: Zheng Hu


Had a discussion with [~chesnay] in PR 
https://github.com/apache/flink/pull/10106: we plan to set the ports of the e2e 
testing Flink cluster to 0, which means it will select a port randomly to 
bind and listen on. That would help avoid port conflicts when running 
parallel e2e tests. But that needs some way to find the listening port, because 
the tests wait for the port to become available (by requesting the port 
repeatedly). We discussed three ways to find the port: 
1. Sweep the listening ports; 
2. MiniCluster pre-checks the available ports and chooses one to listen on, then 
persists the port to flink-conf.yaml; 
3. A regular-expression parser to extract the bound port from log messages; 
...
Anyway, filed this issue to track it; will try to fix it.
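Option 2 above can be sketched in a few lines. This is a hypothetical illustration (the class and method names are made up, not Flink code): bind a ServerSocket to port 0 so the OS assigns a free port, then read the chosen port back so it could be written into flink-conf.yaml before the tests poll it.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical sketch of option 2 (names are illustrative, not Flink code):
// bind to port 0 so the OS picks a free port, then read the real port back
// so it could be persisted into flink-conf.yaml before the tests poll it.
public class RandomPortProbe {
    public static int pickFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) { // 0 = OS-assigned port
            return socket.getLocalPort();                 // the port actually bound
        }
    }

    public static void main(String[] args) throws IOException {
        int port = pickFreePort();
        if (port <= 0 || port > 65535) {
            throw new IllegalStateException("unexpected port " + port);
        }
        System.out.println("rest.port: " + port);
    }
}
```

Note the race inherent in probing: the port is released when the socket closes and could be taken before the cluster binds it, which is presumably why the proposal has the MiniCluster itself do the bind-to-0 and persist the result.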



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344581857
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 ##
 @@ -18,166 +18,218 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
- * Tests for the BackPressureStatsTrackerImpl.
+ * Tests for the {@link BackPressureStatsTrackerImpl}.
  */
 public class BackPressureStatsTrackerImplTest extends TestLogger {
 
-   /** Tests simple statistics with fake stack traces. */
@Test
-   @SuppressWarnings("unchecked")
-   public void testTriggerStackTraceSample() throws Exception {
-   CompletableFuture sampleFuture = new 
CompletableFuture<>();
-
-   StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
-   Mockito.when(sampleCoordinator.triggerStackTraceSample(
-   Matchers.any(ExecutionVertex[].class),
-   Matchers.anyInt(),
-   Matchers.any(Time.class),
-   Matchers.anyInt())).thenReturn(sampleFuture);
-
-   ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-   Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-   // Same Thread execution context
-   Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
-
-   @Override
-   public void execute(Runnable runnable) {
-   runnable.run();
-   }
-   });
-
-   ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-   ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
-   Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-   Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
-   Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-   
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-   taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-   taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-   taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-   taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-   int numSamples = 100;
-   Time delayBetweenSamples = Time.milliseconds(100L);
-
-   BackPressureStatsTrackerImpl tracker = new 
BackPressureStatsTrackerImpl(
-   sampleCoordinator, , numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
-
-   // getOperatorBackPressureStats triggers stack trace sampling
-   
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
-   Matchers.eq(delayBetweenSamples),
-   
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-   // Request back pressure stats again. This should not trigger 
another sample request
-   
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
- 

[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
docete commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344581143
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
 ##
 @@ -370,8 +373,56 @@ object GenerateUtils {
 generateNonNullLiteral(literalType, literalValue.toString, 
literalValue)
 
   case TIMESTAMP_WITHOUT_TIME_ZONE =>
-val millis = literalValue.asInstanceOf[Long]
-generateNonNullLiteral(literalType, millis + "L", millis)
+def getNanoOfMillisSinceEpoch(timestampString: TimestampString): Int = 
{
+  val v = timestampString.toString()
+  val length = v.length
+  val nanoOfSeconds = length match {
+case 19 | 20 => 0
+case _ =>
+  JInteger.valueOf(v.substring(20)) * pow(10, 9 - (length - 
20)).intValue()
+  }
+  nanoOfSeconds % 100
+}
+
+// TODO: we copied the logical of TimestampString::getMillisSinceEpoch 
since the copied
+//  DateTimeUtils.ymdToJulian is wrong.
 
 Review comment:
   [FLINK-11935](https://issues.apache.org/jira/browse/FLINK-11935) should do 
this, and after that this copied code could be removed.
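For reference, the fractional-second parsing in the diff above can be sketched as follows. This is a hedged illustration, not the actual Flink/Calcite code: the name `nanoOfMillis` and the `% 1_000_000` reduction are assumptions based on the method name `getNanoOfMillisSinceEpoch` (the archived diff shows a garbled modulus).

```java
// Hedged sketch of the fractional-second parsing shown in the diff above.
// For "1985-04-11 14:15:16.123456789", index 20 onward holds the fraction,
// which is right-padded out to nanoseconds before taking the sub-millisecond part.
public class NanoParse {
    public static int nanoOfMillis(String ts) {
        int len = ts.length();
        if (len == 19 || len == 20) {
            return 0; // no fractional digits ("...16" or "...16.")
        }
        int fraction = Integer.parseInt(ts.substring(20));
        int scale = 1;
        for (int i = 0; i < 9 - (len - 20); i++) {
            scale *= 10; // pad the fraction to 9 digits (nanoseconds)
        }
        int nanoOfSecond = fraction * scale;
        return nanoOfSecond % 1_000_000; // assumption: nanos within the millisecond
    }

    public static void main(String[] args) {
        assert nanoOfMillis("1985-04-11 14:15:16") == 0;
        assert nanoOfMillis("1985-04-11 14:15:16.123") == 0;          // exactly millisecond precision
        assert nanoOfMillis("1985-04-11 14:15:16.123456789") == 456789;
        System.out.println("ok");
    }
}
```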




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344581031
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 ##
 @@ -18,166 +18,218 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
- * Tests for the BackPressureStatsTrackerImpl.
+ * Tests for the {@link BackPressureStatsTrackerImpl}.
  */
 public class BackPressureStatsTrackerImplTest extends TestLogger {
 
-   /** Tests simple statistics with fake stack traces. */
@Test
-   @SuppressWarnings("unchecked")
-   public void testTriggerStackTraceSample() throws Exception {
-   CompletableFuture sampleFuture = new 
CompletableFuture<>();
-
-   StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
-   Mockito.when(sampleCoordinator.triggerStackTraceSample(
-   Matchers.any(ExecutionVertex[].class),
-   Matchers.anyInt(),
-   Matchers.any(Time.class),
-   Matchers.anyInt())).thenReturn(sampleFuture);
-
-   ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-   Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-   // Same Thread execution context
-   Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
-
-   @Override
-   public void execute(Runnable runnable) {
-   runnable.run();
-   }
-   });
-
-   ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-   ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
-   Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-   Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
-   Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-   
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-   taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-   taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-   taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-   taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-   int numSamples = 100;
-   Time delayBetweenSamples = Time.milliseconds(100L);
-
-   BackPressureStatsTrackerImpl tracker = new 
BackPressureStatsTrackerImpl(
-   sampleCoordinator, , numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
-
-   // getOperatorBackPressureStats triggers stack trace sampling
-   
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
-   Matchers.eq(delayBetweenSamples),
-   
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-   // Request back pressure stats again. This should not trigger 
another sample request
-   
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
- 

[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
docete commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344580399
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
 ##
 @@ -370,8 +373,56 @@ object GenerateUtils {
 generateNonNullLiteral(literalType, literalValue.toString, 
literalValue)
 
   case TIMESTAMP_WITHOUT_TIME_ZONE =>
-val millis = literalValue.asInstanceOf[Long]
-generateNonNullLiteral(literalType, millis + "L", millis)
+def getNanoOfMillisSinceEpoch(timestampString: TimestampString): Int = 
{
+  val v = timestampString.toString()
+  val length = v.length
+  val nanoOfSeconds = length match {
+case 19 | 20 => 0
+case _ =>
+  JInteger.valueOf(v.substring(20)) * pow(10, 9 - (length - 
20)).intValue()
+  }
+  nanoOfSeconds % 100
+}
+
+// TODO: we copied the logical of TimestampString::getMillisSinceEpoch 
since the copied
+//  DateTimeUtils.ymdToJulian is wrong.
 
 Review comment:
   Two reasons for not fixing our copied `DateTimeUtils` in this PR:
   1) the two copies of `DateTimeUtils` should remain the same in the legacy planner 
and the blink planner;
   2) the impact should be evaluated for both planners.




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344580223
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 ##
 @@ -18,166 +18,218 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
- * Tests for the BackPressureStatsTrackerImpl.
+ * Tests for the {@link BackPressureStatsTrackerImpl}.
  */
 public class BackPressureStatsTrackerImplTest extends TestLogger {
 
-   /** Tests simple statistics with fake stack traces. */
@Test
-   @SuppressWarnings("unchecked")
-   public void testTriggerStackTraceSample() throws Exception {
-   CompletableFuture sampleFuture = new 
CompletableFuture<>();
-
-   StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
-   Mockito.when(sampleCoordinator.triggerStackTraceSample(
-   Matchers.any(ExecutionVertex[].class),
-   Matchers.anyInt(),
-   Matchers.any(Time.class),
-   Matchers.anyInt())).thenReturn(sampleFuture);
-
-   ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-   Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-   // Same Thread execution context
-   Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
-
-   @Override
-   public void execute(Runnable runnable) {
-   runnable.run();
-   }
-   });
-
-   ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-   ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
-   Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-   Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
-   Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-   
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-   taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-   taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-   taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-   taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-   int numSamples = 100;
-   Time delayBetweenSamples = Time.milliseconds(100L);
-
-   BackPressureStatsTrackerImpl tracker = new 
BackPressureStatsTrackerImpl(
-   sampleCoordinator, , numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
-
-   // getOperatorBackPressureStats triggers stack trace sampling
-   
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
-   Matchers.eq(delayBetweenSamples),
-   
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-   // Request back pressure stats again. This should not trigger 
another sample request
-   
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-   Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-   Matchers.eq(taskVertices),
-   Matchers.eq(numSamples),
- 

[jira] [Created] (FLINK-14699) Move ClosureCleaner to flink-core

2019-11-10 Thread Zili Chen (Jira)
Zili Chen created FLINK-14699:
-

 Summary: Move ClosureCleaner to flink-core
 Key: FLINK-14699
 URL: https://issues.apache.org/jira/browse/FLINK-14699
 Project: Flink
  Issue Type: Improvement
Reporter: Zili Chen
 Fix For: 1.10.0


{{ClosureCleaner}} is currently under {{flink-java}}. However, it does not belong 
to {{flink-java}} and is used in {{flink-streaming-java}}. IMHO 
{{flink-streaming-java}} should not depend on {{flink-java}} 
({{flink-batch-java}} in fact). Thus, I propose to move {{ClosureCleaner}} to 
{{flink-core}}.

CC [~chesnay] [~aljoscha]





[GitHub] [flink] TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread

2019-11-10 Thread GitBox
TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a 
TaskExecutor blocks the YarnResourceManager's main thread
URL: https://github.com/apache/flink/pull/10143#discussion_r344577356
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
 ##
 @@ -641,4 +610,32 @@ static void require(boolean condition, String message, 
Object... values) {
}
}
 
+   /**
+* Get dynamic properties based on two Flink configuration. If base 
config does not contain and target config
+* contains the key or the value is different, it should be added to 
results. Otherwise, if the base config contains
+* and target config does not contain the key, it will be ignored.
+* @param baseConfig The base configuration.
+* @param targetConfig The target configuration.
+* @return Dynamic properties as string, separated by space.
+*/
+   static String getDynamicProperties(
+   org.apache.flink.configuration.Configuration baseConfig,
+   org.apache.flink.configuration.Configuration targetConfig) {
+
+   String[] newAddedConfigs = 
targetConfig.keySet().stream().flatMap(
+   (String key) -> {
+   final String baseValue = 
baseConfig.getString(ConfigOptions.key(key).stringType().noDefaultValue());
+   final String targetValue = 
targetConfig.getString(ConfigOptions.key(key).stringType().noDefaultValue());
+
+   if (!baseConfig.keySet().contains(key) || 
!baseValue.equals(targetValue)) {
+   return Stream.of("-" + 
CommandLineOptions.DYNAMIC_PROPERTY_OPTION.getOpt() + key +
+   
CommandLineOptions.DYNAMIC_PROPERTY_OPTION.getValueSeparator() + targetValue);
+   } else {
+   return Stream.empty();
+   }
+   })
+   .toArray(String[]::new);
+   return 
org.apache.commons.lang3.StringUtils.join(newAddedConfigs, " ");
 
 Review comment:
   I'm not sure whether `java.lang.String.join` has the same semantics as 
`o.a.c.lang3.StringUtils.join`, but if they are not so different, I tend to use 
`java.lang.String.join` :-)
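As a quick check of that concern, here is a small sketch (hypothetical demo class, not Flink code) of how `java.lang.String.join` behaves for this exact use case, plus the one semantic difference versus commons-lang3 worth knowing.

```java
// Hypothetical demo (not Flink code) of java.lang.String.join for this use case:
// joining "-Dkey=value" tokens with a single space, as getDynamicProperties does.
public class JoinDemo {
    public static void main(String[] args) {
        String[] props = {"-Dkey1=v1", "-Dkey2=v2"};
        String joined = String.join(" ", props); // the varargs overload accepts a String[]
        if (!joined.equals("-Dkey1=v1 -Dkey2=v2")) {
            throw new AssertionError(joined);
        }
        // Known semantic difference: StringUtils.join((Object[]) null, " ")
        // returns null, whereas String.join throws a NullPointerException
        // for a null elements array. For a non-null array the results match.
        System.out.println(joined);
    }
}
```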




[GitHub] [flink] flinkbot edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile

2019-11-10 Thread GitBox
flinkbot edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor 
derive and register with default slot resource profile
URL: https://github.com/apache/flink/pull/10146#issuecomment-552307115
 
 
   
   ## CI report:
   
   * 25f9e4b87846e5a736aa329c834f82962e1f50c4 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/135875925)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
KurtYoung commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344573790
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
 ##
 @@ -37,6 +37,17 @@ import java.util.{Locale, TimeZone}
 
 class TemporalTypesTest extends ExpressionTestBase {
 
 Review comment:
   That would be great. 




[GitHub] [flink] KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
KurtYoung commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344573698
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ArrayTypeTest.scala
 ##
 @@ -94,6 +96,48 @@ class ArrayTypeTest extends ArrayTypeTestBase {
   "ARRAY[TIMESTAMP '1985-04-11 14:15:16', TIMESTAMP '2018-07-26 
17:18:19']",
   "[1985-04-11 14:15:16.000, 2018-07-26 17:18:19.000]")
 
+// localDateTime use DateTimeUtils.timestampStringToUnixDate to parse a 
time string,
+// which only support millisecond's precision.
+testTableApi(
+  Array(
+JLocalDateTime.of(1985, 4, 11, 14, 15, 16, 123456789),
+JLocalDateTime.of(2018, 7, 26, 17, 18, 19, 123456789)),
+"[1985-04-11T14:15:16.123456789, 2018-07-26T17:18:19.123456789]")
+
+testTableApi(
+  Array(
+JLocalDateTime.of(1985, 4, 11, 14, 15, 16, 123456700),
+JLocalDateTime.of(2018, 7, 26, 17, 18, 19, 123456700)),
+  "[1985-04-11T14:15:16.123456700, 2018-07-26T17:18:19.123456700]")
+
+testTableApi(
+  Array(
+JLocalDateTime.of(1985, 4, 11, 14, 15, 16, 123456000),
+JLocalDateTime.of(2018, 7, 26, 17, 18, 19, 123456000)),
+  "[1985-04-11T14:15:16.123456, 2018-07-26T17:18:19.123456]")
+
+testTableApi(
+  Array(
+JLocalDateTime.of(1985, 4, 11, 14, 15, 16, 12340),
+JLocalDateTime.of(2018, 7, 26, 17, 18, 19, 12340)),
+  "[1985-04-11T14:15:16.123400, 2018-07-26T17:18:19.123400]")
+
+testSqlApi(
+  "ARRAY[TIMESTAMP '1985-04-11 14:15:16.123456789', TIMESTAMP '2018-07-26 
17:18:19.123456789']",
+  "[1985-04-11T14:15:16.123456789, 2018-07-26T17:18:19.123456789]")
+
+testSqlApi(
+  "ARRAY[TIMESTAMP '1985-04-11 14:15:16.1234567', TIMESTAMP '2018-07-26 
17:18:19.1234567']",
+  "[1985-04-11T14:15:16.123456700, 2018-07-26T17:18:19.123456700]")
 
 Review comment:
   Can you check whether this fits the sql standard?
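For what it's worth, the right-padding of seven fractional digits to nine ("1234567" rendered as "123456700") matches how `java.time` itself prints fractions, in groups of three digits; a minimal check (hypothetical demo class, not part of the PR):

```java
import java.time.LocalDateTime;

// Hypothetical demo class: java.time prints fractional seconds in groups of
// three digits, so seven fractional digits render right-padded to nine,
// matching the expected "...123456700" strings in the test above.
public class FractionPadding {
    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(1985, 4, 11, 14, 15, 16, 123_456_700);
        if (!t.toString().equals("1985-04-11T14:15:16.123456700")) {
            throw new AssertionError(t.toString());
        }
        System.out.println(t);
    }
}
```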




[GitHub] [flink] KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
KurtYoung commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344573331
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 ##
 @@ -174,8 +174,8 @@ object ScalarOperatorGens {
   (l, r) => s"$l $op ((int) ($r / ${MILLIS_PER_DAY}L))"
 }
   case TIMESTAMP_WITHOUT_TIME_ZONE =>
-generateOperatorIfNotNull(ctx, new TimestampType(), left, right) {
-  (l, r) => s"($l * ${MILLIS_PER_DAY}L) $op $r"
+generateOperatorIfNotNull(ctx, new TimestampType(3), left, right) {
 
 Review comment:
   I'm not sure this is the right way to go. We should either leave it unchanged 
and defer to the next issue, or make it right in this PR. We don't encourage 
making a wrong modification and saying it will be fixed in a follow-up JIRA. 




[GitHub] [flink] flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it

2019-11-10 Thread GitBox
flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e 
java framework so that at least the Kafka streaming tests can run on it
URL: https://github.com/apache/flink/pull/10106#issuecomment-550309075
 
 
   
   ## CI report:
   
   * 9817ef9c8be75fd02ccef3825e706497f3e6c1b6 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135261385)
   * 48033e52f0aa852d03c351319d464dedab3c8088 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135397835)
   * 75bf191da0a22986a3877236a0f4be9dbe2e8606 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/135874115)
   * 676cae9dbfa13d89e1618655da7b67cba8b647d4 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/135875904)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
KurtYoung commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344572884
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
 ##
 @@ -370,8 +373,56 @@ object GenerateUtils {
 generateNonNullLiteral(literalType, literalValue.toString, 
literalValue)
 
   case TIMESTAMP_WITHOUT_TIME_ZONE =>
-val millis = literalValue.asInstanceOf[Long]
-generateNonNullLiteral(literalType, millis + "L", millis)
+def getNanoOfMillisSinceEpoch(timestampString: TimestampString): Int = 
{
+  val v = timestampString.toString()
+  val length = v.length
+  val nanoOfSeconds = length match {
+case 19 | 20 => 0
+case _ =>
+  JInteger.valueOf(v.substring(20)) * pow(10, 9 - (length - 
20)).intValue()
+  }
+  nanoOfSeconds % 100
+}
+
+// TODO: we copied the logical of TimestampString::getMillisSinceEpoch 
since the copied
+//  DateTimeUtils.ymdToJulian is wrong.
 
 Review comment:
   So how about fixing it in our copied `DateTimeUtils`?




[GitHub] [flink] KurtYoung commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
KurtYoung commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344572768
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeSystem.scala
 ##
 @@ -53,6 +53,9 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl {
 case SqlTypeName.VARCHAR | SqlTypeName.CHAR | SqlTypeName.VARBINARY | 
SqlTypeName.BINARY =>
   Int.MaxValue
 
+// The maximal precision of TIMESTAMP is 3, change it to 9 to support 
nanoseconds precision
+case SqlTypeName.TIMESTAMP => 9
 
 Review comment:
   What is the impact of changing it to 9?




[jira] [Closed] (FLINK-14698) Refactor the SQL CLI parser to reuse flink-sql-parser

2019-11-10 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-14698.
--
Fix Version/s: (was: 1.10.0)
   Resolution: Duplicate

> Refactor the SQL CLI parser to reuse flink-sql-parser
> -
>
> Key: FLINK-14698
> URL: https://issues.apache.org/jira/browse/FLINK-14698
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.9.1
>Reporter: Danny Chen
>Priority: Major
>
> The SQL CLI should reuse flink-sql-parser for command parsing, especially for 
> statements that are SQL queries. There are at least 2 benefits I can see:
> # To reduce bugs, because the current regex-based parsing easily breaks on 
> complex queries
> # To reduce redundant parsing work, since we only need to maintain 
> flink-sql-parser



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14698) Refactor the SQL CLI parser to reuse flink-sql-parser

2019-11-10 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16971335#comment-16971335
 ] 

Kurt Young commented on FLINK-14698:


I've already created this one: https://issues.apache.org/jira/browse/FLINK-14671

> Refactor the SQL CLI parser to reuse flink-sql-parser
> -
>
> Key: FLINK-14698
> URL: https://issues.apache.org/jira/browse/FLINK-14698
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.9.1
>Reporter: Danny Chen
>Priority: Major
> Fix For: 1.10.0
>
>
> The SQL CLI should reuse flink-sql-parser for command parsing, especially for 
> statements that are SQL queries. There are at least 2 benefits I can see:
> # To reduce bugs, because the current regex-based parsing easily breaks on 
> complex queries
> # To reduce redundant parsing work, since we only need to maintain 
> flink-sql-parser





[jira] [Closed] (FLINK-14666) support multiple catalog in flink table sql

2019-11-10 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-14666.
--
Fix Version/s: 1.10.0
   Resolution: Not A Problem

> support multiple catalog in flink table sql
> ---
>
> Key: FLINK-14666
> URL: https://issues.apache.org/jira/browse/FLINK-14666
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1
>Reporter: yuemeng
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, Calcite will only use the current catalog as the schema path to 
> validate SQL nodes;
> maybe this is not reasonable.
> {code}
> tableEnvironment.useCatalog("user_catalog");
> tableEnvironment.useDatabase("user_db");
>  Table table = tableEnvironment.sqlQuery("SELECT action, os,count(*) as cnt 
> from music_queue_3 group by action, os,tumble(proctime, INTERVAL '10' 
> SECOND)"); tableEnvironment.registerTable("v1", table);
> Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1");
> tableEnvironment.registerTable("v2", t2);
> tableEnvironment.sqlUpdate("INSERT INTO database2.kafka_table_test1 SELECT 
> action, os,cast (cnt as BIGINT) as cnt from v2");
> {code}
> Suppose source table music_queue_3 and sink table kafka_table_test1 are both 
> in user_catalog,
> but some temp tables or views such as v1, v2, v3 will be registered in the 
> default catalog.
> When we select temp table v2 and insert it into our own catalog table 
> database2.kafka_table_test1,
> SQL node validation always fails, because the schema path in the
> catalog reader is the current catalog without the default catalog, so the 
> temp table or view will never be identified.
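The behavior the reporter asks for can be sketched as a lookup order: resolve an unqualified table name against the current catalog/database first, then fall back to the default catalog where temporary views are registered. This is a hypothetical illustration of the resolution order, not Flink's actual `CatalogManager` code:

```java
import java.util.*;

// Hypothetical sketch of two-level name resolution: current catalog first,
// then the default (built-in) catalog that holds registered temp views.
public class CatalogResolverSketch {
    final Map<String, Set<String>> tablesByPath = new HashMap<>();
    String currentCatalog = "default_catalog";
    String currentDatabase = "default_database";

    void register(String catalog, String database, String table) {
        tablesByPath.computeIfAbsent(catalog + "." + database, k -> new HashSet<>()).add(table);
    }

    Optional<String> resolve(String table) {
        String[] searchPath = {
            currentCatalog + "." + currentDatabase,
            "default_catalog.default_database" // fallback for temp views
        };
        for (String path : searchPath) {
            if (tablesByPath.getOrDefault(path, Set.of()).contains(table)) {
                return Optional.of(path + "." + table); // fully qualified name
            }
        }
        return Optional.empty();
    }
}
```

With such a search path, a query run under `user_catalog.user_db` would still find the temp view `v1` registered in the default catalog.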





[jira] [Reopened] (FLINK-14666) support multiple catalog in flink table sql

2019-11-10 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reopened FLINK-14666:


> support multiple catalog in flink table sql
> ---
>
> Key: FLINK-14666
> URL: https://issues.apache.org/jira/browse/FLINK-14666
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1
>Reporter: yuemeng
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, Calcite will only use the current catalog as the schema path to 
> validate SQL nodes;
> maybe this is not reasonable.
> {code}
> tableEnvironment.useCatalog("user_catalog");
> tableEnvironment.useDatabase("user_db");
>  Table table = tableEnvironment.sqlQuery("SELECT action, os,count(*) as cnt 
> from music_queue_3 group by action, os,tumble(proctime, INTERVAL '10' 
> SECOND)"); tableEnvironment.registerTable("v1", table);
> Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1");
> tableEnvironment.registerTable("v2", t2);
> tableEnvironment.sqlUpdate("INSERT INTO database2.kafka_table_test1 SELECT 
> action, os,cast (cnt as BIGINT) as cnt from v2");
> {code}
> Suppose source table music_queue_3 and sink table kafka_table_test1 are both 
> in user_catalog,
> but some temp tables or views such as v1, v2, v3 will be registered in the 
> default catalog.
> When we select temp table v2 and insert it into our own catalog table 
> database2.kafka_table_test1,
> SQL node validation always fails, because the schema path in the
> catalog reader is the current catalog without the default catalog, so the 
> temp table or view will never be identified.





[GitHub] [flink] TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread

2019-11-10 Thread GitBox
TisonKun commented on a change in pull request #10143: [FLINK-13184]Starting a 
TaskExecutor blocks the YarnResourceManager's main thread
URL: https://github.com/apache/flink/pull/10143#discussion_r344571496
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 ##
 @@ -381,7 +381,8 @@ public static String getTaskManagerShellCommand(
boolean hasLogback,
boolean hasLog4j,
boolean hasKrb5,
-   Class mainClass) {
+   Class mainClass,
 
 Review comment:
   I think BootstrapTools is an internal class, so we don't have to stick to 
its signature, but it makes sense to add an overloaded method to prevent some 
compile issues. IIRC it was reported in classes like CloseClosure and 
CheckpointCoordinator. Also an overload makes it smooth for forks to pick this 
commit.
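The backward-compatible overload pattern being suggested can be sketched like this. Names and arguments are illustrative, not `BootstrapTools`' real signature:

```java
// Hypothetical sketch: keep the old method signature delegating to the new
// one, so callers compiled against the old method keep working after a
// parameter is added.
public class BootstrapToolsSketch {

    // Old signature, kept for source/binary compatibility.
    public static String getTaskManagerShellCommand(Class<?> mainClass) {
        return getTaskManagerShellCommand(mainClass, false);
    }

    // New signature with the extra parameter.
    public static String getTaskManagerShellCommand(Class<?> mainClass, boolean withDynamicProperties) {
        String cmd = "java " + mainClass.getName();
        return withDynamicProperties ? cmd + " $dynamic_properties" : cmd;
    }
}
```

Existing call sites never see the new parameter, while new code opts in explicitly.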


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile

2019-11-10 Thread GitBox
flinkbot commented on issue #10146: [FLINK-14188][runtime] TaskExecutor derive 
and register with default slot resource profile
URL: https://github.com/apache/flink/pull/10146#issuecomment-552307115
 
 
   
   ## CI report:
   
   * 25f9e4b87846e5a736aa329c834f82962e1f50c4 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it

2019-11-10 Thread GitBox
flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e 
java framework so that at least the Kafka streaming tests can run on it
URL: https://github.com/apache/flink/pull/10106#issuecomment-550309075
 
 
   
   ## CI report:
   
   * 9817ef9c8be75fd02ccef3825e706497f3e6c1b6 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135261385)
   * 48033e52f0aa852d03c351319d464dedab3c8088 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135397835)
   * 75bf191da0a22986a3877236a0f4be9dbe2e8606 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/135874115)
   * 676cae9dbfa13d89e1618655da7b67cba8b647d4 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] yanghua commented on issue #10112: [FLINK-14640] Change Type of Field currentExecutions from ConcurrentHashMap to HashMap

2019-11-10 Thread GitBox
yanghua commented on issue #10112: [FLINK-14640] Change Type of Field 
currentExecutions from ConcurrentHashMap to HashMap
URL: https://github.com/apache/flink/pull/10112#issuecomment-552303110
 
 
   @GJL I have modified the test based on your suggestion. WDYT about the new 
changes?




[GitHub] [flink] flinkbot commented on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile

2019-11-10 Thread GitBox
flinkbot commented on issue #10146: [FLINK-14188][runtime] TaskExecutor derive 
and register with default slot resource profile
URL: https://github.com/apache/flink/pull/10146#issuecomment-552302004
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 25f9e4b87846e5a736aa329c834f82962e1f50c4 (Mon Nov 11 
05:46:03 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-14188) TaskExecutor derive and register with default slot resource profile

2019-11-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14188:
---
Labels: pull-request-available  (was: )

> TaskExecutor derive and register with default slot resource profile
> ---
>
> Key: FLINK-14188
> URL: https://issues.apache.org/jira/browse/FLINK-14188
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Major
>  Labels: pull-request-available
>
> * Introduce config option for defaultSlotFraction
>  * Derive default slot resource profile from the new config option, or the 
> legacy config option "taskmanager.numberOfTaskSlots".
>  * Register task executor with the default slot resource profile.
> This step should not introduce any behavior changes.





[GitHub] [flink] xintongsong opened a new pull request #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile

2019-11-10 Thread GitBox
xintongsong opened a new pull request #10146: [FLINK-14188][runtime] 
TaskExecutor derive and register with default slot resource profile
URL: https://github.com/apache/flink/pull/10146
 
 
   ## What is the purpose of the change
   
   This PR is a subtask of FLIP-56. It makes task executor to derive the 
default slot resource profile, and register it to the resource manager.
   
   This PR is based on #9910 and #10141.
   
   ## Brief change log
   
   - 
8f633ca0096ea14ff885025ab6e62115e5529992..88f393794eb6a39c05bc520589fc4ddc2b894d6b:
 Commits of previous PRs.
   - e661a3e4bb0dafc3d5c96850e6005460851a1ca2: Introduce config option for 
default slot resource fraction.
   - fd1f894e5356184d8371bb9b4512b6d8fd6b7a68: Set container cpu cores into 
`TaskExecutorResourceSpec`. This will be used for deriving cpu cores of the 
default slot resource profiles.
   - 96cb75d13001c6d53861853dc6a1cc18b4d6bb31: Task executor derive default 
slot resource profile.
   - 25f9e4b87846e5a736aa329c834f82962e1f50c4: Task executor register default 
slot resource profile to RM.
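The derivation described in the change log — a default slot fraction taken from the new config option, falling back to the legacy slot count — can be sketched as follows. Option handling and names are illustrative, not the PR's actual `TaskExecutorResourceUtils` code:

```java
// Hypothetical sketch of deriving the default slot resource fraction from
// either an explicitly configured fraction or, as a legacy fallback, from
// "taskmanager.numberOfTaskSlots" (split the task manager evenly).
public class DefaultSlotFractionSketch {

    public static double deriveDefaultSlotFraction(Double configuredFraction, int numberOfTaskSlots) {
        if (configuredFraction != null) {
            return configuredFraction; // explicit new option wins
        }
        if (numberOfTaskSlots <= 0) {
            throw new IllegalArgumentException("numberOfTaskSlots must be positive");
        }
        // legacy fallback: each slot gets an equal share of the task manager
        return 1.0 / numberOfTaskSlots;
    }
}
```

The task executor would then register a default slot resource profile scaled by this fraction, which should not change behavior when only the legacy option is set.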
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - `TaskExecutorResourceUtilsTest#testConfigCpuCores`
   - `TaskExecutorResourceUtilsTest#testConfigNoCpuCores`
   - `TaskExecutorResourceUtilsTest#testConfigDefaultSlotFraction`
   - `TaskExecutorResourceUtilsTest#testConfigDefaultSlotFractionLegacyNumSlots`
   - `TaskExecutorResourceUtilsTest#testConfigDefaultSlotFractionFailure`
   - `TaskExecutorTest#testRegisterWithDefaultSlotResourceProfile`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[GitHub] [flink] flinkbot edited a comment on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject…

2019-11-10 Thread GitBox
flinkbot edited a comment on issue #10145: [FLINK-14673][hive] Shouldn't expect 
HMS client to throw NoSuchObject…
URL: https://github.com/apache/flink/pull/10145#issuecomment-552287128
 
 
   
   ## CI report:
   
   * fa9222c68fa3dd623180c52f5b769ed7e733676e : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135869245)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[jira] [Created] (FLINK-14698) Refactor the SQL CLI parser to reuse flink-sql-parser

2019-11-10 Thread Danny Chen (Jira)
Danny Chen created FLINK-14698:
--

 Summary: Refactor the SQL CLI parser to reuse flink-sql-parser
 Key: FLINK-14698
 URL: https://issues.apache.org/jira/browse/FLINK-14698
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.9.1
Reporter: Danny Chen
 Fix For: 1.10.0


The SQL CLI should reuse flink-sql-parser for command parsing, especially for 
statements that are SQL queries. There are at least 2 benefits I can see:
# To reduce bugs, because the current regex-based parsing easily breaks on 
complex queries
# To reduce redundant parsing work, since we only need to maintain 
flink-sql-parser





[GitHub] [flink] flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it

2019-11-10 Thread GitBox
flinkbot edited a comment on issue #10106: [FLINK-11463][e2e] Design the e2e 
java framework so that at least the Kafka streaming tests can run on it
URL: https://github.com/apache/flink/pull/10106#issuecomment-550309075
 
 
   
   ## CI report:
   
   * 9817ef9c8be75fd02ccef3825e706497f3e6c1b6 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135261385)
   * 48033e52f0aa852d03c351319d464dedab3c8088 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/135397835)
   * 75bf191da0a22986a3877236a0f4be9dbe2e8606 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] lirui-apache commented on issue #10140: [FLINK-14660][sql cli] add 'SHOW MODULES' to SQL CLI

2019-11-10 Thread GitBox
lirui-apache commented on issue #10140: [FLINK-14660][sql cli] add 'SHOW 
MODULES' to SQL CLI
URL: https://github.com/apache/flink/pull/10140#issuecomment-552298911
 
 
   LGTM but the compile failure seems related.




[GitHub] [flink] openinx commented on issue #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it

2019-11-10 Thread GitBox
openinx commented on issue #10106: [FLINK-11463][e2e] Design the e2e java 
framework so that at least the Kafka streaming tests can run on it
URL: https://github.com/apache/flink/pull/10106#issuecomment-552298571
 
 
   Updated the pull request: 
   1. Use Files.walkFileTree to copy the flink dist directory to the testing 
directory;
   2. Made the kafka resources and tests a separate sub-module to eliminate 
the dependency; 
   3. Removed getRestPort (which parses rest.port in flink-conf.yaml). Will 
file a separate issue to address the random port binding. 
   FYI @zentol, thanks.
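Point 1 above — recursively copying a directory tree with `Files.walkFileTree` — is a standard NIO idiom and can be sketched as follows (paths are illustrative):

```java
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;

// Sketch of copying a directory tree with Files.walkFileTree: recreate each
// directory under the target, then copy every regular file into place.
public class CopyDirSketch {

    static void copyDir(Path source, Path target) throws IOException {
        Files.walkFileTree(source, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                // mirror the directory structure before copying files into it
                Files.createDirectories(target.resolve(source.relativize(dir)));
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.copy(file, target.resolve(source.relativize(file)), StandardCopyOption.REPLACE_EXISTING);
                return FileVisitResult.CONTINUE;
            }
        });
    }
}
```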




[jira] [Updated] (FLINK-14697) Connection timeout occurs while checkpointing

2019-11-10 Thread dmgkeke (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dmgkeke updated FLINK-14697:

Environment: 
Flink version : 1.8.0

Yarn cluster mode

8 nodes

16 yarn containers

checkpoint path : hdfs

  was:
Flink version : 1.8.0

Yarn cluster mode

8 nodes

16 yarn containers


> Connection timeout occurs while checkpointing
> -
>
> Key: FLINK-14697
> URL: https://issues.apache.org/jira/browse/FLINK-14697
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.8.0
> Environment: Flink version : 1.8.0
> Yarn cluster mode
> 8 nodes
> 16 yarn containers
> checkpoint path : hdfs
>Reporter: dmgkeke
>Priority: Major
>
> I am currently running a Flink streaming application.
> It generally works well, but I have one issue.
> Intermittently, a connection timeout occurs on Netty during a checkpoint,
> and the app is restarted by the failover strategy.
> No nodes are busy and network traffic is also normal,
> but I can't find a solution for this situation.
> Please let me know how to solve the problem.
> I have written the log trace below.
>  
> 
>  
> 2019-11-09 13:59:35,426 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 1412 @ 1573275575348 for job b83d95ff31d96c081f28b4d31b00c000.
> 2019-11-09 14:00:23,460 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - session-window 
> (130/512) (bfeb017b6a81a0b1c2951a39141e6d9d) switched from RUNNING to FAILED.
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
> 연결 시간 초과 [Connection timed out] (connection to ':')
> at 
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:87)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1401)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:953)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
> at 
> 

[jira] [Created] (FLINK-14697) Connection timeout occurs while checkpointing

2019-11-10 Thread dmgkeke (Jira)
dmgkeke created FLINK-14697:
---

 Summary: Connection timeout occurs while checkpointing
 Key: FLINK-14697
 URL: https://issues.apache.org/jira/browse/FLINK-14697
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.8.0
 Environment: Flink version : 1.8.0

Yarn cluster mode

8 nodes

16 yarn containers
Reporter: dmgkeke


I am currently running a Flink streaming application.

It generally works well, but I have one issue.

Intermittently, a connection timeout occurs on Netty during a checkpoint, and 
the app is restarted by the failover strategy.

No nodes are busy and network traffic is also normal, but I can't find a 
solution for this situation.

Please let me know how to solve the problem.

I have written the log trace below.

 

 

2019-11-09 13:59:35,426 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1412 @ 1573275575348 for job b83d95ff31d96c081f28b4d31b00c000.
2019-11-09 14:00:23,460 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - session-window 
(130/512) (bfeb017b6a81a0b1c2951a39141e6d9d) switched from RUNNING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 연결 
시간 초과 [Connection timed out] (connection to ':')
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:87)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1401)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:953)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: 연결 시간 초과 [Connection timed out]
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at 

[jira] [Commented] (FLINK-14693) python tox checks fails on travis

2019-11-10 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16971309#comment-16971309
 ] 

Dian Fu commented on FLINK-14693:
-

Great thanks for reporting this issue. We also found this issue last weekend, 
and it seems to be a problem with tox. We have identified some possible 
solutions (though we are not entirely sure yet) and will discuss them with the 
tox community and then fix it.

> python tox checks fails on travis
> -
>
> Key: FLINK-14693
> URL: https://issues.apache.org/jira/browse/FLINK-14693
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Kurt Young
>Priority: Major
>
> ImportError: cannot import name 'ensure_is_path' from 
> 'importlib_metadata._compat' 
> (/home/travis/build/apache/flink/flink-python/dev/.conda/lib/python3.7/site-packages/importlib_metadata/_compat.py)
> tox checks... [FAILED]
> see: [https://api.travis-ci.org/v3/job/609614353/log.txt]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject…

2019-11-10 Thread GitBox
flinkbot edited a comment on issue #10145: [FLINK-14673][hive] Shouldn't expect 
HMS client to throw NoSuchObject…
URL: https://github.com/apache/flink/pull/10145#issuecomment-552287128
 
 
   
   ## CI report:
   
   * fa9222c68fa3dd623180c52f5b769ed7e733676e : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/135869245)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10112: [FLINK-14640] Change Type of Field currentExecutions from ConcurrentHashMap to HashMap

2019-11-10 Thread GitBox
flinkbot edited a comment on issue #10112: [FLINK-14640] Change Type of Field 
currentExecutions from ConcurrentHashMap to HashMap
URL: https://github.com/apache/flink/pull/10112#issuecomment-550964937
 
 
   
   ## CI report:
   
   * 13118777453ff58976b17b9aa1b7a4590ce86d5c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/135400128)
   * 3e4caf67766a23a2d53c4fef790fa95d5ba40914 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/135865214)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] TisonKun commented on issue #934: Framesize fix

2019-11-10 Thread GitBox
TisonKun commented on issue #934: Framesize fix
URL: https://github.com/apache/flink/pull/934#issuecomment-552290311
 
 
   @thorntree 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#akka-framesize




[GitHub] [flink] openinx commented on a change in pull request #10106: [FLINK-11463][e2e] Design the e2e java framework so that at least the Kafka streaming tests can run on it

2019-11-10 Thread GitBox
openinx commented on a change in pull request #10106: [FLINK-11463][e2e] Design 
the e2e java framework so that at least the Kafka streaming tests can run on it
URL: https://github.com/apache/flink/pull/10106#discussion_r344554821
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -123,20 +126,28 @@ public void afterTestSuccess() {
 
	@Override
	public void afterTestFailure() {
-		logBackupDir.ifPresent(backupLocation -> {
+		if (logBackupDir != null) {
			final UUID id = UUID.randomUUID();
-			LOG.info("Backing up logs to {}/{}.", backupLocation, id);
+			LOG.info("Backing up logs to {}/{}.", logBackupDir, id);
			try {
-				Files.createDirectories(backupLocation);
-				FileUtils.copyDirectory(log.toFile(), backupLocation.resolve(id.toString()).toFile());
+				Files.createDirectories(logBackupDir);
+				FileUtils.copyDirectory(log.toFile(), logBackupDir.resolve(id.toString()).toFile());
			} catch (IOException e) {
				LOG.warn("An error occurred while backing up logs.", e);
			}
-		});
-
+		}
		afterTestSuccess();
	}
 
+	/**
+	 * Reads the value of `rest.port` in FLINK_DIST_DIR/conf/flink-conf.yaml.
+	 *
+	 * @return the rest port on which the standalone Flink cluster will listen.
+	 */
+	public int getRestPort() {
+		return defaultConfig.getInteger("rest.port", 8081);
 
 Review comment:
   Fine, let me revert to using 8081 first. I will file a separate issue to 
improve the port management. Thanks a lot.
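The `getRestPort` change above boils down to a keyed lookup with a fallback default. A minimal plain-Java sketch of that behavior (the map-backed config and the `getInteger` helper are illustrative stand-ins, not Flink's actual `Configuration` class):

```java
import java.util.HashMap;
import java.util.Map;

public class RestPortSketch {
    // Stand-in for Configuration#getInteger(key, defaultValue): return the
    // parsed value when the key is present and numeric, else the default.
    static int getInteger(Map<String, String> conf, String key, int defaultValue) {
        String raw = conf.get(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Unset key falls back to the default rest port 8081.
        System.out.println(getInteger(conf, "rest.port", 8081));
        conf.put("rest.port", "9091");
        // An explicitly configured value wins over the default.
        System.out.println(getInteger(conf, "rest.port", 8081));
    }
}
```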




[GitHub] [flink] flinkbot commented on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject…

2019-11-10 Thread GitBox
flinkbot commented on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS 
client to throw NoSuchObject…
URL: https://github.com/apache/flink/pull/10145#issuecomment-552287128
 
 
   
   ## CI report:
   
   * fa9222c68fa3dd623180c52f5b769ed7e733676e : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344551534
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##
 @@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link BackPressureRequestCoordinator}.
+ */
+public class BackPressureRequestCoordinatorTest extends TestLogger {
+
+   private static final long requestTimeout = 1;
+   private static final double backPressureRatio = 0.5;
+
+   private static ScheduledExecutorService executorService;
+   private BackPressureRequestCoordinator coordinator;
+
+   @BeforeClass
+   public static void setUp() throws Exception {
+   executorService = new ScheduledThreadPoolExecutor(1);
+   }
+
+   @AfterClass
+   public static void tearDown() throws Exception {
+   if (executorService != null) {
+   executorService.shutdown();
+   }
+   }
+
+   @Before
+   public void initCoordinator() throws Exception {
+		coordinator = new BackPressureRequestCoordinator(executorService, requestTimeout);
+   }
+
+   @After
+   public void shutdownCoordinator() throws Exception {
+   if (coordinator != null) {
+   coordinator.shutDown();
+   }
+   }
+
+   /** Tests simple request of task back pressure stats. */
+   @Test(timeout = 1L)
+   public void testSuccessfulBackPressureRequest() throws Exception {
+		ExecutionVertex[] vertices = createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY);
+
+		CompletableFuture<BackPressureStats> requestFuture = coordinator.triggerBackPressureRequest(vertices);
+		BackPressureStats backPressureStats = requestFuture.get();
+
+		// verify the request result
+		assertEquals(0, backPressureStats.getRequestId());
+		Map<ExecutionAttemptID, Double> backPressureRatios = backPressureStats.getBackPressureRatios();
+		for (ExecutionVertex executionVertex : vertices) {
+			ExecutionAttemptID executionId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
+			assertEquals(backPressureRatio, backPressureRatios.get(executionId), 0.0);
+		}
+
+   // verify no more pending request
+   assertEquals(0, coordinator.getNumberOfPendingRequests());
+   }
+
+   /** Tests back pressure request of non-running tasks 

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344550447
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344550465
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344550166
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##
 @@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link BackPressureRequestCoordinator}.
+ */
+public class BackPressureRequestCoordinatorTest extends TestLogger {
+
+   private static final long requestTimeout = 1;
+   private static final double backPressureRatio = 0.5;
+
+   private static ScheduledExecutorService executorService;
+   private BackPressureRequestCoordinator coordinator;
+
+   @BeforeClass
+   public static void setUp() throws Exception {
+   executorService = new ScheduledThreadPoolExecutor(1);
+   }
+
+   @AfterClass
+   public static void tearDown() throws Exception {
+   if (executorService != null) {
+   executorService.shutdown();
+   }
+   }
+
+   @Before
+   public void initCoordinator() throws Exception {
+   coordinator = new 
BackPressureRequestCoordinator(executorService, requestTimeout);
+   }
+
+   @After
+   public void shutdownCoordinator() throws Exception {
+   if (coordinator != null) {
+   coordinator.shutDown();
+   }
+   }
+
+   /** Tests simple request of task back pressure stats. */
+   @Test(timeout = 1L)
 
 Review comment:
   If it is possible to set the timeout at class level, there is no need to 
tag it for every test.
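The class-level timeout suggested above is what JUnit 4 provides through `org.junit.rules.Timeout` as a `@Rule` (e.g. `@Rule public Timeout globalTimeout = Timeout.millis(10_000);`), which applies to every test method without per-method `@Test(timeout = ...)` tags. A dependency-free sketch of the same idea — one shared timeout wrapped around each test body; names and millisecond values are illustrative:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ClassLevelTimeoutSketch {
    // One timeout shared by all test bodies, instead of per-method tags.
    private static final long TIMEOUT_MS = 200;
    private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();

    // Runs a test body under the shared timeout; true = finished in time.
    static boolean runWithTimeout(Runnable body) {
        Future<?> future = EXECUTOR.submit(body);
        try {
            future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck body
            return false;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        boolean fast = runWithTimeout(() -> { /* completes immediately */ });
        boolean slow = runWithTimeout(() -> {
            try {
                Thread.sleep(2_000); // simulates a hanging test
            } catch (InterruptedException ignored) {
                // interrupted by the timeout's cancel
            }
        });
        System.out.println(fast + " " + slow);
        EXECUTOR.shutdownNow();
    }
}
```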




[GitHub] [flink] flinkbot commented on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject…

2019-11-10 Thread GitBox
flinkbot commented on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS 
client to throw NoSuchObject…
URL: https://github.com/apache/flink/pull/10145#issuecomment-552283313
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit fa9222c68fa3dd623180c52f5b769ed7e733676e (Mon Nov 11 
03:53:55 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] lirui-apache commented on issue #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject…

2019-11-10 Thread GitBox
lirui-apache commented on issue #10145: [FLINK-14673][hive] Shouldn't expect 
HMS client to throw NoSuchObject…
URL: https://github.com/apache/flink/pull/10145#issuecomment-552283323
 
 
   @xuefuz @bowenli86 This is to port the fix of FLINK-14673 to 1.9.x. Please 
take a look. Thanks.




[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
docete commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344549947
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
 ##
 @@ -191,6 +191,22 @@ public Decimal getDecimal(int pos, int precision, int 
scale) {
return Decimal.readDecimalFieldFromSegments(segments, offset, 
offsetAndSize, precision, scale);
}
 
+   @Override
+   public SqlTimestamp getTimestamp(int pos, int precision) {
+   assertIndexIsValid(pos);
+
+   if (SqlTimestamp.isCompact(precision)) {
+   return 
SqlTimestamp.fromEpochMillis(segments[0].getLong(getElementOffset(pos, 8)));
 
 Review comment:
   Good catch, it should be `SegmentsUtil.getLong(segments, getElementOffset(pos, 8))`.
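The point of the fix above is that an 8-byte value stored in segmented memory can straddle a segment boundary, so reading from `segments[0]` alone is only safe when the value is guaranteed to sit in the first segment. A self-contained sketch of a boundary-crossing read (plain byte arrays and little-endian layout are illustrative assumptions, not Flink's actual `MemorySegment`/`SegmentsUtil` internals):

```java
public class CrossSegmentRead {
    // Reads a little-endian long that may cross a segment boundary by
    // stitching bytes from whichever segment each absolute offset lands in.
    static long getLong(byte[][] segments, int segmentSize, int offset) {
        long value = 0L;
        for (int i = 0; i < 8; i++) {
            int abs = offset + i;
            byte b = segments[abs / segmentSize][abs % segmentSize];
            value |= (b & 0xFFL) << (8 * i);
        }
        return value;
    }

    public static void main(String[] args) {
        int segmentSize = 8;
        byte[][] segments = new byte[2][segmentSize];
        long expected = 0x1122334455667788L;
        // Write the value starting at offset 4, so it spills into segment 1.
        for (int i = 0; i < 8; i++) {
            int abs = 4 + i;
            segments[abs / segmentSize][abs % segmentSize] = (byte) (expected >>> (8 * i));
        }
        // A read confined to segments[0] could never see the bytes at
        // offsets 8-11; the stitched read recovers the full value.
        System.out.println(getLong(segments, segmentSize, 4) == expected);
    }
}
```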




[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
docete commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344549642
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
 ##
 @@ -37,6 +37,17 @@ import java.util.{Locale, TimeZone}
 
 class TemporalTypesTest extends ExpressionTestBase {
 
 Review comment:
   So divide it into TimestampTypeTest/DateTypeTest/TimeTypeTest?




[GitHub] [flink] lirui-apache opened a new pull request #10145: [FLINK-14673][hive] Shouldn't expect HMS client to throw NoSuchObject…

2019-11-10 Thread GitBox
lirui-apache opened a new pull request #10145: [FLINK-14673][hive] Shouldn't 
expect HMS client to throw NoSuchObject…
URL: https://github.com/apache/flink/pull/10145
 
 
   …Exception for non-existing function
   
   
   
   ## What is the purpose of the change
   
   Always check for MetaException when getting a function with the HMS client.
   
   
   ## Brief change log
   
 - Removed shim method getFunction.
 - Moved the logic to HiveMetastoreClientWrapper.
   
   
   ## Verifying this change
   
   Existing test cases.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? NA
   




[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
docete commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344549396
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
 ##
 @@ -335,7 +336,7 @@ class RexNodeToExpressionConverter(
 
   case TIMESTAMP_WITHOUT_TIME_ZONE =>
 val v = literal.getValueAs(classOf[java.lang.Long])
 
 Review comment:
   Good catch. It should use `TimestampString` and preserve the nanosecond 
precision.
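
A quick JDK-only illustration of why the long-based path loses information (plain `java.time`, not the planner's actual `TimestampString` type):

```java
import java.time.LocalDateTime;

public class NanoPrecisionDemo {
	public static void main(String[] args) {
		// A timestamp value with nanosecond precision.
		LocalDateTime ts = LocalDateTime.of(2019, 11, 10, 12, 0, 0, 123_456_789);

		// Long-of-milliseconds path: everything below the millisecond is floored away.
		long subSecondMillis = ts.getNano() / 1_000_000; // 123

		// String-based path keeps the full fractional second.
		String rendered = ts.toString(); // "2019-11-10T12:00:00.123456789"

		System.out.println(subSecondMillis + " vs " + rendered);
	}
}
```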




[GitHub] [flink] flinkbot edited a comment on issue #10112: [FLINK-14640] Change Type of Field currentExecutions from ConcurrentHashMap to HashMap

2019-11-10 Thread GitBox
flinkbot edited a comment on issue #10112: [FLINK-14640] Change Type of Field 
currentExecutions from ConcurrentHashMap to HashMap
URL: https://github.com/apache/flink/pull/10112#issuecomment-550964937
 
 
   
   ## CI report:
   
   * 13118777453ff58976b17b9aa1b7a4590ce86d5c : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135400128)
   * 3e4caf67766a23a2d53c4fef790fa95d5ba40914 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/135865214)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] TisonKun commented on a change in pull request #10113: [FLINK-13749][client] Make PackagedProgram respect classloading policy

2019-11-10 Thread GitBox
TisonKun commented on a change in pull request #10113: [FLINK-13749][client] 
Make PackagedProgram respect classloading policy
URL: https://github.com/apache/flink/pull/10113#discussion_r344548623
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/classloading/jar/ClassLoadingPolicyProgram.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading.jar;
+
+import java.io.File;
+import java.net.URL;
+
+/**
+ * A simple program that verifies the classloading policy by ensuring the
+ * resource loaded is under the specified directory.
+ */
+public class ClassLoadingPolicyProgram {
+
+   public static void main(String[] args) throws Exception {
+   if (args.length < 2) {
+   throw new IllegalArgumentException("Missing parameters");
+   }
+   String resourceName = args[0];
+   String expectedResourceDir = args[1];
+   URL url = Thread.currentThread().getContextClassLoader().getResource(resourceName);
+   if (url == null) {
 
 Review comment:
   `checkNotNull`
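
For reference, the suggestion replaces the explicit null check with a one-line precondition. A JDK-only sketch (Flink's own helper is `org.apache.flink.util.Preconditions.checkNotNull`, which behaves the same way; `java.util.Objects.requireNonNull` is the JDK analogue used here):

```java
import java.util.Objects;

public class CheckNotNullDemo {
	public static void main(String[] args) {
		String url = "file:/tmp/resource.txt"; // stands in for the looked-up resource URL

		// Instead of:
		//     if (url == null) { throw new RuntimeException("..."); }
		// a precondition helper folds the check and the throw into one call:
		String checked = Objects.requireNonNull(url, "Resource not found on the classpath");

		System.out.println(checked); // prints "file:/tmp/resource.txt"
	}
}
```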




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344548191
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##
 @@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link BackPressureRequestCoordinator}.
+ */
+public class BackPressureRequestCoordinatorTest extends TestLogger {
+
+   private static final long requestTimeout = 1;
+   private static final double backPressureRatio = 0.5;
+
+   private static ScheduledExecutorService executorService;
+   private BackPressureRequestCoordinator coordinator;
+
+   @BeforeClass
+   public static void setUp() throws Exception {
+   executorService = new ScheduledThreadPoolExecutor(1);
+   }
+
+   @AfterClass
+   public static void tearDown() throws Exception {
+   if (executorService != null) {
+   executorService.shutdown();
+   }
+   }
+
+   @Before
+   public void initCoordinator() throws Exception {
+   coordinator = new 
BackPressureRequestCoordinator(executorService, requestTimeout);
+   }
+
+   @After
+   public void shutdownCoordinator() throws Exception {
+   if (coordinator != null) {
+   coordinator.shutDown();
+   }
+   }
+
+   /** Tests simple request of task back pressure stats. */
+   @Test(timeout = 1L)
+   public void testSuccessfulBackPressureRequest() throws Exception {
+   ExecutionVertex[] vertices = 
createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY);
+
+   CompletableFuture<BackPressureStats> requestFuture = coordinator.triggerBackPressureRequest(vertices);
+   BackPressureStats backPressureStats = requestFuture.get();
+
+   // verify the request result
+   assertEquals(0, backPressureStats.getRequestId());
+   Map<ExecutionAttemptID, Double> backPressureRatios = backPressureStats.getBackPressureRatios();
+   for (ExecutionVertex executionVertex : vertices) {
+   ExecutionAttemptID executionId = 
executionVertex.getCurrentExecutionAttempt().getAttemptId();
+   assertEquals(backPressureRatio, 
backPressureRatios.get(executionId), 0.0);
+   }
+
+   // verify no more pending request
+   assertEquals(0, coordinator.getNumberOfPendingRequests());
+   }
+
+   /** Tests back pressure request of non-running tasks 

[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
docete commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344548144
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 ##
 @@ -2184,7 +2236,20 @@ object ScalarOperatorGens {
   case TIME_WITHOUT_TIME_ZONE =>
 s"${qualifyMethod(BuiltInMethods.UNIX_TIME_TO_STRING)}($operandTerm)"
   case TIMESTAMP_WITHOUT_TIME_ZONE => // including rowtime indicator
-s"${qualifyMethod(BuiltInMethods.TIMESTAMP_TO_STRING)}($operandTerm, 3)"
+// casting TimestampType to VARCHAR, if precision <= 3, keep the string representation
 
 Review comment:
   This just keeps consistent with the original design; otherwise the behavior 
changes from the users' point of view. Should we change this in this PR?
   




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344548105
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##

[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
docete commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344547747
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 ##
 @@ -174,8 +174,8 @@ object ScalarOperatorGens {
   (l, r) => s"$l $op ((int) ($r / ${MILLIS_PER_DAY}L))"
 }
   case TIMESTAMP_WITHOUT_TIME_ZONE =>
-generateOperatorIfNotNull(ctx, new TimestampType(), left, right) {
-  (l, r) => s"($l * ${MILLIS_PER_DAY}L) $op $r"
+generateOperatorIfNotNull(ctx, new TimestampType(3), left, right) {
 
 Review comment:
   The original `new TimestampType()` just returns `TimestampType(3)`, so this 
keeps consistent with the original implementation. I will confirm the right 
behavior with Jack and Danny Chan and fix it in the next PR 
[FLINK-14696](https://issues.apache.org/jira/browse/FLINK-14696).




[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
docete commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344547802
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 ##
 @@ -1073,51 +1124,51 @@ object ScalarOperatorGens {
  (INTEGER, TIMESTAMP_WITHOUT_TIME_ZONE) |
  (BIGINT, TIMESTAMP_WITHOUT_TIME_ZONE) =>
   generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-operandTerm => s"(((long) $operandTerm) * 1000)"
+operandTerm => s"$SQL_TIMESTAMP.fromEpochMillis(((long) $operandTerm) * 1000)"
   }
 
 // Float -> Timestamp
 // Double -> Timestamp
 case (FLOAT, TIMESTAMP_WITHOUT_TIME_ZONE) |
  (DOUBLE, TIMESTAMP_WITHOUT_TIME_ZONE) =>
   generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-operandTerm => s"((long) ($operandTerm * 1000))"
+operandTerm => s"$SQL_TIMESTAMP.fromEpochMillis((long) ($operandTerm * 1000))"
   }
 
 // Timestamp -> Tinyint
 case (TIMESTAMP_WITHOUT_TIME_ZONE, TINYINT) =>
   generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-operandTerm => s"((byte) ($operandTerm / 1000))"
+operandTerm => s"((byte) ($operandTerm.getMillisecond() / 1000))"
   }
 
 // Timestamp -> Smallint
 case (TIMESTAMP_WITHOUT_TIME_ZONE, SMALLINT) =>
   generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-operandTerm => s"((short) ($operandTerm / 1000))"
+operandTerm => s"((short) ($operandTerm.getMillisecond() / 1000))"
   }
 
 // Timestamp -> Int
 case (TIMESTAMP_WITHOUT_TIME_ZONE, INTEGER) =>
   generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-operandTerm => s"((int) ($operandTerm / 1000))"
+operandTerm => s"((int) ($operandTerm.getMillisecond() / 1000))"
   }
 
 // Timestamp -> BigInt
 case (TIMESTAMP_WITHOUT_TIME_ZONE, BIGINT) =>
   generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-operandTerm => s"((long) ($operandTerm / 1000))"
+operandTerm => s"((long) ($operandTerm.getMillisecond() / 1000))"
 
 Review comment:
   ditto




[jira] [Created] (FLINK-14696) Support precision of TimestampType in built-in SQL functions and operators

2019-11-10 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-14696:


 Summary: Support precision of TimestampType in built-in SQL 
functions and operators
 Key: FLINK-14696
 URL: https://issues.apache.org/jira/browse/FLINK-14696
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Zhenghua Gao


Many built-in SQL functions and operators use long as the internal representation 
of the Timestamp type and only support millisecond precision. This ticket will 
check and fix them to support nanosecond precision. The related SQL 
functions and operators are: (To Be Confirmed)
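
As an illustration of the limitation (plain arithmetic, not Flink code): a long holding epoch milliseconds simply cannot represent sub-millisecond digits.

```java
public class LongTimestampLimit {
	public static void main(String[] args) {
		// Epoch nanoseconds of 2019-11-10T00:00:00.000000001Z.
		long epochNanos = 1_573_344_000_000_000_001L;

		// A long-of-milliseconds internal representation floors away
		// everything below the millisecond ...
		long asMillis = epochNanos / 1_000_000;

		// ... so converting back cannot recover the original instant.
		long roundTripped = asMillis * 1_000_000;
		System.out.println(epochNanos - roundTripped); // prints "1" (one nanosecond lost)
	}
}
```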



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344546978
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344546570
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##
+   /** Tests simple request of task back pressure stats. */
+   @Test(timeout = 1L)
+   public void testSuccessfulBackPressureRequest() throws Exception {
+   ExecutionVertex[] vertices = 
createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY);
+
+   CompletableFuture<BackPressureStats> requestFuture = coordinator.triggerBackPressureRequest(vertices);
+   BackPressureStats backPressureStats = requestFuture.get();
+
+   // verify the request result
+   assertEquals(0, backPressureStats.getRequestId());
+   Map<ExecutionAttemptID, Double> backPressureRatios = backPressureStats.getBackPressureRatios();
+   for (ExecutionVertex executionVertex : vertices) {
+   ExecutionAttemptID executionId = 
executionVertex.getCurrentExecutionAttempt().getAttemptId();
+   assertEquals(backPressureRatio, 
backPressureRatios.get(executionId), 0.0);
+   }
+
+   // verify no more pending request
+   assertEquals(0, coordinator.getNumberOfPendingRequests());
 
 Review comment:
   Migrate this assert into `@After`.

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344546173
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##
 @@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link BackPressureRequestCoordinator}.
+ */
+public class BackPressureRequestCoordinatorTest extends TestLogger {
+
+   private static final long requestTimeout = 1;
+   private static final double backPressureRatio = 0.5;
+
+   private static ScheduledExecutorService executorService;
+   private BackPressureRequestCoordinator coordinator;
+
+   @BeforeClass
+   public static void setUp() throws Exception {
+   executorService = new ScheduledThreadPoolExecutor(1);
+   }
+
+   @AfterClass
+   public static void tearDown() throws Exception {
+   if (executorService != null) {
+   executorService.shutdown();
+   }
+   }
+
+   @Before
+   public void initCoordinator() throws Exception {
+   coordinator = new 
BackPressureRequestCoordinator(executorService, requestTimeout);
+   }
+
+   @After
+   public void shutdownCoordinator() throws Exception {
+   if (coordinator != null) {
+   coordinator.shutDown();
+   }
+   }
+
+   /** Tests simple request of task back pressure stats. */
+   @Test(timeout = 1L)
+   public void testSuccessfulBackPressureRequest() throws Exception {
+   ExecutionVertex[] vertices = 
createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY);
+
+   CompletableFuture<BackPressureStats> requestFuture = coordinator.triggerBackPressureRequest(vertices);
+   BackPressureStats backPressureStats = requestFuture.get();
+
+   // verify the request result
+   assertEquals(0, backPressureStats.getRequestId());
+   Map<ExecutionAttemptID, Double> backPressureRatios = backPressureStats.getBackPressureRatios();
+   for (ExecutionVertex executionVertex : vertices) {
+   ExecutionAttemptID executionId = 
executionVertex.getCurrentExecutionAttempt().getAttemptId();
+   assertEquals(backPressureRatio, 
backPressureRatios.get(executionId), 0.0);
+   }
+
+   // verify no more pending request
+   assertEquals(0, coordinator.getNumberOfPendingRequests());
+   }
+
+   /** Tests back pressure request of non-running tasks 

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344546013
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##
 @@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link BackPressureRequestCoordinator}.
+ */
+public class BackPressureRequestCoordinatorTest extends TestLogger {
+
+   private static final long requestTimeout = 1;
+   private static final double backPressureRatio = 0.5;
+
+   private static ScheduledExecutorService executorService;
+   private BackPressureRequestCoordinator coordinator;
+
+   @BeforeClass
+   public static void setUp() throws Exception {
+   executorService = new ScheduledThreadPoolExecutor(1);
+   }
+
+   @AfterClass
+   public static void tearDown() throws Exception {
+   if (executorService != null) {
+   executorService.shutdown();
+   }
+   }
+
+   @Before
+   public void initCoordinator() throws Exception {
+   coordinator = new 
BackPressureRequestCoordinator(executorService, requestTimeout);
+   }
+
+   @After
+   public void shutdownCoordinator() throws Exception {
+   if (coordinator != null) {
+   coordinator.shutDown();
+   }
+   }
+
+   /** Tests simple request of task back pressure stats. */
+   @Test(timeout = 1L)
+   public void testSuccessfulBackPressureRequest() throws Exception {
+   ExecutionVertex[] vertices = 
createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY);
+
+   CompletableFuture<BackPressureStats> requestFuture = coordinator.triggerBackPressureRequest(vertices);
+   BackPressureStats backPressureStats = requestFuture.get();
+
+   // verify the request result
+   assertEquals(0, backPressureStats.getRequestId());
+   Map<ExecutionAttemptID, Double> backPressureRatios = backPressureStats.getBackPressureRatios();
+   for (ExecutionVertex executionVertex : vertices) {
+   ExecutionAttemptID executionId = 
executionVertex.getCurrentExecutionAttempt().getAttemptId();
+   assertEquals(backPressureRatio, 
backPressureRatios.get(executionId), 0.0);
+   }
+
+   // verify no more pending request
+   assertEquals(0, coordinator.getNumberOfPendingRequests());
+   }
+
+   /** Tests back pressure request of non-running tasks 

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344545951
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##
 @@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link BackPressureRequestCoordinator}.
+ */
+public class BackPressureRequestCoordinatorTest extends TestLogger {
+
+   private static final long requestTimeout = 1;
+   private static final double backPressureRatio = 0.5;
+
+   private static ScheduledExecutorService executorService;
+   private BackPressureRequestCoordinator coordinator;
+
+   @BeforeClass
+   public static void setUp() throws Exception {
+   executorService = new ScheduledThreadPoolExecutor(1);
+   }
+
+   @AfterClass
+   public static void tearDown() throws Exception {
+   if (executorService != null) {
+   executorService.shutdown();
+   }
+   }
+
+   @Before
+   public void initCoordinator() throws Exception {
+   coordinator = new 
BackPressureRequestCoordinator(executorService, requestTimeout);
+   }
+
+   @After
+   public void shutdownCoordinator() throws Exception {
+   if (coordinator != null) {
+   coordinator.shutDown();
+   }
+   }
+
+   /** Tests simple request of task back pressure stats. */
+   @Test(timeout = 1L)
+   public void testSuccessfulBackPressureRequest() throws Exception {
+   ExecutionVertex[] vertices = 
createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY);
+
+   CompletableFuture<BackPressureStats> requestFuture = coordinator.triggerBackPressureRequest(vertices);
+   BackPressureStats backPressureStats = requestFuture.get();
+
+   // verify the request result
+   assertEquals(0, backPressureStats.getRequestId());
+   Map<ExecutionAttemptID, Double> backPressureRatios = backPressureStats.getBackPressureRatios();
+   for (ExecutionVertex executionVertex : vertices) {
+   ExecutionAttemptID executionId = 
executionVertex.getCurrentExecutionAttempt().getAttemptId();
+   assertEquals(backPressureRatio, 
backPressureRatios.get(executionId), 0.0);
+   }
+
+   // verify no more pending request
+   assertEquals(0, coordinator.getNumberOfPendingRequests());
+   }
+
+   /** Tests back pressure request of non-running tasks 

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344545905
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##
 @@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link BackPressureRequestCoordinator}.
+ */
+public class BackPressureRequestCoordinatorTest extends TestLogger {
+
+   private static final long requestTimeout = 1;
+   private static final double backPressureRatio = 0.5;
+
+   private static ScheduledExecutorService executorService;
+   private BackPressureRequestCoordinator coordinator;
+
+   @BeforeClass
+   public static void setUp() throws Exception {
+   executorService = new ScheduledThreadPoolExecutor(1);
+   }
+
+   @AfterClass
+   public static void tearDown() throws Exception {
+   if (executorService != null) {
+   executorService.shutdown();
+   }
+   }
+
+   @Before
+   public void initCoordinator() throws Exception {
+   coordinator = new 
BackPressureRequestCoordinator(executorService, requestTimeout);
+   }
+
+   @After
+   public void shutdownCoordinator() throws Exception {
+   if (coordinator != null) {
+   coordinator.shutDown();
+   }
+   }
+
+   /** Tests simple request of task back pressure stats. */
+   @Test(timeout = 1L)
+   public void testSuccessfulBackPressureRequest() throws Exception {
+   ExecutionVertex[] vertices = 
createExecutionVertices(ExecutionState.RUNNING, CompletionType.SUCCESSFULLY);
+
+   CompletableFuture<BackPressureStats> requestFuture = coordinator.triggerBackPressureRequest(vertices);
+   BackPressureStats backPressureStats = requestFuture.get();
+
+   // verify the request result
+   assertEquals(0, backPressureStats.getRequestId());
+   Map<ExecutionAttemptID, Double> backPressureRatios = backPressureStats.getBackPressureRatios();
+   for (ExecutionVertex executionVertex : vertices) {
+   ExecutionAttemptID executionId = 
executionVertex.getCurrentExecutionAttempt().getAttemptId();
+   assertEquals(backPressureRatio, 
backPressureRatios.get(executionId), 0.0);
+   }
+
+   // verify no more pending request
+   assertEquals(0, coordinator.getNumberOfPendingRequests());
+   }
+
+   /** Tests back pressure request of non-running tasks 

[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344545581
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##
 @@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link BackPressureRequestCoordinator}.
+ */
+public class BackPressureRequestCoordinatorTest extends TestLogger {
+
+   private static final long requestTimeout = 1;
+   private static final double backPressureRatio = 0.5;
+
+   private static ScheduledExecutorService executorService;
+   private BackPressureRequestCoordinator coordinator;
+
+   @BeforeClass
+   public static void setUp() throws Exception {
+   executorService = new ScheduledThreadPoolExecutor(1);
+   }
+
+   @AfterClass
+   public static void tearDown() throws Exception {
+   if (executorService != null) {
+   executorService.shutdown();
+   }
+   }
+
+   @Before
+   public void initCoordinator() throws Exception {
+   coordinator = new 
BackPressureRequestCoordinator(executorService, requestTimeout);
+   }
+
+   @After
+   public void shutdownCoordinator() throws Exception {
+   if (coordinator != null) {
+   coordinator.shutDown();
+   }
+   }
+
+   /** Tests simple request of task back pressure stats. */
 
 Review comment:
   Refactor the format of the Javadoc for all the methods. It should be:
   /**
    *
    */
   Also, this description is not very accurate: what does "simple request" mean?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
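A minimal compilable sketch of the multi-line Javadoc style the reviewer asks for; the class name, method name, and body here are placeholders, not code from the pull request:

```java
/**
 * Sketch of the multi-line Javadoc style requested in the review;
 * all names in this class are illustrative placeholders.
 */
public class JavadocStyleExample {

    /**
     * Tests a single round-trip request of task back pressure stats.
     */
    public void testSuccessfulBackPressureRequest() {
        // test body elided
    }

    public static void main(String[] args) {
        System.out.println("compiles");
    }
}
```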


[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344545581
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinatorTest.java
 ##
 @@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link BackPressureRequestCoordinator}.
+ */
+public class BackPressureRequestCoordinatorTest extends TestLogger {
+
+   private static final long requestTimeout = 1;
+   private static final double backPressureRatio = 0.5;
+
+   private static ScheduledExecutorService executorService;
+   private BackPressureRequestCoordinator coordinator;
+
+   @BeforeClass
+   public static void setUp() throws Exception {
+   executorService = new ScheduledThreadPoolExecutor(1);
+   }
+
+   @AfterClass
+   public static void tearDown() throws Exception {
+   if (executorService != null) {
+   executorService.shutdown();
+   }
+   }
+
+   @Before
+   public void initCoordinator() throws Exception {
+   coordinator = new 
BackPressureRequestCoordinator(executorService, requestTimeout);
+   }
+
+   @After
+   public void shutdownCoordinator() throws Exception {
+   if (coordinator != null) {
+   coordinator.shutDown();
+   }
+   }
+
+   /** Tests simple request of task back pressure stats. */
 
 Review comment:
   Refactor the format of the Javadoc for all the methods. It should be:
   /**
    *
    */


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10112: [FLINK-14640] Change Type of Field currentExecutions from ConcurrentHashMap to HashMap

2019-11-10 Thread GitBox
flinkbot edited a comment on issue #10112: [FLINK-14640] Change Type of Field 
currentExecutions from ConcurrentHashMap to HashMap
URL: https://github.com/apache/flink/pull/10112#issuecomment-550964937
 
 
   
   ## CI report:
   
   * 13118777453ff58976b17b9aa1b7a4590ce86d5c : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/135400128)
   * 3e4caf67766a23a2d53c4fef790fa95d5ba40914 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] qiuxiafei commented on a change in pull request #9373: [FLINK-13596][ml] Add two utils for Table transformations

2019-11-10 Thread GitBox
qiuxiafei commented on a change in pull request #9373: [FLINK-13596][ml] Add 
two utils for Table transformations
URL: https://github.com/apache/flink/pull/9373#discussion_r344544335
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/DataSetConversionUtil.java
 ##
 @@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.SingleInputUdfOperator;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.common.MLEnvironment;
+import org.apache.flink.ml.common.MLEnvironmentFactory;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+/**
+ * Provide functions of conversions between DataSet and Table.
+ */
+public class DataSetConversionUtil {
+   /**
+* Convert the given Table to {@link DataSet}<{@link Row}>.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}
+* @param table the Table to convert.
+* @return the converted DataSet.
+*/
+   public static DataSet<Row> fromTable(Long sessionId, Table table) {
+   return MLEnvironmentFactory
+   .get(sessionId)
+   .getBatchTableEnvironment()
+   .toDataSet(table, Row.class);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified TableSchema.
+*
+* @param sessionId the sessionId of {@link MLEnvironmentFactory}
+* @param data   the DataSet to convert.
+* @param schema the specified TableSchema.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, TableSchema schema) {
+   return toTable(sessionId, data, schema.getFieldNames(), 
schema.getFieldTypes());
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified colNames and 
colTypes.
+*
+* @param sessionId sessionId the sessionId of {@link 
MLEnvironmentFactory}.
+* @param data the DataSet to convert.
+* @param colNames the specified colNames.
+* @param colTypes the specified colTypes. This variable is used only 
when the
+* DataSet is produced by a function and Flink cannot 
determine
+* automatically what the produced type is.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, String[] colNames, TypeInformation<?>[] colTypes) {
+   return toTable(MLEnvironmentFactory.get(sessionId), data, 
colNames, colTypes);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified colNames.
+*
+* @param sessionId sessionId the sessionId of {@link 
MLEnvironmentFactory}.
+* @param data the DataSet to convert.
+* @param colNames the specified colNames.
+* @return the converted Table.
+*/
+   public static Table toTable(Long sessionId, DataSet<Row> data, String[] colNames) {
+   return toTable(MLEnvironmentFactory.get(sessionId), data, 
colNames);
+   }
+
+   /**
+* Convert the given DataSet into a Table with specified colNames and 
colTypes.
+*
+* @param session the MLEnvironment using to convert DataSet to Table.
+* @param data the DataSet to convert.
+* @param colNames the specified colNames.
+* @param colTypes the specified colTypes. This variable is used only 
when the
+* DataSet is produced by a function and Flink cannot 
determine
+* automatically what the produced type is.
+* @return the converted Table.
+*/
+   public static Table toTable(MLEnvironment session, DataSet  

[GitHub] [flink] thorntree commented on issue #934: Framesize fix

2019-11-10 Thread GitBox
thorntree commented on issue #934: Framesize fix
URL: https://github.com/apache/flink/pull/934#issuecomment-552276083
 
 
@kl0u, hi! Can you tell me how to increase akka.framesize? I don't know how to 
configure akka.framesize. Thanks!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
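For context, `akka.framesize` is a Flink cluster option set in `conf/flink-conf.yaml`. The sketch below only prints the config line to add; the 20 MiB value is an illustrative choice (the default at the time was 10485760b, i.e. 10 MiB), not a recommendation from this PR:

```java
public class FrameSizeExample {
    public static void main(String[] args) {
        // Append this line to conf/flink-conf.yaml and restart the cluster.
        // 20971520b = 20 MiB, double the 10485760b (10 MiB) default.
        String configLine = "akka.framesize: 20971520b";
        System.out.println(configLine);
    }
}
```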


[jira] [Commented] (FLINK-14674) some tpc-ds query hang in scheduled stage for long time

2019-11-10 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16971288#comment-16971288
 ] 

Leonard Xu commented on FLINK-14674:


[~lzljs3620320] I think [~ykt836]'s analysis is right. On my local machine, I set
taskmanager.numberOfTaskSlots to 4,
taskmanager.heap.size to 2g, and
parallelism.default to 2,
so the TM has about 1.26g of managed memory available, but the operator requests 
"managedMemoryInMB=768", which cannot be fulfilled under this parallelism setting.
{code:java}
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
bfed28d939e924b4b66030bc70c38185)
at 
org.apache.flink.client.ClientUtils.submitJobAndWaitForResult(ClientUtils.java:146)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:64)
at 
org.apache.flink.table.planner.delegation.BatchExecutor.execute(BatchExecutor.java:55)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:511)
at 
org.apache.flink.table.tpcds.TpcdsTestProgram.main(TpcdsTestProgram.java:116)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:403)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:284)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:177)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:753)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:282)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1016)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1089)
at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1089)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.ClientUtils.submitJobAndWaitForResult(ClientUtils.java:144)
... 18 more
Caused by: 
org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
 Could not fulfill slot request 16d054a481a09a766e4f5c452e8a24b7. Requested 
resource profile (ResourceProfile{cpuCores=0.0, heapMemoryInMB=0, 
directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0, 
managedMemoryInMB=768}) is unfulfillable.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.setFailUnfulfillableRequest(SlotManagerImpl.java:491)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.setFailUnfulfillableRequest(ResourceManager.java:1070)
at 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager.lambda$startStartupPeriod$0(StandaloneResourceManager.java:116)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at 
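
The numbers quoted in this comment can be checked with simple arithmetic (a sketch; the even per-slot split of managed memory is the assumption being illustrated, and `perSlotManagedMemoryMb` is a hypothetical helper, not a Flink API):

```java
public class ManagedMemoryCheck {

    // Managed memory available to one slot when a TaskManager's managed
    // memory is divided evenly across its slots (assumption for illustration).
    static long perSlotManagedMemoryMb(long tmManagedMemoryMb, int slotsPerTm) {
        return tmManagedMemoryMb / slotsPerTm;
    }

    public static void main(String[] args) {
        long perSlot = perSlotManagedMemoryMb(1290, 4); // ~1.26 GB managed memory, 4 slots
        long requested = 768;                           // managedMemoryInMB=768 from the request
        System.out.println("per-slot managed memory: " + perSlot + " MB"); // 322 MB
        System.out.println("fulfillable: " + (perSlot >= requested));      // false
    }
}
```

Each slot can offer roughly 322 MB, far below the 768 MB the slot request asks for, which matches the UnfulfillableSlotRequestException above.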

[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
docete commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344543636
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
 ##
 @@ -66,7 +69,15 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) 
extends CallGenerato
   boxedTypeTermForType(returnType)
 }
 val resultTerm = ctx.addReusableLocalVariable(resultTypeTerm, "result")
-val evalResult = s"$functionReference.eval(${parameters.map(_.resultTerm).mkString(", ")})"
+val evalResult =
+  if (returnType.getTypeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
 
 Review comment:
   Currently many functions use Long as the internal representation of 
Timestamp, and the documentation recommends this usage; see [Types.TIMESTAMP can be 
represented as 
long](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/udfs.html#best-practices-for-implementing-udfs).
 We should cover this.




[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
docete commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344542341
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
 ##
 @@ -370,8 +373,56 @@ object GenerateUtils {
 generateNonNullLiteral(literalType, literalValue.toString, 
literalValue)
 
   case TIMESTAMP_WITHOUT_TIME_ZONE =>
-val millis = literalValue.asInstanceOf[Long]
-generateNonNullLiteral(literalType, millis + "L", millis)
+def getNanoOfMillisSinceEpoch(timestampString: TimestampString): Int = {
+  val v = timestampString.toString()
+  val length = v.length
+  val nanoOfSeconds = length match {
+    case 19 | 20 => 0
+    case _ =>
+      JInteger.valueOf(v.substring(20)) * pow(10, 9 - (length - 20)).intValue()
+  }
+  nanoOfSeconds % 1000000
+}
+
+// TODO: we copied the logic of TimestampString::getMillisSinceEpoch since the copied
+//  DateTimeUtils.ymdToJulian is wrong.
+//  SEE CALCITE-1884
+def getMillisInSecond(timestampString: TimestampString): Int = {
+  val v = timestampString.toString()
+  val length = v.length
+  val milliOfSeconds = length match {
+    case 19 => 0
+    case 21 => JInteger.valueOf(v.substring(20)).intValue() * 100
+    case 22 => JInteger.valueOf(v.substring(20)).intValue() * 10
+    case 20 | 23 | _ => JInteger.valueOf(v.substring(20, 23)).intValue()
+  }
+  milliOfSeconds
+}
+
+def getMillisSinceEpoch(timestampString: TimestampString): Long = {
+  val v = timestampString.toString()
+  val year = JInteger.valueOf(v.substring(0, 4))
+  val month = JInteger.valueOf(v.substring(5, 7))
+  val day = JInteger.valueOf(v.substring(8, 10))
+  val h = JInteger.valueOf(v.substring(11, 13))
+  val m = JInteger.valueOf(v.substring(14, 16))
+  val s = JInteger.valueOf(v.substring(17, 19))
+  val ms = getMillisInSecond(timestampString)
+  val d = SqlDateTimeUtils.ymdToJulian(year, month, day)
+  d * 86400000L + h * 3600000L + m * 60000L + s * 1000L + ms.toLong
+}
+
+val fieldTerm = newName("timestamp")
+val millis = literalValue.asInstanceOf[TimestampString].getMillisSinceEpoch
 
 Review comment:
   The timestamp literals in SQL text are passed as a TimestampString instead 
of a Long (to preserve precision information). See 
ExprCodeGenerator.scala::visitLiteral (line 389): 
   `
 override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
   val resultType = FlinkTypeFactory.toLogicalType(literal.getType)
   val value = resultType.getTypeRoot match {
 case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
   literal.getValueAs(classOf[TimestampString])
 case _ =>
   literal.getValue3
   }
   generateLiteral(ctx, resultType, value)
 }
   `
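
The length-based dispatch in `getMillisInSecond` above can be sketched in plain Java (a re-implementation for illustration only; it assumes the `yyyy-MM-dd HH:mm:ss[.fff…]` form produced by `TimestampString.toString()`, with at least three fractional digits in the fall-through case):

```java
public class FractionalSeconds {

    // Millisecond part of a timestamp string such as "1970-01-01 00:00:00.123456".
    // Indices 0..18 hold "yyyy-MM-dd HH:mm:ss"; the fraction starts at index 20.
    static int millisInSecond(String v) {
        switch (v.length()) {
            case 19: return 0;                                       // no fractional part
            case 21: return Integer.parseInt(v.substring(20)) * 100; // ".1"  -> 100 ms
            case 22: return Integer.parseInt(v.substring(20)) * 10;  // ".12" -> 120 ms
            default: return Integer.parseInt(v.substring(20, 23));   // keep first 3 digits
        }
    }

    public static void main(String[] args) {
        System.out.println(millisInSecond("1970-01-01 00:00:00"));        // 0
        System.out.println(millisInSecond("1970-01-01 00:00:00.1"));      // 100
        System.out.println(millisInSecond("1970-01-01 00:00:00.123456")); // 123
    }
}
```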




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344541629
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ##
 @@ -387,7 +389,17 @@ public static TaskExecutor startTaskManager(
metricQueryServiceAddress,
blobCacheService,
fatalErrorHandler,
-   new TaskExecutorPartitionTrackerImpl());
+   new TaskExecutorPartitionTrackerImpl(),
+   createBackPressureSampleService(configuration, 
rpcService.getScheduledExecutor()));
+   }
+
+   static BackPressureSampleService createBackPressureSampleService(
 
 Review comment:
   @VisibleForTesting?




[GitHub] [flink] zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.

2019-11-10 Thread GitBox
zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344541099
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/BackPressureSampleService.java
 ##
 @@ -53,64 +53,58 @@
 
this.numSamples = numSamples;
this.delayBetweenSamples = checkNotNull(delayBetweenSamples);
-   this.scheduledExecutor = checkNotNull(scheduledExecutor, "The 
scheduledExecutor must not be null.");
+   this.scheduledExecutor = checkNotNull(scheduledExecutor);
}
 
/**
-* Returns a future that completes with the back pressure ratio of a 
task.
+* Schedules to sample the task back pressure and returns a future that 
completes
+* with the back pressure ratio.
 *
 * @param task The task to be sampled.
-* @return A future of the task back pressure ratio.
+* @return A future containing the task back pressure ratio.
 */
	public CompletableFuture<Double> sampleTaskBackPressure(BackPressureSampleableTask task) {
+   if (!task.isRunning()) {
+   throw new IllegalStateException("Cannot sample task. Because it is not running.");
 
 Review comment:
   It is better to give some debug info here. E.g. we could include 
`task.toString()` in the message; then the specific task implementation 
can provide the `executionId` info as before.
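
The suggestion above — carrying identifying detail in the precondition failure — could look like the following sketch (the interface stub and `checkRunning` helper are illustrative stand-ins, not the actual Flink classes):

```java
public class SamplePrecondition {

    // Minimal stand-in for the task interface being sampled.
    interface BackPressureSampleableTask {
        boolean isRunning();
    }

    // Include the task's toString() so the message carries e.g. an executionId.
    static void checkRunning(BackPressureSampleableTask task) {
        if (!task.isRunning()) {
            throw new IllegalStateException(
                    "Cannot sample task " + task + " because it is not running.");
        }
    }

    public static void main(String[] args) {
        BackPressureSampleableTask stopped = () -> false;
        try {
            checkRunning(stopped);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```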




[GitHub] [flink] docete commented on a change in pull request #10105: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-10 Thread GitBox
docete commented on a change in pull request #10105: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10105#discussion_r344540487
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
 ##
 @@ -370,8 +373,56 @@ object GenerateUtils {
 generateNonNullLiteral(literalType, literalValue.toString, 
literalValue)
 
   case TIMESTAMP_WITHOUT_TIME_ZONE =>
-val millis = literalValue.asInstanceOf[Long]
-generateNonNullLiteral(literalType, millis + "L", millis)
+def getNanoOfMillisSinceEpoch(timestampString: TimestampString): Int = {
+  val v = timestampString.toString()
+  val length = v.length
+  val nanoOfSeconds = length match {
+    case 19 | 20 => 0
+    case _ =>
+      JInteger.valueOf(v.substring(20)) * pow(10, 9 - (length - 20)).intValue()
+  }
+  nanoOfSeconds % 1000000
+}
+
+// TODO: we copied the logic of TimestampString::getMillisSinceEpoch since the copied
+//  DateTimeUtils.ymdToJulian is wrong.
 
 Review comment:
   Yes. CALCITE-1884 is not fixed in our copied DateTimeUtils, and it's the 
root cause of some delicate cases. For example:
   `
   SELECT TIMESTAMP '1500-04-30 00:00:00.123456789' FROM docs;
   SELECT CAST('1500-04-30 00:00:00.123456789' AS DATETIME(9)) FROM docs;
   `
   should return
   `
   1500-05-10T00:00:00.123456789
   `
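
The ten-day shift can be reproduced with the JDK alone: `java.util.GregorianCalendar` interprets dates before the 1582 cutover in the Julian calendar, while `java.time` uses the proleptic Gregorian (ISO) calendar, so round-tripping the date through epoch millis shows the jump from April 30 to May 10:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.TimeZone;

public class JulianShift {
    public static void main(String[] args) {
        // Before Oct 15, 1582 GregorianCalendar falls back to the Julian calendar.
        GregorianCalendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        cal.clear();
        cal.set(1500, Calendar.APRIL, 30); // Julian 1500-04-30, midnight UTC

        // The same instant rendered in the proleptic Gregorian (ISO) calendar.
        LocalDate iso = Instant.ofEpochMilli(cal.getTimeInMillis())
                .atZone(ZoneOffset.UTC)
                .toLocalDate();
        System.out.println(iso); // 1500-05-10
    }
}
```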



