[GitHub] [flink] xintongsong commented on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args

2020-04-16 Thread GitBox
xintongsong commented on issue #11545: [FLINK-16742][runtime][dist] Extend and 
use BashJavaUtils to start JM JVM process and pass JVM memory args
URL: https://github.com/apache/flink/pull/11545#issuecomment-615058632
 
 
   Rebased onto #11577.
   A few changes made during the rebase:
   - Pass dynamic properties to `BashJavaUtils` for the JM of standalone job 
clusters. (The JM of a standalone session cluster does not support dynamic 
properties ATM.)
   - Add a test case for getting JM resource parameters from `BashJavaUtils` 
with dynamic properties.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-17199) Use NeverReturnExpired strategy for states in blink streaming join.

2020-04-16 Thread Benchao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li resolved FLINK-17199.

Resolution: Duplicate

> Use NeverReturnExpired strategy for states in blink streaming join.
> ---
>
> Key: FLINK-17199
> URL: https://issues.apache.org/jira/browse/FLINK-17199
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.9.2, 1.10.0
>Reporter: Benchao Li
>Priority: Major
>
> Currently we use {{ReturnExpiredIfNotCleanedUp}} for states in blink 
> streaming join operators, which makes the behavior not very intuitive for 
> users.
> IMO, {{NeverReturnExpired}} is more straightforward, and we should change 
> to it.
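
For reference, here is the proposed setting expressed with Flink's state TTL API; a minimal sketch, with an illustrative state name and TTL rather than the join operator's actual configuration:

{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Illustrative TTL; the join operators derive theirs from the configured
// idle-state retention time.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(1))
    // The proposed change: never return an expired entry, even if it has
    // not been physically cleaned up yet (vs. ReturnExpiredIfNotCleanedUp).
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<String> joinStateDescriptor =
    new ValueStateDescriptor<>("join-state", String.class);
joinStateDescriptor.enableTimeToLive(ttlConfig);
{code}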



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17199) Use NeverReturnExpired strategy for states in blink streaming join.

2020-04-16 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085456#comment-17085456
 ] 

Benchao Li commented on FLINK-17199:


Yes. Thanks for the reminder.

> Use NeverReturnExpired strategy for states in blink streaming join.
> ---
>
> Key: FLINK-17199
> URL: https://issues.apache.org/jira/browse/FLINK-17199
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.9.2, 1.10.0
>Reporter: Benchao Li
>Priority: Major
>
> Currently we use {{ReturnExpiredIfNotCleanedUp}} for states in blink 
> streaming join operators, which makes the behavior not very intuitive for 
> users.
> IMO, {{NeverReturnExpired}} is more straightforward, and we should change 
> to it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17200) Add connectors to ClickHouse

2020-04-16 Thread jinhai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jinhai updated FLINK-17200:
---
Description: 
ClickHouse is a powerful OLAP query engine that supports real-time mini-batch 
data writing.

We can add Flink connectors for ClickHouse.

[website: https://clickhouse.tech/|https://clickhouse.tech/]

> Add connectors to ClickHouse
> 
>
> Key: FLINK-17200
> URL: https://issues.apache.org/jira/browse/FLINK-17200
> Project: Flink
>  Issue Type: New Feature
>Reporter: jinhai
>Priority: Major
>
> ClickHouse is a powerful OLAP query engine that supports real-time 
> mini-batch data writing.
> We can add Flink connectors for ClickHouse.
> [website: https://clickhouse.tech/|https://clickhouse.tech/]
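
For illustration, a minimal sketch of what a streaming ClickHouse sink could look like over plain JDBC. The table name, two-column schema, batch size, and connection URL are assumptions, and a production connector would also need checkpoint-time flushing and failure handling:

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class ClickHouseJdbcSink extends RichSinkFunction<String[]> {

    private static final int BATCH_SIZE = 1000;

    private transient Connection connection;
    private transient PreparedStatement statement;
    private int pendingRows;

    @Override
    public void open(Configuration parameters) throws Exception {
        // ClickHouse speaks JDBC over its HTTP port (8123 by default);
        // assumes the ClickHouse JDBC driver is on the classpath.
        connection = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default");
        statement = connection.prepareStatement("INSERT INTO events (id, payload) VALUES (?, ?)");
    }

    @Override
    public void invoke(String[] value, Context context) throws Exception {
        statement.setString(1, value[0]);
        statement.setString(2, value[1]);
        statement.addBatch();
        // ClickHouse favors large, infrequent inserts, hence the mini-batching.
        if (++pendingRows >= BATCH_SIZE) {
            statement.executeBatch();
            pendingRows = 0;
        }
    }

    @Override
    public void close() throws Exception {
        if (statement != null) {
            statement.executeBatch();
            statement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
{code}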



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] xintongsong commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
xintongsong commented on a change in pull request #11323: [FLINK-16439][k8s] 
Make KubernetesResourceManager starts workers using WorkerResourceSpec 
requested by SlotManager
URL: https://github.com/apache/flink/pull/11323#discussion_r410006159
 
 

 ##
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
             ++currentMaxAttemptId);
     }
 
-    private void requestKubernetesPod() {
-        numPendingPodRequests++;
+    private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+        final KubernetesTaskManagerParameters parameters =
+            createKubernetesTaskManagerParameters(workerResourceSpec);
+
+        final KubernetesPod taskManagerPod =
+            KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+        kubeClient.createTaskManagerPod(taskManagerPod);
+
+        podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
+        final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec);
 
         log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
-            defaultMemoryMB,
-            defaultCpus,
-            numPendingPodRequests);
+            parameters.getTaskManagerMemoryMB(),
+            parameters.getTaskManagerCPU(),
+            pendingWorkerNum);
+        log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec);
+    }
+
+    private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+        final TaskExecutorProcessSpec taskExecutorProcessSpec =
+            TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
 
         final String podName = String.format(
             TASK_MANAGER_POD_FORMAT,
             clusterId,
             currentMaxAttemptId,
             ++currentMaxPodId);
 
+        final ContaineredTaskManagerParameters taskManagerParameters =
+            ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);
+
         final String dynamicProperties =
             BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
 
-        final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+        return new KubernetesTaskManagerParameters(
             flinkConfig,
             podName,
             dynamicProperties,
             taskManagerParameters);
-
-        final KubernetesPod taskManagerPod =
-            KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
-        log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec);
-        kubeClient.createTaskManagerPod(taskManagerPod);
     }
 
     /**
      * Request new pod if pending pods cannot satisfy pending slot requests.
      */
-    private void requestKubernetesPodIfRequired() {
-        final int requiredTaskManagers = getNumberRequiredTaskManagers();
+    private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) {
+        final int pendingWorkerNum = getNumPendingWorkersFor(workerResourceSpec);
+        int requiredTaskManagers = getRequiredResources().get(workerResourceSpec);
 
-        while (requiredTaskManagers > numPendingPodRequests) {
-            requestKubernetesPod();
+        while (requiredTaskManagers-- > pendingWorkerNum) {
+            requestKubernetesPod(workerResourceSpec);
         }
     }
 
     private void removePodIfTerminated(KubernetesPod pod) {
         if (pod.isTerminated()) {
             kubeClient.stopPod(pod.getName());
 
 Review comment:
   It seems there are two issues.
   1. `numPendingPodRequests`/`PendingWorkerCounter` may go out of sync if a pod 
is stopped in `onError` before `onAdded` is called.
   2. Whether we should remove the pod when `onError` is called.
   
   I would suggest the following.
   - Try to resolve (1) in this PR, based on the current behavior that the pod 
will be removed when `onError` is called. We can check whether the pod was 
requested in the current attempt by looking it up in `podWorkerResources`, and 
whether `onAdded` has been called by looking it up in `workerNodes`. If the pod 
was requested in the current attempt and `onAdded` is not 
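   A minimal sketch of the reconciliation guard described above; 
`handleErroredPod` and `notifyNewWorkerAllocationFailed` are hypothetical names 
(the PR may structure this differently), while `podWorkerResources`, 
`workerNodes` and `kubeClient` are the fields referenced in this thread:
   
   private void handleErroredPod(KubernetesPod pod) {
       final String podName = pod.getName();
       final WorkerResourceSpec workerResourceSpec = podWorkerResources.get(podName);
       if (workerResourceSpec == null) {
           // The pod was not requested in the current attempt; nothing to reconcile.
           return;
       }
       if (!workerNodes.containsKey(new ResourceID(podName))) {
           // onAdded has not been called yet, so the request is still counted as
           // pending and the PendingWorkerCounter must be decremented here.
           notifyNewWorkerAllocationFailed(workerResourceSpec);
       }
       podWorkerResources.remove(podName);
       kubeClient.stopPod(podName);
   }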

[jira] [Commented] (FLINK-17201) Implement Streaming ClickHouseSink

2020-04-16 Thread liufangliang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085454#comment-17085454
 ] 

liufangliang commented on FLINK-17201:
--

Amazing

> Implement Streaming ClickHouseSink
> --
>
> Key: FLINK-17201
> URL: https://issues.apache.org/jira/browse/FLINK-17201
> Project: Flink
>  Issue Type: New Feature
>Reporter: jinhai
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17203) Add metrics for ClickHouse sink

2020-04-16 Thread jinhai (Jira)
jinhai created FLINK-17203:
--

 Summary: Add metrics for ClickHouse sink
 Key: FLINK-17203
 URL: https://issues.apache.org/jira/browse/FLINK-17203
 Project: Flink
  Issue Type: New Feature
Reporter: jinhai






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17202) Add SQL for ClickHouse connector

2020-04-16 Thread jinhai (Jira)
jinhai created FLINK-17202:
--

 Summary: Add SQL for ClickHouse connector
 Key: FLINK-17202
 URL: https://issues.apache.org/jira/browse/FLINK-17202
 Project: Flink
  Issue Type: New Feature
Reporter: jinhai






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] 
Make KubernetesResourceManager starts workers using WorkerResourceSpec 
requested by SlotManager
URL: https://github.com/apache/flink/pull/11323#discussion_r410001005
 
 

 ##
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
             ++currentMaxAttemptId);
     }
 
-    private void requestKubernetesPod() {
-        numPendingPodRequests++;
+    private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+        final KubernetesTaskManagerParameters parameters =
+            createKubernetesTaskManagerParameters(workerResourceSpec);
+
+        final KubernetesPod taskManagerPod =
+            KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+        kubeClient.createTaskManagerPod(taskManagerPod);
+
+        podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
+        final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec);
 
         log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
-            defaultMemoryMB,
-            defaultCpus,
-            numPendingPodRequests);
+            parameters.getTaskManagerMemoryMB(),
+            parameters.getTaskManagerCPU(),
+            pendingWorkerNum);
+        log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec);
+    }
+
+    private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+        final TaskExecutorProcessSpec taskExecutorProcessSpec =
+            TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
 
         final String podName = String.format(
             TASK_MANAGER_POD_FORMAT,
             clusterId,
             currentMaxAttemptId,
             ++currentMaxPodId);
 
+        final ContaineredTaskManagerParameters taskManagerParameters =
+            ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);
+
         final String dynamicProperties =
             BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
 
-        final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+        return new KubernetesTaskManagerParameters(
             flinkConfig,
             podName,
             dynamicProperties,
             taskManagerParameters);
-
-        final KubernetesPod taskManagerPod =
-            KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
-        log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec);
-        kubeClient.createTaskManagerPod(taskManagerPod);
     }
 
     /**
      * Request new pod if pending pods cannot satisfy pending slot requests.
      */
-    private void requestKubernetesPodIfRequired() {
-        final int requiredTaskManagers = getNumberRequiredTaskManagers();
+    private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) {
+        final int pendingWorkerNum = getNumPendingWorkersFor(workerResourceSpec);
+        int requiredTaskManagers = getRequiredResources().get(workerResourceSpec);
 
-        while (requiredTaskManagers > numPendingPodRequests) {
-            requestKubernetesPod();
+        while (requiredTaskManagers-- > pendingWorkerNum) {
+            requestKubernetesPod(workerResourceSpec);
         }
     }
 
     private void removePodIfTerminated(KubernetesPod pod) {
         if (pod.isTerminated()) {
             kubeClient.stopPod(pod.getName());
 
 Review comment:
   > Then couldn't `numPendingPodRequests` already go out of sync before this 
PR? Have we observed any such cases?
   
   Yes. I noticed this problem when I dug into `KubernetesResourceManager` 
yesterday; it is possible that we do not receive an `ADD` event (we have met 
this kind of problem in our K8s production environment, though not in Flink so 
far) even if the pod has been deployed successfully on the K8s cluster.
   
   > @zhengcanbin Would it be possible that a pod is never successfully 
started, such that we receive `onError` but never `onAdded`, `onModified` or 
`onDeleted`?
   
   Yeah. It could possibly happen if something goes wrong in the 

[jira] [Created] (FLINK-17201) Implement Streaming ClickHouseSink

2020-04-16 Thread jinhai (Jira)
jinhai created FLINK-17201:
--

 Summary: Implement Streaming ClickHouseSink
 Key: FLINK-17201
 URL: https://issues.apache.org/jira/browse/FLINK-17201
 Project: Flink
  Issue Type: New Feature
Reporter: jinhai






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17200) Add connectors to ClickHouse

2020-04-16 Thread jinhai (Jira)
jinhai created FLINK-17200:
--

 Summary: Add connectors to ClickHouse
 Key: FLINK-17200
 URL: https://issues.apache.org/jira/browse/FLINK-17200
 Project: Flink
  Issue Type: New Feature
Reporter: jinhai






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17199) Use NeverReturnExpired strategy for states in blink streaming join.

2020-04-16 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085446#comment-17085446
 ] 

Jark Wu commented on FLINK-17199:
-

I think we will change this behavior in FLINK-16581?

> Use NeverReturnExpired strategy for states in blink streaming join.
> ---
>
> Key: FLINK-17199
> URL: https://issues.apache.org/jira/browse/FLINK-17199
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.9.2, 1.10.0
>Reporter: Benchao Li
>Priority: Major
>
> Currently we use {{ReturnExpiredIfNotCleanedUp}} for states in blink 
> streaming join operators, which makes the behavior not very intuitive for 
> users.
> IMO, {{NeverReturnExpired}} is more straightforward, and we should change 
> to it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11768: [FLINK-16943][python] Support set 
the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#issuecomment-614482552
 
 
   
   ## CI report:
   
   * a3aa6b6cab6c0a7e0e26c0cf3ef0e38f265e8799 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160507586) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7573)
 
   * b796de76daa408c58f6aa13391c93f1a1e88bb57 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160674334) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7624)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-16104) Translate "Streaming Aggregation" page of "Table API & SQL" into Chinese

2020-04-16 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-16104:
---

Assignee: ChaojianZhang

> Translate "Streaming Aggregation" page of "Table API & SQL" into Chinese 
> -
>
> Key: FLINK-16104
> URL: https://issues.apache.org/jira/browse/FLINK-16104
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: ChaojianZhang
>Priority: Major
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/tuning/streaming_aggregation_optimization.html
> The markdown file is located in 
> {{flink/docs/dev/table/tuning/streaming_aggregation_optimization.zh.md}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17177) Handle ERROR event correctly in KubernetesResourceManager#onError

2020-04-16 Thread Canbin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085442#comment-17085442
 ] 

Canbin Zheng edited comment on FLINK-17177 at 4/17/20, 5:22 AM:


{quote}I post the {{WatchEvent}} in K8s here[1]. I do not find that the "Error" 
type means "HTTP error". So could you share some information about how the 
"Error" type is introduced by an HTTP-layer error?

 

[1]. 
[https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#watchevent-v1-meta]
{quote}
One case in K8s client-go is 
[https://github.com/kubernetes/kubernetes/blob/343c1e7636fe5c75cdd378c0b170b26935806de5/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go#L121]

Also, the K8s server could probably send an {{ERROR}} event if something goes 
wrong in the HTTP stream.


was (Author: felixzheng):
{quote}I post the {{WatchEvent}} in K8s here[1]. I do not find the "Error" type 
means "HTTP error". So could share some information about how the "Error" type 
is introduced by HTTP layer error?

 

[1]. 
[https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#watchevent-v1-meta]
{quote}
One case in K8s client-go is 
[https://github.com/kubernetes/kubernetes/blob/343c1e7636fe5c75cdd378c0b170b26935806de5/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go#L121]

Also, the K8s server could probably send {{ERROR}} event if something goes 
wrong in the HTTP stream.

> Handle ERROR event correctly in KubernetesResourceManager#onError
> -
>
> Key: FLINK-17177
> URL: https://issues.apache.org/jira/browse/FLINK-17177
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0, 1.10.1
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently, once we receive an *ERROR* event sent from the K8s API server 
> via the K8s {{Watcher}}, {{KubernetesResourceManager#onError}} handles it by 
> calling {{KubernetesResourceManager#removePodIfTerminated}}. This may be 
> incorrect, since the *ERROR* event indicates an exception in the HTTP layer 
> caused by the K8s server, which means the previously created {{Watcher}} may 
> no longer be available and we'd better re-create the {{Watcher}} immediately.
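
For illustration, a minimal sketch (not Flink's actual implementation) of the re-watch behavior this ticket argues for, written against the fabric8 Kubernetes client that flink-kubernetes builds on; {{createPodWatch}} and {{handlePodEvent}} are assumed helper names:

{code:java}
import java.util.Map;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;

final class PodWatchSketch {

    private final KubernetesClient client;
    private final Map<String, String> labels;

    PodWatchSketch(KubernetesClient client, Map<String, String> labels) {
        this.client = client;
        this.labels = labels;
    }

    void createPodWatch() {
        client.pods().withLabels(labels).watch(new Watcher<Pod>() {
            @Override
            public void eventReceived(Action action, Pod pod) {
                if (action == Action.ERROR) {
                    // ERROR signals a broken HTTP watch stream, not a pod
                    // failure: re-establish the watch instead of stopping pods.
                    createPodWatch();
                } else {
                    handlePodEvent(action, pod);
                }
            }

            @Override
            public void onClose(KubernetesClientException cause) {
                if (cause != null) {
                    // The watch was closed abnormally; re-create it as well.
                    createPodWatch();
                }
            }
        });
    }

    private void handlePodEvent(Watcher.Action action, Pod pod) {
        // ADDED / MODIFIED / DELETED handling would go here.
    }
}
{code}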



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17177) Handle ERROR event correctly in KubernetesResourceManager#onError

2020-04-16 Thread Canbin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085442#comment-17085442
 ] 

Canbin Zheng commented on FLINK-17177:
--

{quote}I post the {{WatchEvent}} in K8s here[1]. I do not find that the "Error" 
type means "HTTP error". So could you share some information about how the 
"Error" type is introduced by an HTTP-layer error?

 

[1]. 
[https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#watchevent-v1-meta]
{quote}
One case in K8s client-go is 
[https://github.com/kubernetes/kubernetes/blob/343c1e7636fe5c75cdd378c0b170b26935806de5/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go#L121]

Also, the K8s server could probably send {{ERROR}} event if something goes 
wrong in the HTTP stream.

> Handle ERROR event correctly in KubernetesResourceManager#onError
> -
>
> Key: FLINK-17177
> URL: https://issues.apache.org/jira/browse/FLINK-17177
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0, 1.10.1
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently, once we receive an *ERROR* event sent from the K8s API server 
> via the K8s {{Watcher}}, {{KubernetesResourceManager#onError}} handles it by 
> calling {{KubernetesResourceManager#removePodIfTerminated}}. This may be 
> incorrect, since the *ERROR* event indicates an exception in the HTTP layer 
> caused by the K8s server, which means the previously created {{Watcher}} may 
> no longer be available and we'd better re-create the {{Watcher}} immediately.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] xintongsong commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
xintongsong commented on a change in pull request #11323: [FLINK-16439][k8s] 
Make KubernetesResourceManager starts workers using WorkerResourceSpec 
requested by SlotManager
URL: https://github.com/apache/flink/pull/11323#discussion_r409997707
 
 

 ##
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
             ++currentMaxAttemptId);
     }
 
-    private void requestKubernetesPod() {
-        numPendingPodRequests++;
+    private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+        final KubernetesTaskManagerParameters parameters =
+            createKubernetesTaskManagerParameters(workerResourceSpec);
+
+        final KubernetesPod taskManagerPod =
+            KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+        kubeClient.createTaskManagerPod(taskManagerPod);
+
+        podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
+        final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec);
 
         log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
-            defaultMemoryMB,
-            defaultCpus,
-            numPendingPodRequests);
+            parameters.getTaskManagerMemoryMB(),
+            parameters.getTaskManagerCPU(),
+            pendingWorkerNum);
+        log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec);
+    }
+
+    private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+        final TaskExecutorProcessSpec taskExecutorProcessSpec =
+            TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
 
         final String podName = String.format(
             TASK_MANAGER_POD_FORMAT,
             clusterId,
             currentMaxAttemptId,
             ++currentMaxPodId);
 
+        final ContaineredTaskManagerParameters taskManagerParameters =
+            ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);
+
         final String dynamicProperties =
             BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
 
-        final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+        return new KubernetesTaskManagerParameters(
             flinkConfig,
             podName,
             dynamicProperties,
             taskManagerParameters);
-
-        final KubernetesPod taskManagerPod =
-            KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
-        log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec);
-        kubeClient.createTaskManagerPod(taskManagerPod);
     }
 
     /**
      * Request new pod if pending pods cannot satisfy pending slot requests.
      */
-    private void requestKubernetesPodIfRequired() {
-        final int requiredTaskManagers = getNumberRequiredTaskManagers();
+    private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) {
+        final int pendingWorkerNum = getNumPendingWorkersFor(workerResourceSpec);
+        int requiredTaskManagers = getRequiredResources().get(workerResourceSpec);
 
-        while (requiredTaskManagers > numPendingPodRequests) {
-            requestKubernetesPod();
+        while (requiredTaskManagers-- > pendingWorkerNum) {
+            requestKubernetesPod(workerResourceSpec);
         }
     }
 
     private void removePodIfTerminated(KubernetesPod pod) {
         if (pod.isTerminated()) {
             kubeClient.stopPod(pod.getName());
 
 Review comment:
   It seems I was wrong about `onError` always being called after `onAdded`.
   Then couldn't `numPendingPodRequests` already go out of sync before this 
PR? Have we observed any such cases?
   
   @zhengcanbin Would it be possible that a pod is never successfully started, 
such that we receive `onError` but never `onAdded`, `onModified` or `onDeleted`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:

[GitHub] [flink] zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] 
Make KubernetesResourceManager starts workers using WorkerResourceSpec 
requested by SlotManager
URL: https://github.com/apache/flink/pull/11323#discussion_r409991098
 
 

 ##
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
             ++currentMaxAttemptId);
     }
 
-    private void requestKubernetesPod() {
-        numPendingPodRequests++;
+    private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+        final KubernetesTaskManagerParameters parameters =
+            createKubernetesTaskManagerParameters(workerResourceSpec);
+
+        final KubernetesPod taskManagerPod =
+            KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+        kubeClient.createTaskManagerPod(taskManagerPod);
+
+        podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
+        final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec);
 
         log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
-            defaultMemoryMB,
-            defaultCpus,
-            numPendingPodRequests);
+            parameters.getTaskManagerMemoryMB(),
+            parameters.getTaskManagerCPU(),
+            pendingWorkerNum);
+        log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec);
+    }
+
+    private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+        final TaskExecutorProcessSpec taskExecutorProcessSpec =
+            TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
 
         final String podName = String.format(
             TASK_MANAGER_POD_FORMAT,
             clusterId,
             currentMaxAttemptId,
             ++currentMaxPodId);
 
+        final ContaineredTaskManagerParameters taskManagerParameters =
+            ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);
+
         final String dynamicProperties =
             BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
 
-        final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+        return new KubernetesTaskManagerParameters(
             flinkConfig,
             podName,
             dynamicProperties,
             taskManagerParameters);
-
-        final KubernetesPod taskManagerPod =
-            KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
-        log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec);
-        kubeClient.createTaskManagerPod(taskManagerPod);
     }
 
     /**
      * Request new pod if pending pods cannot satisfy pending slot requests.
      */
-    private void requestKubernetesPodIfRequired() {
-        final int requiredTaskManagers = getNumberRequiredTaskManagers();
+    private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) {
+        final int pendingWorkerNum = getNumPendingWorkersFor(workerResourceSpec);
+        int requiredTaskManagers = getRequiredResources().get(workerResourceSpec);
 
-        while (requiredTaskManagers > numPendingPodRequests) {
-            requestKubernetesPod();
+        while (requiredTaskManagers-- > pendingWorkerNum) {
+            requestKubernetesPod(workerResourceSpec);
         }
     }
 
     private void removePodIfTerminated(KubernetesPod pod) {
         if (pod.isTerminated()) {
             kubeClient.stopPod(pod.getName());
 
 Review comment:
   > So this means that `KubernetesResourceManager.onError` will only be called 
if `onAdded` has been called before? I guess this is also a question for 
@wangyang0918.
   
   There is no guarantee of this; here is a case in the K8s client-go:
   `ERROR` could be thrown in 
https://github.com/kubernetes/kubernetes/blob/343c1e7636fe5c75cdd378c0b170b26935806de5/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go#L121
   
   `ERROR` is an HTTP exception that rarely happens, and we shouldn't rely on 
it to handle pod failure. Once a pod is terminated, whether or not it exited 
normally, it is expected that we would receive a `MODIFIED` event.
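   
   A minimal sketch of the event handling this implies, mirroring 
`removePodIfTerminated` from the diff above (`runAsync` re-schedules the work 
onto the ResourceManager's main thread); shown for illustration rather than as 
the PR's final code:
   
   @Override
   public void onModified(List<KubernetesPod> pods) {
       runAsync(() -> pods.forEach(this::removePodIfTerminated));
   }
   
   @Override
   public void onDeleted(List<KubernetesPod> pods) {
       runAsync(() -> pods.forEach(this::removePodIfTerminated));
   }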


[GitHub] [flink] flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support 
Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508
 
 
   
   ## CI report:
   
   * 7cbe2380ac7246cf703397d9a78d002c0491a959 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160664905) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7621)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to 
the total number of slots
URL: https://github.com/apache/flink/pull/11615#issuecomment-607717339
 
 
   
   ## CI report:
   
   * 2db316d977d0e790de8fa98b27dd219b68abb136 UNKNOWN
   * 0b0111ba893c9ecc7632394fc59e455f8d5c9db7 UNKNOWN
   * cf7728e0b162ed6bb27c75aa2d02ca9be7e2a6c4 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160663186) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7618)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11768: [FLINK-16943][python] Support set 
the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#issuecomment-614482552
 
 
   
   ## CI report:
   
   * a3aa6b6cab6c0a7e0e26c0cf3ef0e38f265e8799 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160507586) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7573)
 
   * b796de76daa408c58f6aa13391c93f1a1e88bb57 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11010: [FLINK-15836][k8s] Throw fatal error 
in KubernetesResourceManager when the pods watcher is closed with exception
URL: https://github.com/apache/flink/pull/11010#issuecomment-581769412
 
 
   
   ## CI report:
   
   * 1130280dfe557a0268fed248089fb32d15f832d9 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160664763) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7620)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-17199) Use NeverReturnExpired strategy for states in blink streaming join.

2020-04-16 Thread Benchao Li (Jira)
Benchao Li created FLINK-17199:
--

 Summary: Use NeverReturnExpired strategy for states in blink 
streaming join.
 Key: FLINK-17199
 URL: https://issues.apache.org/jira/browse/FLINK-17199
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.10.0, 1.9.2
Reporter: Benchao Li


Currently we use {{ReturnExpiredIfNotCleanedUp}} for states in blink streaming 
join operators, which makes the behavior not very intuitive for users.

IMO, {{NeverReturnExpired}} is more straightforward, and we should change to 
it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW commented on a change in pull request #11687: [FLINK-16536][network][checkpointing] Implement InputChannel state recovery for unaligned checkpoint

2020-04-16 Thread GitBox
zhijiangW commented on a change in pull request #11687: 
[FLINK-16536][network][checkpointing] Implement InputChannel state recovery for 
unaligned checkpoint
URL: https://github.com/apache/flink/pull/11687#discussion_r409995125
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##
 @@ -213,19 +215,23 @@ public SingleInputGate(
     }
 
     @Override
-    public void setup() throws IOException, InterruptedException {
+    public void setup() throws IOException {
 
 Review comment:
   I agree that it has benefits to reduce the steps for managing the lifecycle 
of the input gate. But I thought of another potential concern with integrating 
it within `setup`.
   
   The previous assumption was that the output recovery should execute earlier 
than the input recovery, in order to occupy more floating buffers from the 
global pool and thereby speed up the recovery process. Now the output recovery 
is executed by the task thread during `StreamTask#beforeInvoke`. If we 
integrate the input recovery within the `setup` process, it would run before 
the output recovery and occupy more floating buffers in a limited-buffer 
environment.
   
   I also considered integrating the output recovery within 
`ResultPartition#setup`, but the output recovery is executed by the task thread 
and would then block the following operations, so I wonder whether that might 
also bring potential risks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add Cython support for operations

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add 
Cython support for operations
URL: https://github.com/apache/flink/pull/11784#discussion_r409994413
 
 

 ##
 File path: flink-python/pyflink/fn_execution/fast_operations.pxd
 ##
 @@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# cython: language_level=3
+
+cimport libc.stdint
+
+from apache_beam.utils.windowed_value cimport WindowedValue
+from apache_beam.runners.worker.operations cimport Operation
+from apache_beam.coders.coder_impl cimport StreamCoderImpl, CoderImpl, OutputStream, InputStream
+from pyflink.fn_execution.fast_coder_impl cimport InputStreamAndFunctionWrapper
 
 Review comment:
   unused import


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add Cython support for operations

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add 
Cython support for operations
URL: https://github.com/apache/flink/pull/11784#discussion_r409991523
 
 

 ##
 File path: flink-python/pyflink/fn_execution/coders.py
 ##
 @@ -177,6 +187,9 @@ def __init__(self, elem_coder):
     def _create_impl(self):
         return coder_impl.ArrayCoderImpl(self._elem_coder.get_impl())
 
+    def get_impl(self):
 
 Review comment:
   If `get_impl` is already defined in the base class, it is not necessary to 
define it in the child class again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add Cython support for operations

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add 
Cython support for operations
URL: https://github.com/apache/flink/pull/11784#discussion_r409992838
 
 

 ##
 File path: flink-python/pyflink/fn_execution/fast_operations.pyx
 ##
 @@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# cython: language_level = 3
+# cython: infer_types = True
+# cython: profile=True
+# cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True
+
+import datetime
+
+import cloudpickle
+from apache_beam.runners.worker import bundle_processor
+from apache_beam.runners.worker import operation_specs
+
+from pyflink.fn_execution import flink_fn_execution_pb2
+from pyflink.metrics.metricbase import GenericMetricGroup
+from pyflink.serializers import PickleSerializer
+from pyflink.table import FunctionContext
+from pyflink.table.udf import DelegatingScalarFunction, DelegationTableFunction
+
+SCALAR_FUNCTION_URN = "flink:transform:scalar_function:v1"
+TABLE_FUNCTION_URN = "flink:transform:table_function:v1"
+
+cdef class StatelessFunctionOperation(Operation):
+    """
+    Base class of stateless function operation that will execute ScalarFunction or TableFunction for
+    each input element.
+    """
+
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(StatelessFunctionOperation, self).__init__(name, spec, counter_factory, sampler)
+        self.consumer = consumers['output'][0]
+        self._value_coder_impl = self.consumer.windowed_coder.wrapped_value_coder.get_impl()
+        value_coder = self._value_coder_impl._value_coder
+        from pyflink.fn_execution.coder_impl import ArrowCoderImpl
+        if isinstance(value_coder, ArrowCoderImpl):
+            self._is_python_coder = True
+        else:
+            self._is_python_coder = False
+
+        self.variable_dict = {}
+        self.user_defined_funcs = []
+        self._func_num = 0
+        self._constant_num = 0
+        self.func = self.generate_func(self.spec.serialized_fn.udfs)
+        self._metric_enabled = self.spec.serialized_fn.metric_enabled
+        self.base_metric_group = None
+        if self._metric_enabled:
+            self.base_metric_group = GenericMetricGroup(None, None)
+        for user_defined_func in self.user_defined_funcs:
+            user_defined_func.open(FunctionContext(self.base_metric_group))
+
+    cpdef start(self):
+        with self.scoped_start_state:
+            super(StatelessFunctionOperation, self).start()
+
+    cpdef finish(self):
+        with self.scoped_finish_state:
+            super(StatelessFunctionOperation, self).finish()
+            self._update_gauge(self.base_metric_group)
+
+    cpdef teardown(self):
+        with self.scoped_finish_state:
+            for user_defined_func in self.user_defined_funcs:
+                user_defined_func.close()
+
+    cpdef process(self, WindowedValue o):
+        cdef InputStreamAndFunctionWrapper wrapper
+        with self.scoped_process_state:
+            output_stream = self.consumer.output_stream
+            if self._is_python_coder:
+                self._value_coder_impl.encode_to_stream(self.func(o.value), output_stream, True)
+            else:
+                wrapper = InputStreamAndFunctionWrapper(self.func, o.value)
+                self._value_coder_impl.encode_to_stream(wrapper, output_stream, True)
+            output_stream.maybe_flush()
+
+    cpdef monitoring_infos(self, transform_id):
 
 Review comment:
   progress_metrics method is missing


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add Cython support for operations

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add 
Cython support for operations
URL: https://github.com/apache/flink/pull/11784#discussion_r409994309
 
 

 ##
 File path: flink-python/pyflink/fn_execution/fast_operations.pxd
 ##
 @@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# cython: language_level=3
+
+cimport libc.stdint
+
+from apache_beam.utils.windowed_value cimport WindowedValue
 
 Review comment:
   unused import


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11781: [FLINK-16770][checkpointing] Revert commits of FLINK-16945 and FLINK-14971

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11781: [FLINK-16770][checkpointing] Revert 
commits of FLINK-16945 and FLINK-14971
URL: https://github.com/apache/flink/pull/11781#issuecomment-614694494
 
 
   
   ## CI report:
   
   * c6ea0d7e853fb1b8f244f5ce756fe07348580cf6 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160567250) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7606)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on issue #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.

2020-04-16 Thread GitBox
WeiZhong94 commented on issue #11768: [FLINK-16943][python] Support set the 
configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#issuecomment-615041372
 
 
   @kl0u Hi Kostas, the change to `LocalExecutor` has been included in this PR. 
It seems all tests have passed. Please take a look. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11782: [FLINK-15801] fix Timestamp extractor created from properties does no…

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11782: [FLINK-15801] fix Timestamp 
extractor created from properties does no…
URL: https://github.com/apache/flink/pull/11782#issuecomment-614731103
 
 
   
   ## CI report:
   
   * de8e893e40c189d6aaa3de4330df4c3065b8ae62 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160663230) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7619)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11323: [FLINK-16439][k8s] Make 
KubernetesResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11323#issuecomment-595267888
 
 
   
   ## CI report:
   
   * edd2f98c2a607655aef8823024753fccebe29a7d UNKNOWN
   * 260401deb92878bc2304b23a6988afe812439a7a Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160543406) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7596)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11010: [FLINK-15836][k8s] Throw fatal error 
in KubernetesResourceManager when the pods watcher is closed with exception
URL: https://github.com/apache/flink/pull/11010#issuecomment-581769412
 
 
   
   ## CI report:
   
   * 1130280dfe557a0268fed248089fb32d15f832d9 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160664763) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7620)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangyang0918 commented on issue #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception

2020-04-16 Thread GitBox
wangyang0918 commented on issue #11010: [FLINK-15836][k8s] Throw fatal error in 
KubernetesResourceManager when the pods watcher is closed with exception
URL: https://github.com/apache/flink/pull/11010#issuecomment-615040585
 
 
   @tillrohrmann Could you help to review this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on issue #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.

2020-04-16 Thread GitBox
WeiZhong94 commented on issue #11768: [FLINK-16943][python] Support set the 
configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#issuecomment-615039935
 
 
   @dianfu Thanks for your review! I have addressed your comments in the latest 
commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] 
Make KubernetesResourceManager starts workers using WorkerResourceSpec 
requested by SlotManager
URL: https://github.com/apache/flink/pull/11323#discussion_r409991098
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec 
workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   
createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   
KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
+
+   podWorkerResources.put(parameters.getPodName(), 
workerResourceSpec);
+   final int pendingWorkerNum = 
notifyNewWorkerRequested(workerResourceSpec);
 
log.info("Requesting new TaskManager pod with <{},{}>. Number 
pending requests {}.",
-   defaultMemoryMB,
-   defaultCpus,
-   numPendingPodRequests);
+   parameters.getTaskManagerMemoryMB(),
+   parameters.getTaskManagerCPU(),
+   pendingWorkerNum);
+   log.info("TaskManager {} will be started with {}.", 
parameters.getPodName(), workerResourceSpec);
+   }
+
+   private KubernetesTaskManagerParameters 
createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+   final TaskExecutorProcessSpec taskExecutorProcessSpec =
+   
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
workerResourceSpec);
 
final String podName = String.format(
TASK_MANAGER_POD_FORMAT,
clusterId,
currentMaxAttemptId,
++currentMaxPodId);
 
+   final ContaineredTaskManagerParameters taskManagerParameters =
+   ContaineredTaskManagerParameters.create(flinkConfig, 
taskExecutorProcessSpec);
+
final String dynamicProperties =

BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
 
-   final KubernetesTaskManagerParameters 
kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+   return new KubernetesTaskManagerParameters(
flinkConfig,
podName,
dynamicProperties,
taskManagerParameters);
-
-   final KubernetesPod taskManagerPod =
-   
KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
-   log.info("TaskManager {} will be started with {}.", podName, 
taskExecutorProcessSpec);
-   kubeClient.createTaskManagerPod(taskManagerPod);
}
 
/**
 * Request new pod if pending pods cannot satisfy pending slot requests.
 */
-   private void requestKubernetesPodIfRequired() {
-   final int requiredTaskManagers = 
getNumberRequiredTaskManagers();
+   private void requestKubernetesPodIfRequired(WorkerResourceSpec 
workerResourceSpec) {
+   final int pendingWorkerNum = 
getNumPendingWorkersFor(workerResourceSpec);
+   int requiredTaskManagers = 
getRequiredResources().get(workerResourceSpec);
 
-   while (requiredTaskManagers > numPendingPodRequests) {
-   requestKubernetesPod();
+   while (requiredTaskManagers-- > pendingWorkerNum) {
+   requestKubernetesPod(workerResourceSpec);
}
}
 
private void removePodIfTerminated(KubernetesPod pod) {
if (pod.isTerminated()) {
kubeClient.stopPod(pod.getName());
 
 Review comment:
   > So this means that `KubernetesResourceManager.onError` will only be called 
if `onAdded` has been called before? I guess this is also a question for 
@wangyang0918.
   
   There is no guarantee for this; an `ERROR` event can be emitted directly in 
https://github.com/kubernetes/kubernetes/blob/343c1e7636fe5c75cdd378c0b170b26935806de5/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go#L121
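
   In other words, an `onError` callback may arrive for a pod that was never 
observed via `onAdded`, so the handler should not assume any prior bookkeeping 
exists. A minimal defensive sketch, not the PR's actual code (the callback 
signature and the `runAsync` wrapping are assumptions based on the surrounding 
diff):

```java
@Override
public void onError(List<KubernetesPod> pods) {
	runAsync(() -> pods.forEach(pod -> {
		if (podWorkerResources.containsKey(pod.getName())) {
			// Known pod: reuse the normal termination cleanup path.
			removePodIfTerminated(pod);
		} else {
			// No prior onAdded: leave the bookkeeping untouched and only log.
			log.warn("Received ERROR event for unknown pod {}.", pod.getName());
		}
	}));
}
```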


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] 
Make KubernetesResourceManager starts workers using WorkerResourceSpec 
requested by SlotManager
URL: https://github.com/apache/flink/pull/11323#discussion_r409989580
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec 
workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   
createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   
KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
+
+   podWorkerResources.put(parameters.getPodName(), 
workerResourceSpec);
+   final int pendingWorkerNum = 
notifyNewWorkerRequested(workerResourceSpec);
 
log.info("Requesting new TaskManager pod with <{},{}>. Number 
pending requests {}.",
-   defaultMemoryMB,
-   defaultCpus,
-   numPendingPodRequests);
+   parameters.getTaskManagerMemoryMB(),
+   parameters.getTaskManagerCPU(),
+   pendingWorkerNum);
+   log.info("TaskManager {} will be started with {}.", 
parameters.getPodName(), workerResourceSpec);
+   }
+
+   private KubernetesTaskManagerParameters 
createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+   final TaskExecutorProcessSpec taskExecutorProcessSpec =
+   
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
workerResourceSpec);
 
final String podName = String.format(
TASK_MANAGER_POD_FORMAT,
clusterId,
currentMaxAttemptId,
++currentMaxPodId);
 
+   final ContaineredTaskManagerParameters taskManagerParameters =
+   ContaineredTaskManagerParameters.create(flinkConfig, 
taskExecutorProcessSpec);
+
final String dynamicProperties =

BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
 
-   final KubernetesTaskManagerParameters 
kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+   return new KubernetesTaskManagerParameters(
flinkConfig,
podName,
dynamicProperties,
taskManagerParameters);
-
-   final KubernetesPod taskManagerPod =
-   
KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
-   log.info("TaskManager {} will be started with {}.", podName, 
taskExecutorProcessSpec);
-   kubeClient.createTaskManagerPod(taskManagerPod);
}
 
/**
 * Request new pod if pending pods cannot satisfy pending slot requests.
 */
-   private void requestKubernetesPodIfRequired() {
-   final int requiredTaskManagers = 
getNumberRequiredTaskManagers();
+   private void requestKubernetesPodIfRequired(WorkerResourceSpec 
workerResourceSpec) {
+   final int pendingWorkerNum = 
getNumPendingWorkersFor(workerResourceSpec);
+   int requiredTaskManagers = 
getRequiredResources().get(workerResourceSpec);
 
-   while (requiredTaskManagers > numPendingPodRequests) {
-   requestKubernetesPod();
+   while (requiredTaskManagers-- > pendingWorkerNum) {
+   requestKubernetesPod(workerResourceSpec);
}
}
 
private void removePodIfTerminated(KubernetesPod pod) {
if (pod.isTerminated()) {
kubeClient.stopPod(pod.getName());
 
 Review comment:
   I posted the `WatchEvent` API from the K8s documentation[1]. From the 
definition, I do not find any guarantee that an `Error` event is always 
returned after `Added`. We already have a 
[ticket](https://issues.apache.org/jira/browse/FLINK-17177) to track how to 
handle the `onError` event in `KubernetesResourceManager` correctly. I think 
we could keep the current behavior and try to find a solution in FLINK-17177.
   
   Moreover, I need to dig into the K8s ApiServer implementation and check 
when the `onError` `WatchEvent` could be returned.
   
   
   [1]. 

[GitHub] [flink] zhuzhurk commented on a change in pull request #11783: [FLINK-17181][runtime] Drop generic Types in SchedulingTopology Interface

2020-04-16 Thread GitBox
zhuzhurk commented on a change in pull request #11783: [FLINK-17181][runtime] 
Drop generic Types in SchedulingTopology Interface
URL: https://github.com/apache/flink/pull/11783#discussion_r409988049
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/topology/BaseTopology.java
 ##
 @@ -30,7 +30,7 @@
 *
 * @return topologically sorted iterable over all vertices
 */
-   Iterable getVertices();
+   Iterable getVertices();
 
 Review comment:
   I think we need to do similar changes for:
   `PipelinedRegion#getVertices()`
   `PipelinedRegion#getConsumedResults()`
   `Topology#getAllPipelinedRegions()`
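
   Presumably the same raw-type treatment applies to those as well; a hedged 
sketch of the specialized signatures after the change (the method homes come 
from the list above, the exact element types are assumptions):

```java
interface SchedulingPipelinedRegion {
	Iterable<SchedulingExecutionVertex> getVertices();
	Iterable<SchedulingResultPartition> getConsumedResults();
}

interface SchedulingTopology {
	Iterable<SchedulingPipelinedRegion> getAllPipelinedRegions();
}
```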


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #11783: [FLINK-17181][runtime] Drop generic Types in SchedulingTopology Interface

2020-04-16 Thread GitBox
zhuzhurk commented on a change in pull request #11783: [FLINK-17181][runtime] 
Drop generic Types in SchedulingTopology Interface
URL: https://github.com/apache/flink/pull/11783#discussion_r409988394
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
 ##
 @@ -174,9 +174,9 @@ private boolean isRegionOfVertexFinished(final 
ExecutionVertexID executionVertex
public static class Factory implements PartitionReleaseStrategy.Factory 
{
 
@Override
-   public PartitionReleaseStrategy createInstance(final 
SchedulingTopology schedulingStrategy) {
+   public PartitionReleaseStrategy createInstance(final 
SchedulingTopology schedulingStrategy) {
 
-   final Set>> distinctRegions =
+   final Set> distinctRegions =
 
 Review comment:
   Now it can be simplified to `Set<SchedulingPipelinedRegion>`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #11783: [FLINK-17181][runtime] Drop generic Types in SchedulingTopology Interface

2020-04-16 Thread GitBox
zhuzhurk commented on a change in pull request #11783: [FLINK-17181][runtime] 
Drop generic Types in SchedulingTopology Interface
URL: https://github.com/apache/flink/pull/11783#discussion_r409988449
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
 ##
 @@ -98,15 +98,15 @@ public RestartPipelinedRegionFailoverStrategy(
// 

 
private void buildFailoverRegions() {
-   final Set>> distinctRegions =
+   final Set> 
distinctRegions =
 
 Review comment:
   Now it can be simplified to `Set<SchedulingPipelinedRegion>`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11749: 
[FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409266063
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/client/python/PythonFunctionFactoryUtil.java
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import py4j.GatewayServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.client.python.PythonEnvUtils.getGatewayServer;
+import static 
org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient;
+import static org.apache.flink.client.python.PythonEnvUtils.setGatewayServer;
+import static 
org.apache.flink.client.python.PythonEnvUtils.shutdownPythonProcess;
+
+/**
+ * Utils for PythonFunctionFactory.
+ */
+public class PythonFunctionFactoryUtil {
+
+   private static final long CHECK_INTERVAL = 100;
+
+   private static final long TIMEOUT_MILLIS = 3000;
 
 Review comment:
   Seems too small for me, change to 1?
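
   For context, these constants presumably drive a poll-and-wait loop for the 
Python process startup, which is why the budget matters; a hedged sketch of 
that pattern (`isReady` is a hypothetical readiness check, not from this PR):

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class StartupWait {
	private static final long CHECK_INTERVAL = 100;
	private static final long TIMEOUT_MILLIS = 3000;

	// Poll every CHECK_INTERVAL ms; give up once TIMEOUT_MILLIS has elapsed.
	// With a 3000 ms budget, a slow interpreter start can easily time out.
	static void await(Supplier<Boolean> isReady)
			throws InterruptedException, TimeoutException {
		final long deadline = System.currentTimeMillis() + TIMEOUT_MILLIS;
		while (!isReady.get()) {
			if (System.currentTimeMillis() >= deadline) {
				throw new TimeoutException("Python process did not start in time.");
			}
			Thread.sleep(CHECK_INTERVAL);
		}
	}
}
```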


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11749: 
[FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409261220
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##
 @@ -247,4 +257,90 @@ static Process startPythonProcess(PythonEnvironment 
pythonEnv, List comm
 
return process;
}
+
+   static void shutdownPythonProcess(Process pythonProcess, long 
timeoutMillis) {
 
 Review comment:
   This method seems very common and is not specific to Python. What about 
moving it to `PythonFunctionFactoryUtil`, where it's used?
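
   For reference, a generic version would need nothing Python-specific; a 
minimal sketch using only the standard `Process` API (the `ProcessUtils` class 
name is hypothetical):

```java
import java.util.concurrent.TimeUnit;

final class ProcessUtils {
	// Ask the process to exit gracefully, then force-kill it if it does
	// not terminate within the given timeout.
	static void shutdownProcess(Process process, long timeoutMillis) {
		process.destroy();
		try {
			if (!process.waitFor(timeoutMillis, TimeUnit.MILLISECONDS)) {
				process.destroyForcibly();
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			process.destroyForcibly();
		}
	}
}
```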


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11749: 
[FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409958033
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_table_environment_api.py
 ##
 @@ -412,6 +419,13 @@ def test_table_environment_with_blink_planner(self):
 
 self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
 
+def test_sql_ddl(self):
 
 Review comment:
   Seems that this could be removed as it already exists in the parent class 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11749: 
[FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409959968
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##
 @@ -247,4 +256,63 @@ static Process startPythonProcess(PythonEnvironment 
pythonEnv, List comm
 
return process;
}
+
+   /**
+* Py4J supports both Java-to-Python RPC and Python-to-Java RPC. The 
GatewayServer object is
+* the entry point of Java-to-Python RPC. Since the Py4j Python client 
will be launched
+* only once, the GatewayServer object needs to be reused.
+*/
+   private static GatewayServer gatewayServer = null;
+
+   /**
+* Creates a GatewayServer run in a daemon thread.
+*
+* @return The created GatewayServer
+*/
+   static GatewayServer startGatewayServer() throws ExecutionException, 
InterruptedException {
+   if (gatewayServer != null) {
+   return gatewayServer;
+   }
+   CompletableFuture gatewayServerFuture = new 
CompletableFuture<>();
+   Thread thread = new Thread(() -> {
+   int freePort = NetUtils.getAvailablePort();
+   GatewayServer server = new 
GatewayServer.GatewayServerBuilder()
+   .gateway(new Gateway(new 
ConcurrentHashMap(), new CallbackClient(freePort)))
+   .javaPort(0)
+   .build();
+   gatewayServerFuture.complete(server);
+   server.start(true);
+   });
+   thread.setName("py4j-gateway");
+   thread.setDaemon(true);
+   thread.start();
+   thread.join();
+   gatewayServer = gatewayServerFuture.get();
+   return gatewayServer;
+   }
+
+   static Process launchPy4jPythonClient(
+   GatewayServer gatewayServer,
+   ReadableConfig config,
+   List commands,
+   String entryPointScript,
+   String tmpDir) throws IOException {
+   PythonEnvironment pythonEnv = 
PythonEnvUtils.preparePythonEnvironment(
+   config, entryPointScript, tmpDir);
+   // set env variable PYFLINK_GATEWAY_PORT so the python gateway in 
the python process can connect to this server.
+   pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", 
String.valueOf(gatewayServer.getListeningPort()));
+   // set env variable PYFLINK_CALLBACK_PORT so the python process can 
create its callback server.
+   pythonEnv.systemEnv.put("PYFLINK_CALLBACK_PORT", 
String.valueOf(gatewayServer.getCallbackClient().getPort()));
+   // start the python process.
+   return PythonEnvUtils.startPythonProcess(pythonEnv, commands);
+   }
+
+   static GatewayServer getGatewayServer() {
+   return gatewayServer;
+   }
+
+   static void removeGatewayServer() {
 
 Review comment:
   Could we place all the gateway-server related methods, e.g. 
`startGatewayServer`/`getGatewayServer`, together?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11749: 
[FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409972882
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
 ##
 @@ -18,13 +18,34 @@
 
 package org.apache.flink.table.functions;
 
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.FunctionLanguage;
+
+import java.lang.reflect.InvocationTargetException;
+
 /**
  * A util to instantiate {@link FunctionDefinition} in the default way.
  */
 public class FunctionDefinitionUtil {
 
public static FunctionDefinition createFunctionDefinition(String name, 
String className) {
-   // Currently only handles Java class-based functions
+   return createJavaFunctionDefinition(name, className);
+   }
+
+   public static FunctionDefinition createFunctionDefinition(
+   String name,
+   String className,
+   FunctionLanguage functionLanguage,
+   ReadableConfig config) {
+   switch (functionLanguage) {
 
 Review comment:
   My IDE complains that the `switch` has too few case labels. Could you 
resolve this warning?
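
   One way to resolve it, sketched here under the assumption that only 
`PYTHON` needs special handling and everything else falls back to the Java 
path, is a plain conditional reusing the two helpers visible in this diff:

```java
if (functionLanguage == FunctionLanguage.PYTHON) {
	return createPythonFunctionDefinition(name, className, config);
} else {
	return createJavaFunctionDefinition(name, className);
}
```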


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11749: 
[FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409970974
 
 

 ##
 File path: 
flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
+
+/**
+ * Tests for PythonFunctionFactory.
+ */
+public class PythonFunctionFactoryTest {
+
+   private String tmpdir = "";
+   private BatchTableEnvironment flinkTableEnv;
+   private StreamTableEnvironment blinkTableEnv;
+   private Table flinkSourceTable;
+   private Table blinkSourceTable;
+
+   @Before
+   public void prepareEnvironment() throws Exception {
+   tmpdir = new File(System.getProperty("java.io.tmpdir"), 
UUID.randomUUID().toString()).getAbsolutePath();
+   new File(tmpdir).mkdir();
+   File pyFilePath = new File(tmpdir, "test1.py");
+   try (OutputStream out = new FileOutputStream(pyFilePath)) {
+   String code = ""
+   + "from pyflink.table.udf import udf\n"
+   + "from pyflink.table import DataTypes\n"
+   + "@udf(input_types=DataTypes.STRING(), 
result_type=DataTypes.STRING())\n"
+   + "def func1(str):\n"
+   + "return str + str\n";
+   out.write(code.getBytes());
+   }
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   flinkTableEnv = BatchTableEnvironment.create(env);
+   flinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, 
pyFilePath.getAbsolutePath());
+   StreamExecutionEnvironment sEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   blinkTableEnv = StreamTableEnvironment.create(
+   sEnv, 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
+   blinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, 
pyFilePath.getAbsolutePath());
+   flinkSourceTable = 
flinkTableEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str");
+   blinkSourceTable = 
blinkTableEnv.fromDataStream(sEnv.fromElements("1", "2", "3")).as("str");
+   }
+
+   @After
+   public void cleanEnvironment() throws Exception {
+   FileUtils.deleteDirectory(new File(tmpdir));
+   }
+
+   @Test
+   public void testPythonFunctionFactory() {
+   // flink temporary catalog
+   flinkTableEnv.sqlUpdate("create temporary function func1 as 
'test1.func1' language python");
+   verifyPlan(flinkSourceTable.select("func1(str)"), 
flinkTableEnv);
+
+   // flink temporary system
+   flinkTableEnv.sqlUpdate("create temporary system function func1 
as 'test1.func1' language python");
+   verifyPlan(flinkSourceTable.select("func1(str)"), 
flinkTableEnv);
+
+   // blink temporary catalog
+   blinkTableEnv.sqlUpdate("create temporary function func1 as 
'test1.func1' language python");
+   

[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11749: 
[FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409973891
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
 ##
 @@ -259,10 +259,17 @@ private Operation 
convertCreateFunction(SqlCreateFunction sqlCreateFunction) {
return new CreateTempSystemFunctionOperation(
unresolvedIdentifier.getObjectName(),

sqlCreateFunction.getFunctionClassName().getValueAs(String.class),
-   sqlCreateFunction.isIfNotExists()
+   sqlCreateFunction.isIfNotExists(),
+   
parseLanguage(sqlCreateFunction.getFunctionLanguage())
);
} else {
FunctionLanguage language = 
parseLanguage(sqlCreateFunction.getFunctionLanguage());
+   if (language == FunctionLanguage.PYTHON && 
!sqlCreateFunction.isTemporary()) {
+   throw new ValidationException(String.format(
 
 Review comment:
   Please fix the warning here as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11749: 
[FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409971668
 
 

 ##
 File path: 
flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
+
+/**
+ * Tests for PythonFunctionFactory.
+ */
+public class PythonFunctionFactoryTest {
+
+   private String tmpdir = "";
+   private BatchTableEnvironment flinkTableEnv;
+   private StreamTableEnvironment blinkTableEnv;
+   private Table flinkSourceTable;
+   private Table blinkSourceTable;
+
+   @Before
+   public void prepareEnvironment() throws Exception {
+   tmpdir = new File(System.getProperty("java.io.tmpdir"), 
UUID.randomUUID().toString()).getAbsolutePath();
+   new File(tmpdir).mkdir();
+   File pyFilePath = new File(tmpdir, "test1.py");
+   try (OutputStream out = new FileOutputStream(pyFilePath)) {
+   String code = ""
+   + "from pyflink.table.udf import udf\n"
+   + "from pyflink.table import DataTypes\n"
+   + "@udf(input_types=DataTypes.STRING(), 
result_type=DataTypes.STRING())\n"
+   + "def func1(str):\n"
+   + "return str + str\n";
+   out.write(code.getBytes());
+   }
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   flinkTableEnv = BatchTableEnvironment.create(env);
+   flinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, 
pyFilePath.getAbsolutePath());
+   StreamExecutionEnvironment sEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   blinkTableEnv = StreamTableEnvironment.create(
+   sEnv, 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
+   blinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, 
pyFilePath.getAbsolutePath());
+   flinkSourceTable = 
flinkTableEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str");
+   blinkSourceTable = 
blinkTableEnv.fromDataStream(sEnv.fromElements("1", "2", "3")).as("str");
+   }
+
+   @After
+   public void cleanEnvironment() throws Exception {
+   FileUtils.deleteDirectory(new File(tmpdir));
+   }
+
+   @Test
+   public void testPythonFunctionFactory() {
+   // flink temporary catalog
+   flinkTableEnv.sqlUpdate("create temporary function func1 as 
'test1.func1' language python");
+   verifyPlan(flinkSourceTable.select("func1(str)"), 
flinkTableEnv);
+
+   // flink temporary system
+   flinkTableEnv.sqlUpdate("create temporary system function func1 
as 'test1.func1' language python");
+   verifyPlan(flinkSourceTable.select("func1(str)"), 
flinkTableEnv);
+
+   // blink temporary catalog
+   blinkTableEnv.sqlUpdate("create temporary function func1 as 
'test1.func1' language python");
+   

[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11749: 
[FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409959350
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
 ##
 @@ -247,4 +256,63 @@ static Process startPythonProcess(PythonEnvironment 
pythonEnv, List comm
 
return process;
}
+
+   /**
+* Py4J supports both Java-to-Python RPC and Python-to-Java RPC. The 
GatewayServer object is
+* the entry point of Java-to-Python RPC. Since the Py4j Python client 
will be launched
+* only once, the GatewayServer object needs to be reused.
+*/
+   private static GatewayServer gatewayServer = null;
+
+   /**
+* Creates a GatewayServer run in a daemon thread.
+*
+* @return The created GatewayServer
+*/
+   static GatewayServer startGatewayServer() throws ExecutionException, 
InterruptedException {
+   if (gatewayServer != null) {
+   return gatewayServer;
+   }
+   CompletableFuture gatewayServerFuture = new 
CompletableFuture<>();
+   Thread thread = new Thread(() -> {
+   int freePort = NetUtils.getAvailablePort();
+   GatewayServer server = new 
GatewayServer.GatewayServerBuilder()
+   .gateway(new Gateway(new 
ConcurrentHashMap(), new CallbackClient(freePort)))
+   .javaPort(0)
+   .build();
+   gatewayServerFuture.complete(server);
+   server.start(true);
+   });
+   thread.setName("py4j-gateway");
+   thread.setDaemon(true);
+   thread.start();
+   thread.join();
+   gatewayServer = gatewayServerFuture.get();
+   return gatewayServer;
+   }
+
+   static Process launchPy4jPythonClient(
+   GatewayServer gatewayServer,
+   ReadableConfig config,
+   List commands,
+   String entryPointScript,
+   String tmpDir) throws IOException {
+   PythonEnvironment pythonEnv = 
PythonEnvUtils.preparePythonEnvironment(
+   config, entryPointScript, tmpDir);
+   // set env variable PYFLINK_GATEWAY_PORT so the python gateway in 
the python process can connect to this server.
+   pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", 
String.valueOf(gatewayServer.getListeningPort()));
+   // set env variable PYFLINK_CALLBACK_PORT so the python process can 
create its callback server.
+   pythonEnv.systemEnv.put("PYFLINK_CALLBACK_PORT", 
String.valueOf(gatewayServer.getCallbackClient().getPort()));
+   // start the python process.
+   return PythonEnvUtils.startPythonProcess(pythonEnv, commands);
+   }
+
+   static GatewayServer getGatewayServer() {
+   return gatewayServer;
+   }
+
+   static void removeGatewayServer() {
+   gatewayServer.shutdown();
 
 Review comment:
   We should check whether `gatewayServer` is null before calling `shutdown()`.
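
   A minimal null-safe sketch of what is being asked for (resetting the field 
afterwards is an assumption implied by the method name):

```java
static void removeGatewayServer() {
	if (gatewayServer != null) {
		gatewayServer.shutdown();
		gatewayServer = null;  // allow a fresh server to be started later
	}
}
```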


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11749: 
[FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409973118
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
 ##
 @@ -33,8 +54,36 @@ public static FunctionDefinition 
createFunctionDefinition(String name, String cl
String.format("Failed instantiating '%s'", 
className), e);
}
 
+   return createFunctionDefinitionInternal(name, 
(UserDefinedFunction) func);
+   }
+
+   private static FunctionDefinition createPythonFunctionDefinition(
+   String name,
+   String fullyQualifiedName,
+   ReadableConfig config) {
+   Object func;
+   try {
+   Class pythonFunctionFactory = Class.forName(
+   
"org.apache.flink.client.python.PythonFunctionFactory",
+   true,
+   Thread.currentThread().getContextClassLoader());
+   func = pythonFunctionFactory.getMethod(
+   "getPythonFunction",
+   String.class,
+   ReadableConfig.class)
+   .invoke(null, fullyQualifiedName, config);
+
 
 Review comment:
   remove the empty line


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11749: 
[FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409970315
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
 ##
 @@ -82,4 +81,8 @@ public static void main(String[] args) throws IOException {
System.exit(1);
}
}
+
+   public static boolean ping() {
 
 Review comment:
   this method is not needed any more and should be removed


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11749: 
[FLINK-16669][python][table] Support Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#discussion_r409972150
 
 

 ##
 File path: 
flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
+
+/**
+ * Tests for PythonFunctionFactory.
+ */
+public class PythonFunctionFactoryTest {
+
+   private String tmpdir = "";
+   private BatchTableEnvironment flinkTableEnv;
+   private StreamTableEnvironment blinkTableEnv;
+   private Table flinkSourceTable;
+   private Table blinkSourceTable;
+
+   @Before
 
 Review comment:
   Change to BeforeClass?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli 
command history in sql client and make it available after restarts
URL: https://github.com/apache/flink/pull/11765#issuecomment-614430999
 
 
   
   ## CI report:
   
   * 22eef4c43cd4fd0a6ddffb3cbef00a2fe522dac6 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160495175) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7557)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to 
the total number of slots
URL: https://github.com/apache/flink/pull/11615#issuecomment-607717339
 
 
   
   ## CI report:
   
   * 2db316d977d0e790de8fa98b27dd219b68abb136 UNKNOWN
   * 0b0111ba893c9ecc7632394fc59e455f8d5c9db7 UNKNOWN
   * cf7728e0b162ed6bb27c75aa2d02ca9be7e2a6c4 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160663186) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7618)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11351: [FLINK-16404][runtime] Avoid caching buffers for blocked input channels before barrier alignment

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11351: [FLINK-16404][runtime] Avoid caching 
buffers for blocked input channels before barrier alignment
URL: https://github.com/apache/flink/pull/11351#issuecomment-596351676
 
 
   
   ## CI report:
   
   * 715889a35cfcc3aaf1b17f39dadaa86f755cc75d UNKNOWN
   * 548b22258f5e87fd53b45c7f4bb6de40bfd4e6d2 UNKNOWN
   * 9179aab74eeaf80ea30c0894f3e5a0171338baed UNKNOWN
   * cd385c55e9dc111a061e19e2387f2ae9ce21369e UNKNOWN
   * e869d82f1aa639c02af4a82c5026f939bf4e8f6c UNKNOWN
   * f86372054c1c4724b8dabc8d06e369475e64ac29 UNKNOWN
   * a0c7ff983988f27f5e47f4c71c8fb1ef28f8a24a UNKNOWN
   * f2ba8a55cb8e441a1377b2cc00957e13e7445e47 UNKNOWN
   * 57564cc22bdc5fb9055a9896fce66bc0305b4e31 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160566977) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7605)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy

2020-04-16 Thread GitBox
zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] 
Implement PipelinedRegionSchedulingStrategy
URL: https://github.com/apache/flink/pull/11770#discussion_r409983090
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
 ##
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+import org.apache.flink.util.IterableUtils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SchedulingStrategy} instance which schedules tasks in granularity of 
pipelined regions.
+ */
+public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(false);
+
+   /** Result partitions are correlated if they have the same result id. */
+   private final Map>> correlatedResultPartitions = new 
HashMap<>();
+
+   private final Map>> partitionConsumerRegions = new 
HashMap<>();
+
+   public PipelinedRegionSchedulingStrategy(
+   final SchedulerOperations schedulerOperations,
+   final SchedulingTopology schedulingTopology) {
+
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+
+   init();
+   }
+
+   private void init() {
+   for (SchedulingPipelinedRegion region : 
schedulingTopology.getAllPipelinedRegions()) {
+   for (SchedulingResultPartition partition : 
region.getConsumedResults()) {
+   // sanity check
 
 Review comment:
   ok. will remove it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy

2020-04-16 Thread GitBox
zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] 
Implement PipelinedRegionSchedulingStrategy
URL: https://github.com/apache/flink/pull/11770#discussion_r409983049
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link PipelinedRegionSchedulingStrategy}.
+ */
+public class PipelinedRegionSchedulingStrategyTest extends TestLogger {
+
+   private TestingSchedulerOperations testingSchedulerOperation;
+
+   private int parallelism = 2;
+
+   private TestingSchedulingTopology testingSchedulingTopology;
+
+   private List source;
+
+   private List map;
+
+   private List sink;
+
+   @Before
+   public void setUp() {
+   testingSchedulerOperation = new TestingSchedulerOperations();
+
+   buildTopology();
+   }
+
+   private void buildTopology() {
+   testingSchedulingTopology = new TestingSchedulingTopology();
+
+   source = 
testingSchedulingTopology.addExecutionVertices().withParallelism(parallelism).finish();
+   map = 
testingSchedulingTopology.addExecutionVertices().withParallelism(parallelism).finish();
+   sink =  
testingSchedulingTopology.addExecutionVertices().withParallelism(parallelism).finish();
+
+   testingSchedulingTopology.connectPointwise(source, map)
+   .withResultPartitionState(ResultPartitionState.CREATED)
+   
.withResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+   .finish();
+   testingSchedulingTopology.connectAllToAll(map, sink)
+   .withResultPartitionState(ResultPartitionState.CREATED)
+   .withResultPartitionType(ResultPartitionType.BLOCKING)
+   .finish();
+
+   testingSchedulingTopology.generatePipelinedRegions();
+   }
+
+   @Test
+   public void testStartScheduling() {
+   startScheduling(testingSchedulingTopology);
+
+   final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices = new ArrayList<>();
+   expectedScheduledVertices.add(Arrays.asList(source.get(0), map.get(0)));
+   expectedScheduledVertices.add(Arrays.asList(source.get(1), map.get(1)));
+   assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
+   }
+
+   @Test
+   public void testRestartTasks() {
+   final PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(testingSchedulingTopology);
+
+   final Set<ExecutionVertexID> verticesToRestart = Stream.of(source, map, sink)
+   .flatMap(List::stream)
+   .map(TestingSchedulingExecutionVertex::getId)
+   .collect(Collectors.toSet());
+
+   schedulingStrategy.restartTasks(verticesToRestart);
+
+   final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices = new ArrayList<>();
+   expectedScheduledVertices.add(Arrays.asList(source.get(0), map.get(0)));
+   expectedScheduledVertices.add(Arrays.asList(source.get(1), map.get(1)));
+   assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
+   }
+
+   @Test
+   public void 

[GitHub] [flink] zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy

2020-04-16 Thread GitBox
zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] 
Implement PipelinedRegionSchedulingStrategy
URL: https://github.com/apache/flink/pull/11770#discussion_r409982245
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java
 ##
 @@ -60,4 +61,18 @@

deploymentOptionRetriever.apply(executionVertexID)))
.collect(Collectors.toList());
}
+
+   static List<SchedulingPipelinedRegion> sortPipelinedRegionsInTopologicalOrder(
+   final SchedulingTopology topology,
+   final Set<SchedulingPipelinedRegion> regions) {
+
+   final Set<SchedulingPipelinedRegion> deduplicator = new HashSet<>();
+   return IterableUtils.toStream(topology.getVertices())
+   .map(SchedulingExecutionVertex::getId)
+   .map(topology::getPipelinedRegionOfVertex)
+   .filter(regions::contains)
+   .filter(region -> !deduplicator.contains(region))
+   .filter(deduplicator::add)
 
 Review comment:
   Good idea!




[GitHub] [flink] zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy

2020-04-16 Thread GitBox
zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] 
Implement PipelinedRegionSchedulingStrategy
URL: https://github.com/apache/flink/pull/11770#discussion_r409982245
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java
 ##
 @@ -60,4 +61,18 @@

deploymentOptionRetriever.apply(executionVertexID)))
.collect(Collectors.toList());
}
+
+   static List<SchedulingPipelinedRegion> sortPipelinedRegionsInTopologicalOrder(
+   final SchedulingTopology topology,
+   final Set<SchedulingPipelinedRegion> regions) {
+
+   final Set<SchedulingPipelinedRegion> deduplicator = new HashSet<>();
+   return IterableUtils.toStream(topology.getVertices())
+   .map(SchedulingExecutionVertex::getId)
+   .map(topology::getPipelinedRegionOfVertex)
+   .filter(regions::contains)
+   .filter(region -> !deduplicator.contains(region))
+   .filter(deduplicator::add)
 
 Review comment:
   Good idea! Will use `distinct()`.
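   
   For reference, a minimal sketch of the `distinct()` variant described above (type and method names taken from the quoted diff; an illustration, not necessarily the exact merged code):
   
   ```java
   static List<SchedulingPipelinedRegion> sortPipelinedRegionsInTopologicalOrder(
           final SchedulingTopology topology,
           final Set<SchedulingPipelinedRegion> regions) {
   
       return IterableUtils.toStream(topology.getVertices())
           .map(SchedulingExecutionVertex::getId)
           .map(topology::getPipelinedRegionOfVertex)
           .filter(regions::contains)
           // distinct() replaces the manual HashSet-based deduplication and,
           // on this ordered stream, keeps the first (topological) occurrence
           .distinct()
           .collect(Collectors.toList());
   }
   ```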




[GitHub] [flink] zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy

2020-04-16 Thread GitBox
zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] 
Implement PipelinedRegionSchedulingStrategy
URL: https://github.com/apache/flink/pull/11770#discussion_r409982461
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
 ##
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+import org.apache.flink.util.IterableUtils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SchedulingStrategy} instance which schedules tasks in granularity of 
pipelined regions.
+ */
+public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new DeploymentOption(false);
+
+   /** Result partitions are correlated if they have the same result id. */
+   private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>> correlatedResultPartitions = new HashMap<>();
+
+   private final Map<IntermediateResultPartitionID, Set<SchedulingPipelinedRegion>> partitionConsumerRegions = new HashMap<>();
+
+   public PipelinedRegionSchedulingStrategy(
+   final SchedulerOperations schedulerOperations,
+   final SchedulingTopology schedulingTopology) {
+
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+
+   init();
+   }
+
+   private void init() {
+   for (SchedulingPipelinedRegion region : schedulingTopology.getAllPipelinedRegions()) {
+   for (SchedulingResultPartition partition : region.getConsumedResults()) {
+   // sanity check
+   checkState(partition.getResultType() == ResultPartitionType.BLOCKING);
+
+   partitionConsumerRegions.computeIfAbsent(partition.getId(), pid -> new HashSet<>()).add(region);
+   correlatedResultPartitions.computeIfAbsent(partition.getResultId(), rid -> new HashSet<>()).add(partition);
+   }
+   }
+   }
+
+   @Override
+   public void startScheduling() {
+   final Set<SchedulingPipelinedRegion> sourceRegions = IterableUtils
+   .toStream(schedulingTopology.getAllPipelinedRegions())
+   .filter(region -> !region.getConsumedResults().iterator().hasNext())
+   .collect(Collectors.toSet());
+   maybeScheduleRegions(sourceRegions);
+   }
+
+   @Override
+   public void restartTasks(final Set<ExecutionVertexID> verticesToRestart) {
+   final Set<SchedulingPipelinedRegion> regionsToRestart = verticesToRestart.stream()
+   .map(schedulingTopology::getPipelinedRegionOfVertex)
+   .collect(Collectors.toSet());
+   maybeScheduleRegions(regionsToRestart);
+   }
+
+   @Override
+   public void onExecutionStateChange(final ExecutionVertexID executionVertexId, final ExecutionState executionState) {
+   if (executionState == ExecutionState.FINISHED) {
+   final Set<SchedulingResultPartition> finishedPartitions = IterableUtils
+   .toStream(schedulingTopology.getVertex(executionVertexId).getProducedResults())
+   

[jira] [Resolved] (FLINK-16771) NPE when filtering by decimal column

2020-04-16 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee resolved FLINK-16771.
--
Resolution: Fixed

release-1.10: 47d6781de53a0cc237f457ae4f9c74c25ea590c6

> NPE when filtering by decimal column
> 
>
> Key: FLINK-16771
> URL: https://issues.apache.org/jira/browse/FLINK-16771
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Rui Li
>Assignee: Rui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> The following SQL can trigger the issue:
> {code}
> create table foo (d decimal(15,8));
> insert into foo values (cast('123.123' as decimal(15,8)));
> select * from foo where d>cast('123456789.123' as decimal(15,8));
> {code}





[GitHub] [flink] JingsongLi merged pull request #11736: [FLINK-16771][table-planner-blink] NPE when filtering by decimal column

2020-04-16 Thread GitBox
JingsongLi merged pull request #11736: [FLINK-16771][table-planner-blink] NPE 
when filtering by decimal column
URL: https://github.com/apache/flink/pull/11736
 
 
   




[GitHub] [flink] flinkbot edited a comment on issue #11775: [FLINK-17132][metrics] Use the latest version of simpleclient_pushgateway to solve the "Failed to push metrics to PushGateway" problem

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11775: [FLINK-17132][metrics] Use the 
latest version of simpleclient_pushgateway to solve the "Failed to push metrics 
to PushGateway" problem
URL: https://github.com/apache/flink/pull/11775#issuecomment-614586674
 
 
   
   ## CI report:
   
   * 5178c66b32381b67a41cb2a4e36a7498f241502c Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160543598) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7598)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[jira] [Comment Edited] (FLINK-15795) KubernetesClusterDescriptor seems to report unreachable JobManager interface URL

2020-04-16 Thread Canbin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085390#comment-17085390
 ] 

Canbin Zheng edited comment on FLINK-15795 at 4/17/20, 3:55 AM:


[~fly_in_gis] I noticed this problem before; it is in the plan of FLINK-17196.

cc [~trohrmann] Could it be a subtask of FLINK-17196?


was (Author: felixzheng):
I noticed this problem before, it is in the plan of FLINK-17196, could it be a 
subtask of FLINK-17196

> KubernetesClusterDescriptor seems to report unreachable JobManager interface 
> URL
> 
>
> Key: FLINK-15795
> URL: https://issues.apache.org/jira/browse/FLINK-15795
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Priority: Minor
> Fix For: 1.10.1, 1.11.0
>
>
> The {{KubernetesClusterDescriptor}} reports the {{JobManager}} web interface 
> URL when deploying the cluster. In my case (using almost the standard 
> configuration), it reported an unreachable URL which consisted of the K8s 
> cluster's endpoint IP. This might be a configuration problem of the K8s 
> cluster but at the same time, the descriptor deployed a LoadBalancer (default 
> configuration) through which one could reach the web UI. In this case, I 
> would have wished that the {{LoadBalancer's}} IP and port is reported instead.





[jira] [Commented] (FLINK-17177) Handle ERROR event correctly in KubernetesResourceManager#onError

2020-04-16 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085412#comment-17085412
 ] 

Yang Wang commented on FLINK-17177:
---

{code:java}
Object is: If Type is Added or Modified: the new state of the object. If Type 
is Deleted: the state of the object immediately before deletion. If Type is 
Error: Status is recommended; other types may make sense depending on context.
{code}
I posted the K8s {{WatchEvent}} definition here [1]. I do not see anything stating that the "Error" type means an "HTTP error". Could you share some information about how the "Error" type is introduced by an HTTP-layer error?

 

[1]. 
[https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#watchevent-v1-meta]
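
For context, here is a minimal sketch of treating an *ERROR* event as a signal to re-create the watch rather than as a pod state change. It uses the fabric8 {{Watcher}} interface; the {{recreateWatch}} callback is hypothetical wiring, not Flink code:

{code:java}
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;

public class RecreatingPodWatcher implements Watcher<Pod> {

    private final Runnable recreateWatch;

    public RecreatingPodWatcher(Runnable recreateWatch) {
        this.recreateWatch = recreateWatch;
    }

    @Override
    public void eventReceived(Action action, Pod pod) {
        if (action == Action.ERROR) {
            // The watch itself is broken; do not treat this as a terminated pod.
            recreateWatch.run();
            return;
        }
        // handle ADDED / MODIFIED / DELETED as before
    }

    @Override
    public void onClose(KubernetesClientException cause) {
        if (cause != null) {
            // closed exceptionally, e.g. a stale resource version
            recreateWatch.run();
        }
    }
}
{code}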

> Handle ERROR event correctly in KubernetesResourceManager#onError
> -
>
> Key: FLINK-17177
> URL: https://issues.apache.org/jira/browse/FLINK-17177
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0, 1.10.1
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently, once we receive an *ERROR* event that is sent from the K8s API 
> server via the K8s {{Watcher}}, then {{KubernetesResourceManager#onError}} 
> will handle it by calling the 
> {{KubernetesResourceManager#removePodIfTerminated}}. This may be incorrect 
> since the *ERROR* event indicates an exception in the HTTP layer that is 
> caused by the K8s Server, which means the previously created {{Watcher}} may 
> be no longer available and we'd better re-create the {{Watcher}} immediately.





[jira] [Created] (FLINK-17198) DDL and DML compatibility for Hive connector

2020-04-16 Thread Rui Li (Jira)
Rui Li created FLINK-17198:
--

 Summary: DDL and DML compatibility for Hive connector
 Key: FLINK-17198
 URL: https://issues.apache.org/jira/browse/FLINK-17198
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Hive, Table SQL / Client
Reporter: Rui Li
 Fix For: 1.11.0








[jira] [Created] (FLINK-17197) ContinuousFileReaderOperator might shade the real exception when processing records

2020-04-16 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-17197:
---

 Summary: ContinuousFileReaderOperator might shade the real 
exception when processing records
 Key: FLINK-17197
 URL: https://issues.apache.org/jira/browse/FLINK-17197
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Affects Versions: 1.11.0
Reporter: Caizhi Weng


In {{ContinuousFileReaderOperator}} we have the following code:

{code:java}
private final transient RunnableWithException processRecordAction = () -> {
try {
processRecord();
} catch (Exception e) {
switchState(ReaderState.CLOSED);
throw e;
}
};
{code}

The reader's state is forced to {{CLOSED}} when an exception occurs. But this {{switchState}} method might itself throw another exception saying that the reader's current state cannot be switched directly to {{CLOSED}}. This new exception then masks the real exception thrown from {{processRecord}}.
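
One possible way to avoid this (a sketch of the general technique, not necessarily the fix that will be adopted) is to attach the state-switch failure as a suppressed exception, so the original error stays the primary one:

{code:java}
private final transient RunnableWithException processRecordAction = () -> {
    try {
        processRecord();
    } catch (Exception e) {
        try {
            switchState(ReaderState.CLOSED);
        } catch (Exception stateSwitchFailure) {
            // keep the original failure visible instead of masking it
            e.addSuppressed(stateSwitchFailure);
        }
        throw e;
    }
};
{code}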

To trigger this situation, add the following test method to any blink planner 
IT case class (for example 
{{org.apache.flink.table.planner.runtime.batch.sql.CalcITCase}}):

{code:java}
@Test
def testCsv(): Unit = {
  conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
  val tableSource = TestTableSourceSinks.getPersonCsvTableSource
  tEnv.registerTableSource("MyTable", tableSource)
  val t = tEnv.sqlQuery("SELECT * FROM MyTable")
  TableUtils.collectToList(t)
}
{code}

We then need to change {{org.apache.flink.api.java.Utils.CollectHelper}} to 
make it a failing sink:

{code:java}
@Override
public void writeRecord(T record) throws IOException {
throw new RuntimeException("writeRecordFails");
// accumulator.add(record, serializer);
}
{code}

Run the test case and the following exception stack will occur:

{code}
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:659)
at 
org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77)
at 
org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:51)
at 
org.apache.flink.table.planner.utils.TestingTableEnvironment.execute(TableTestBase.scala:1054)
at 
org.apache.flink.table.api.TableUtils.collectToList(TableUtils.java:85)
at 
org.apache.flink.table.planner.runtime.batch.sql.CalcITCase.testCsv(CalcITCase.scala:73)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at 

[GitHub] [flink] flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend 
and use BashJavaUtils to start JM JVM process and pass JVM memory args
URL: https://github.com/apache/flink/pull/11545#issuecomment-604848813
 
 
   
   ## CI report:
   
   * 46b64a4339ed27315eee1eb2d8d877c8dfaae993 UNKNOWN
   * 0e3973318602ca3081a5d5a4401cd5f2df4fa4b3 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160482936) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7548)
 
   * dd69c50599e8e8709ceb2fb055797f94534c0aa4 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160667141) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7623)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[jira] [Updated] (FLINK-17177) Handle ERROR event correctly in KubernetesResourceManager#onError

2020-04-16 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-17177:
--
Attachment: (was: image-2020-04-17-11-40-41-869.png)

> Handle ERROR event correctly in KubernetesResourceManager#onError
> -
>
> Key: FLINK-17177
> URL: https://issues.apache.org/jira/browse/FLINK-17177
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0, 1.10.1
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently, once we receive an *ERROR* event that is sent from the K8s API 
> server via the K8s {{Watcher}}, then {{KubernetesResourceManager#onError}} 
> will handle it by calling the 
> {{KubernetesResourceManager#removePodIfTerminated}}. This may be incorrect 
> since the *ERROR* event indicates an exception in the HTTP layer that is 
> caused by the K8s Server, which means the previously created {{Watcher}} may 
> be no longer available and we'd better re-create the {{Watcher}} immediately.





[jira] [Updated] (FLINK-17177) Handle ERROR event correctly in KubernetesResourceManager#onError

2020-04-16 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-17177:
--
Attachment: image-2020-04-17-11-40-41-869.png

> Handle ERROR event correctly in KubernetesResourceManager#onError
> -
>
> Key: FLINK-17177
> URL: https://issues.apache.org/jira/browse/FLINK-17177
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0, 1.10.1
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently, once we receive an *ERROR* event that is sent from the K8s API 
> server via the K8s {{Watcher}}, then {{KubernetesResourceManager#onError}} 
> will handle it by calling the 
> {{KubernetesResourceManager#removePodIfTerminated}}. This may be incorrect 
> since the *ERROR* event indicates an exception in the HTTP layer that is 
> caused by the K8s Server, which means the previously created {{Watcher}} may 
> be no longer available and we'd better re-create the {{Watcher}} immediately.





[GitHub] [flink] zenfenan edited a comment on issue #11759: FLINK-17142: Bump ORC dependency version to 1.5.6

2020-04-16 Thread GitBox
zenfenan edited a comment on issue #11759: FLINK-17142: Bump ORC dependency 
version to 1.5.6
URL: https://github.com/apache/flink/pull/11759#issuecomment-615020177
 
 
   @zentol I checked whether NOTICE or LICENSE has to be updated, but I don't think it is needed. I'd appreciate it if you, @JingsongLi, or @kl0u could also check.
   
   @all If everything checks out, can we close this one? Thanks :)




[GitHub] [flink] zenfenan edited a comment on issue #11759: FLINK-17142: Bump ORC dependency version to 1.5.6

2020-04-16 Thread GitBox
zenfenan edited a comment on issue #11759: FLINK-17142: Bump ORC dependency 
version to 1.5.6
URL: https://github.com/apache/flink/pull/11759#issuecomment-615020177
 
 
   @zentol I checked whether NOTICE or LICENSE has to be updated, but I don't think it is needed. I'd appreciate it if you, @JingsongLi, or @kl0u could check.
   
   @all If everything checks out, can we close this one? Thanks :)




[GitHub] [flink] zenfenan edited a comment on issue #11759: FLINK-17142: Bump ORC dependency version to 1.5.6

2020-04-16 Thread GitBox
zenfenan edited a comment on issue #11759: FLINK-17142: Bump ORC dependency 
version to 1.5.6
URL: https://github.com/apache/flink/pull/11759#issuecomment-615020177
 
 
   @zentol I checked whether NOTICE or LICENSE has to be updated, but I don't think it is needed. I'd appreciate it if you, @JingsongLi, or @kl0u could also check.
   
   All, if everything checks out, can we close this one? Thanks :)




[GitHub] [flink] zenfenan commented on issue #11759: FLINK-17142: Bump ORC dependency version to 1.5.6

2020-04-16 Thread GitBox
zenfenan commented on issue #11759: FLINK-17142: Bump ORC dependency version to 
1.5.6
URL: https://github.com/apache/flink/pull/11759#issuecomment-615020177
 
 
   @zentol I checked whether NOTICE or LICENSE has to be updated, but I don't think that is needed. I'd appreciate it if you, @JingsongLi, or @kl0u could check.
   
   @all If everything checks out, can we close this one? Thanks :)




[GitHub] [flink] wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] 
Make KubernetesResourceManager starts workers using WorkerResourceSpec 
requested by SlotManager
URL: https://github.com/apache/flink/pull/11323#discussion_r409966657
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
 
 Review comment:
   @tillrohrmann Sorry to jump in here. I want to add some additional explanations.
   
   Why did I name it `createTaskManagerPod` instead of `startTaskManagerPod`?
   * We usually use `create` in K8s commands (e.g. `kubectl create -f taskmanager.yaml`); the start process is done internally and automatically by the K8s `kubelet`.
   
   Why not return a `Pod` from `createTaskManagerPod`?
   * Of course we could return the created `Pod`. However, it is not in a final or stable state, since K8s will still update the `Pod` (the IP address, the status). So I do not return the `Pod`, to avoid it being used incorrectly.
   
   For `KubernetesJobManagerFactory#createJobManagerComponent` and `KubernetesTaskManagerFactory#createTaskManagerComponent`, maybe we could rename them to `KubernetesJobManagerFactory#buildJobManagerSpecification` and `KubernetesTaskManagerFactory#buildTaskManagerPod`.




[GitHub] [flink] flinkbot edited a comment on issue #11784: [FLINK-17120][python] Add Cython support for operations

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11784: [FLINK-17120][python] Add Cython 
support for operations
URL: https://github.com/apache/flink/pull/11784#issuecomment-615011156
 
 
   
   ## CI report:
   
   * 1ee639f83adcf607b722609782be88207de5fe49 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160666122) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7622)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend 
and use BashJavaUtils to start JM JVM process and pass JVM memory args
URL: https://github.com/apache/flink/pull/11545#issuecomment-604848813
 
 
   
   ## CI report:
   
   * 46b64a4339ed27315eee1eb2d8d877c8dfaae993 UNKNOWN
   * 0e3973318602ca3081a5d5a4401cd5f2df4fa4b3 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160482936) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7548)
 
   * dd69c50599e8e8709ceb2fb055797f94534c0aa4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11320: [FLINK-16437][runtime] Make 
SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#issuecomment-595174225
 
 
   
   ## CI report:
   
   * 8e8aca52ebd0560d829ef3945884817668976804 UNKNOWN
   * 40f6f68a12a74aacf1100dfb81b5bed081cd9a4a UNKNOWN
   * 7b2346b7cdc46a7c3b490c6c9a8ce6e1727fe10b Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160592479) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7610)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] 
Make KubernetesResourceManager starts workers using WorkerResourceSpec 
requested by SlotManager
URL: https://github.com/apache/flink/pull/11323#discussion_r409968753
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -189,15 +180,17 @@ public boolean stopWorker(final KubernetesWorkerNode 
worker) {
public void onAdded(List<KubernetesPod> pods) {
runAsync(() -> {
for (KubernetesPod pod : pods) {
-   if (numPendingPodRequests > 0) {
-   numPendingPodRequests--;
+   WorkerResourceSpec workerResourceSpec = podWorkerResources.get(pod.getName());
+   final int pendingNum = getNumPendingWorkersFor(workerResourceSpec);
+   if (pendingNum > 0) {
 
 Review comment:
   Recently I also realized that `onAdded` may be called more than once for the same `Pod` here, e.g. when network problems cause the pod watcher to reconnect. So it would be good to check for the pod's existence first; otherwise `numPendingPodRequests` may be decremented incorrectly. (See the sketch below.)
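   
   A self-contained illustration of that guard (hypothetical names, not the actual `KubernetesResourceManager` code):
   
   ```java
   import java.util.HashSet;
   import java.util.Set;
   
   public class PodEventDedupExample {
   
       private final Set<String> trackedPods = new HashSet<>();
       private int numPendingPodRequests = 2;
   
       void onAdded(String podName) {
           // A replayed ADDED event (e.g. delivered again after a watcher
           // reconnect) must not decrement the pending counter twice.
           if (!trackedPods.add(podName)) {
               return;
           }
           if (numPendingPodRequests > 0) {
               numPendingPodRequests--;
           }
       }
   
       public static void main(String[] args) {
           PodEventDedupExample rm = new PodEventDedupExample();
           rm.onAdded("taskmanager-1");
           rm.onAdded("taskmanager-1"); // duplicate event is ignored
           System.out.println(rm.numPendingPodRequests); // prints 1, not 0
       }
   }
   ```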




[jira] [Commented] (FLINK-15795) KubernetesClusterDescriptor seems to report unreachable JobManager interface URL

2020-04-16 Thread Canbin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085390#comment-17085390
 ] 

Canbin Zheng commented on FLINK-15795:
--

I noticed this problem before; it is in the plan of FLINK-17196. Could it be a subtask of FLINK-17196?

> KubernetesClusterDescriptor seems to report unreachable JobManager interface 
> URL
> 
>
> Key: FLINK-15795
> URL: https://issues.apache.org/jira/browse/FLINK-15795
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Priority: Minor
> Fix For: 1.10.1, 1.11.0
>
>
> The {{KubernetesClusterDescriptor}} reports the {{JobManager}} web interface 
> URL when deploying the cluster. In my case (using almost the standard 
> configuration), it reported an unreachable URL which consisted of the K8s 
> cluster's endpoint IP. This might be a configuration problem of the K8s 
> cluster but at the same time, the descriptor deployed a LoadBalancer (default 
> configuration) through which one could reach the web UI. In this case, I 
> would have wished that the {{LoadBalancer's}} IP and port is reported instead.





[jira] [Closed] (FLINK-17185) Value of Row[] parameter of UDF is null in blink planner

2020-04-16 Thread zck (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zck closed FLINK-17185.
---
Resolution: Invalid

> Value of Row[] parameter of UDF is null in blink planner
> 
>
> Key: FLINK-17185
> URL: https://issues.apache.org/jira/browse/FLINK-17185
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: zck
>Priority: Major
>
> source_ddl:
> CREATE TABLE sourceTable (
> event_time_line ARRAY<ROW(
> `rule_name` VARCHAR,
> `count` VARCHAR
> )>
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.startup-mode' = 'earliest-offset',
> 'connector.topic' = 'topic_test_1',
> 'connector.properties.zookeeper.connect' = 'localhost:2181',
> 'connector.properties.bootstrap.servers' = 'localhost:9092',
> 'update-mode' = 'append',
> 'format.type' = 'json',
> 'format.derive-schema' = 'true'
> );
>  
>  
> public class MyUDTF extends TableFunction<Row> {
>  /**
>  * 
>  * *@param rows With the blink planner the values are null (but the array has a length); with the default planner the values are present.*
>  */
>  public void eval(Row [] rows) {
>  collector.collect(Row.of(1, 1));
>  }
> }





[jira] [Comment Edited] (FLINK-17176) Slow down Pod re-creation in KubernetesResourceManager#PodCallbackHandler

2020-04-16 Thread Canbin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085383#comment-17085383
 ] 

Canbin Zheng edited comment on FLINK-17176 at 4/17/20, 3:08 AM:


Of course, they should share the same retry logic. I think you are right, this 
ticket could be a subtask of {color:#4a6ee0}FLINK-17127{color}


was (Author: felixzheng):
Of course, they should share the same retry logic, and this ticket could be a 
subtask of {color:#4a6ee0}FLINK-17127{color}

> Slow down Pod re-creation in KubernetesResourceManager#PodCallbackHandler
> -
>
> Key: FLINK-17176
> URL: https://issues.apache.org/jira/browse/FLINK-17176
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.11.0
>
>
> In the native K8s setups, there are some cases that we do not control the 
> speed of pod re-creation which poses potential risks to flood the K8s API 
> Server in the {{PodCallbackHandler}} implementation of 
> {{KubernetesResourceManager.}}
> Here are steps to reproduce this kind of problems:
>  # Mount the {{/opt/flink/log}} in the Container of TaskManager to a path on 
> the K8s nodes via HostPath, make sure that the path exists but the 
> TaskManager process has no write permission. We can achieve this via the 
> [user-specified pod template 
> support|https://issues.apache.org/jira/browse/FLINK-15656] or just hardcode 
> it for testing only.
>  # Launch a session cluster
>  # Submit a new job to the session cluster, as expected, we can observe that 
> the Pod constantly fails quickly during launching the main Container, then 
> the {{KubernetesResourceManager#onModified}} is invoked to re-create a new 
> Pod immediately, without any speed control.
> To sum up, once the {{KubernetesResourceManager}} receives the Pod *ADD* 
> event and that Pod is terminated before successfully registering into the 
> {{KubernetesResourceManager}}, the {{KubernetesResourceManager}} will send 
> another creation request to K8s API Server immediately.
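
The speed control described above is typically an exponential backoff between re-creation attempts. A generic, self-contained sketch (hypothetical names, not Flink code):

{code:java}
import java.time.Duration;

public class BackoffExample {

    /** Delay before the given (0-based) re-creation attempt, capped at a maximum. */
    static Duration backoff(int attempt, Duration initial, Duration max) {
        long millis = initial.toMillis() << Math.min(attempt, 20); // bound the shift
        return millis >= max.toMillis() ? max : Duration.ofMillis(millis);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 8; attempt++) {
            // 100ms, 200ms, 400ms, ... capped at 10s
            System.out.println(backoff(attempt, Duration.ofMillis(100), Duration.ofSeconds(10)));
        }
    }
}
{code}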





[GitHub] [flink] wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] 
Make KubernetesResourceManager starts workers using WorkerResourceSpec 
requested by SlotManager
URL: https://github.com/apache/flink/pull/11323#discussion_r409966657
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
 
 Review comment:
   @tillrohrmann Sorry to jump in here. I want to add some additional explanations.
   
   Why did I name it `createTaskManagerPod` instead of `startTaskManagerPod`?
   * We usually use `create` in K8s commands (e.g. `kubectl create -f taskmanager.yaml`); the start process is done internally and automatically by the K8s `kubelet`.
   
   Why not return a `Pod` from `createTaskManagerPod`?
   * Of course we could return the created `Pod`. However, it is not in a final or stable state, since K8s will still update the `Pod` (the IP address, the status). So I do not return the `Pod`, to avoid it being used incorrectly.
   
   For `KubernetesJobManagerFactory#createJobManagerComponent` and `KubernetesTaskManagerFactory#createTaskManagerComponent`, maybe we could rename them to `KubernetesJobManagerFactory#createJobManagerSpecification` and `KubernetesTaskManagerFactory#createTaskManagerPod`.




[jira] [Created] (FLINK-17196) Rework the implementation of Fabric8FlinkKubeClient#getRestEndpoint

2020-04-16 Thread Canbin Zheng (Jira)
Canbin Zheng created FLINK-17196:


 Summary: Rework the implementation of 
Fabric8FlinkKubeClient#getRestEndpoint
 Key: FLINK-17196
 URL: https://issues.apache.org/jira/browse/FLINK-17196
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Affects Versions: 1.10.0, 1.10.1
Reporter: Canbin Zheng
 Fix For: 1.11.0


This is the umbrella issue for the rework of the implementation of 
{{Fabric8FlinkKubeClient#getRestEndpoint}}.





[jira] [Closed] (FLINK-17184) Flink webui does not display received records/bytes metric when graph contains single operator

2020-04-16 Thread zck (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zck closed FLINK-17184.
---

>  Flink webui does not display received records/bytes metric when graph 
> contains single operator
> ---
>
> Key: FLINK-17184
> URL: https://issues.apache.org/jira/browse/FLINK-17184
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Affects Versions: 1.10.0
>Reporter: zck
>Priority: Major
> Attachments: image-2020-04-16-18-49-11-103.png
>
>
> sink_ddl
> source_ddl
> query_sql : insert into tableB select * from tableA 
> The web UI shows no metric data.
> !image-2020-04-16-18-49-11-103.png!





[GitHub] [flink] wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] 
Make KubernetesResourceManager starts workers using WorkerResourceSpec 
requested by SlotManager
URL: https://github.com/apache/flink/pull/11323#discussion_r409966657
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
 
 Review comment:
   Sorry to jump in here. I want to add some additional explanations.
   
   Why did I name it `createTaskManagerPod` instead of `startTaskManagerPod`?
   * We usually use `create` in K8s commands (e.g. `kubectl create -f taskmanager.yaml`); the start process is done internally and automatically by the K8s `kubelet`.
   
   Why not return a `Pod` from `createTaskManagerPod`?
   * Of course we could return the created `Pod`. However, it is not in a final or stable state, since K8s will still update the `Pod` (the IP address, the status). So I do not return the `Pod`, to avoid it being used incorrectly.
   
   For `KubernetesJobManagerFactory#createJobManagerComponent` and `KubernetesTaskManagerFactory#createTaskManagerComponent`, maybe we could rename them to `KubernetesJobManagerFactory#createJobManagerSpecification` and `KubernetesTaskManagerFactory#createTaskManagerPod`.




[GitHub] [flink] flinkbot commented on issue #11784: [FLINK-17120][python] Add Cython support for operations

2020-04-16 Thread GitBox
flinkbot commented on issue #11784: [FLINK-17120][python] Add Cython support 
for operations
URL: https://github.com/apache/flink/pull/11784#issuecomment-615011156
 
 
   
   ## CI report:
   
   * 1ee639f83adcf607b722609782be88207de5fe49 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli 
command history in sql client and make it available after restarts
URL: https://github.com/apache/flink/pull/11765#issuecomment-614430999
 
 
   
   ## CI report:
   
   * 22eef4c43cd4fd0a6ddffb3cbef00a2fe522dac6 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160495175) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7557)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11466: [FLINK-15400][connectors / elasticsearch] elasticsearch table sink support dynamic index.

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11466:  [FLINK-15400][connectors / 
elasticsearch] elasticsearch table sink support dynamic index.
URL: https://github.com/apache/flink/pull/11466#issuecomment-601789375
 
 
   
   ## CI report:
   
   * 4c252220928052025f30b4dc4c63c0479d7c20cd UNKNOWN
   * c8f8eb3c22bfdd4f82204d281b6f7c5ebbefb769 Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/160662143) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7616)
 
   * c015b198415d8077f1782e949036cb07bbaa044c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support 
Python UDF in SQL function DDL.
URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508
 
 
   
   ## CI report:
   
   * d2603c1936ff28ab745c711224857bba0693edd8 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160553016) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7601)
 
   * 7cbe2380ac7246cf703397d9a78d002c0491a959 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160664905) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7621)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11010: [FLINK-15836][k8s] Throw fatal error 
in KubernetesResourceManager when the pods watcher is closed with exception
URL: https://github.com/apache/flink/pull/11010#issuecomment-581769412
 
 
   
   ## CI report:
   
   * 2a53bac20fd0c35c38575280744968501d14 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160493463) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7556)
 
   * 1130280dfe557a0268fed248089fb32d15f832d9 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160664763) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7620)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[jira] [Created] (FLINK-17195) Worker threads for source function have wrong ClassLoader

2020-04-16 Thread Teng Fei Liao (Jira)
Teng Fei Liao created FLINK-17195:
-

 Summary: Worker threads for source function have wrong ClassLoader
 Key: FLINK-17195
 URL: https://issues.apache.org/jira/browse/FLINK-17195
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends, Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Teng Fei Liao


Hey, in one of my SourceFunctions, I'm calling 
ServiceLoader.load(MyDefiniedService.class) which internally fetches the 
ClassLoader by calling Thread.currentThread().getContextClassLoader(). It seems 
like whichever ClassLoader this fetches is unaware of any jars/code that I've 
shipped.

My application code has not changed, and this came up in my attempt to switch from the old planner to the blink planner. I would appreciate any help I can get on this, thanks.
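
A common workaround sketch: pass the class loader to {{ServiceLoader}} explicitly instead of relying on the thread context class loader. In a rich function the user-code class loader is available via {{getRuntimeContext().getUserCodeClassLoader()}} (assuming the source extends a rich function; {{MyDefiniedService}} is the reporter's interface):

{code:java}
import java.util.ServiceLoader;

public final class UserCodeServiceLoading {

    // ServiceLoader.load(Class, ClassLoader) bypasses
    // Thread.currentThread().getContextClassLoader() entirely.
    public static <T> ServiceLoader<T> load(Class<T> service, ClassLoader userCodeClassLoader) {
        return ServiceLoader.load(service, userCodeClassLoader);
    }
}
{code}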





[jira] [Commented] (FLINK-17176) Slow down Pod re-creation in KubernetesResourceManager#PodCallbackHandler

2020-04-16 Thread Canbin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085383#comment-17085383
 ] 

Canbin Zheng commented on FLINK-17176:
--

Of course, they should share the same retry logic, and this ticket could be a 
subtask of {color:#4a6ee0}FLINK-17127{color}

> Slow down Pod re-creation in KubernetesResourceManager#PodCallbackHandler
> -
>
> Key: FLINK-17176
> URL: https://issues.apache.org/jira/browse/FLINK-17176
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.11.0
>
>
> In the native K8s setups, there are some cases that we do not control the 
> speed of pod re-creation which poses potential risks to flood the K8s API 
> Server in the {{PodCallbackHandler}} implementation of 
> {{KubernetesResourceManager.}}
> Here are steps to reproduce this kind of problems:
>  # Mount the {{/opt/flink/log}} in the Container of TaskManager to a path on 
> the K8s nodes via HostPath, make sure that the path exists but the 
> TaskManager process has no write permission. We can achieve this via the 
> [user-specified pod template 
> support|https://issues.apache.org/jira/browse/FLINK-15656] or just hardcode 
> it for testing only.
>  # Launch a session cluster
>  # Submit a new job to the session cluster, as expected, we can observe that 
> the Pod constantly fails quickly during launching the main Container, then 
> the {{KubernetesResourceManager#onModified}} is invoked to re-create a new 
> Pod immediately, without any speed control.
> To sum up, once the {{KubernetesResourceManager}} receives the Pod *ADD* 
> event and that Pod is terminated before successfully registering into the 
> {{KubernetesResourceManager}}, the {{KubernetesResourceManager}} will send 
> another creation request to K8s API Server immediately.





[jira] [Commented] (FLINK-15795) KubernetesClusterDescriptor seems to report unreachable JobManager interface URL

2020-04-16 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085379#comment-17085379
 ] 

Yang Wang commented on FLINK-15795:
---

[~felixzheng] I am posting yesterday's discussion [1] here since this ticket is related. If you want to completely separate the {{getRestEndpoint}} handling for {{LoadBalancer}} and {{NodePort}}, this ticket should be taken into account.

 

The goal is to make sure that we print a proper, accessible JobManager URL in the Flink client log. Given that the rest port is configured to 8081:
 * For LoadBalancer, it should be {{LB_IP:8081}}. If we do not get an LB_IP, it would be great if a warning log shows up.
 * For NodePort, it should be {{K8S_MASTER:NodePort}} or {{K8s_NODE:NodePort}}. A compact sketch of this rule follows below.

 

[1]. https://github.com/apache/flink/pull/11715#discussion_r409295087
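
A compact sketch of that reporting rule (all names hypothetical, not Flink code):

{code:java}
// Pick a URL the user can actually reach, and warn instead of printing an unreachable one.
static String restEndpoint(String serviceType, String lbIp, String nodeAddress, int restPort, int nodePort) {
    if ("LoadBalancer".equals(serviceType)) {
        if (lbIp == null) {
            System.err.println("Warning: LoadBalancer external IP is not assigned yet.");
            return null;
        }
        return lbIp + ":" + restPort; // LB_IP:8081
    }
    return nodeAddress + ":" + nodePort; // K8S_MASTER:NodePort or K8s_NODE:NodePort
}
{code}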

> KubernetesClusterDescriptor seems to report unreachable JobManager interface 
> URL
> 
>
> Key: FLINK-15795
> URL: https://issues.apache.org/jira/browse/FLINK-15795
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Priority: Minor
> Fix For: 1.10.1, 1.11.0
>
>
> The {{KubernetesClusterDescriptor}} reports the {{JobManager}} web interface 
> URL when deploying the cluster. In my case (using almost the standard 
> configuration), it reported an unreachable URL which consisted of the K8s 
> cluster's endpoint IP. This might be a configuration problem of the K8s 
> cluster, but at the same time the descriptor deployed a LoadBalancer (default 
> configuration) through which one could reach the web UI. In this case, I 
> would have wished that the {{LoadBalancer's}} IP and port were reported instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] godfreyhe edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts

2020-04-16 Thread GitBox
godfreyhe edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli 
command history in sql client and make it available after restarts
URL: https://github.com/apache/flink/pull/11765#issuecomment-615007657
 
 
   I think we can learn from 
https://github.com/oncewang/hive/blob/master/beeline/src/test/org/apache/hive/beeline/TestBeeLineHistory.java
   
   There are some tests based on `Terminal` in `CliClientTest`.
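
   A minimal jline3 round trip of the behavior under test, writing commands to 
a history file and then building a fresh reader against the same file, might 
look like the following. The file name and SQL statements are made up, and 
this is not the actual `CliClientTest` code:
   
   ```java
   import java.nio.file.Files;
   import java.nio.file.Path;
   import org.jline.reader.LineReader;
   import org.jline.reader.LineReaderBuilder;
   import org.jline.terminal.Terminal;
   import org.jline.terminal.TerminalBuilder;
   
   public class HistoryRoundTrip {
   
       public static void main(String[] args) throws Exception {
           Path historyFile = Files.createTempFile("sql-client-history", null);
   
           // A dumb terminal is enough for history handling; no TTY required.
           try (Terminal terminal = TerminalBuilder.builder().dumb(true).build()) {
               LineReader reader = LineReaderBuilder.builder()
                       .terminal(terminal)
                       .variable(LineReader.HISTORY_FILE, historyFile)
                       .build();
   
               reader.getHistory().add("SELECT 1;");
               reader.getHistory().add("SHOW TABLES;");
               reader.getHistory().save(); // flush entries to the history file
   
               // A fresh reader pointing at the same file loads the old
               // entries on attach -- the "available after restarts" behavior.
               LineReader restarted = LineReaderBuilder.builder()
                       .terminal(terminal)
                       .variable(LineReader.HISTORY_FILE, historyFile)
                       .build();
               restarted.getHistory().iterator()
                       .forEachRemaining(e -> System.out.println(e.line()));
           }
       }
   }
   ```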


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts

2020-04-16 Thread GitBox
godfreyhe commented on issue #11765: [FLINK-8864][sql-client] Add cli command 
history in sql client and make it available after restarts
URL: https://github.com/apache/flink/pull/11765#issuecomment-615007657
 
 
   I think we can learn from 
https://github.com/oncewang/hive/blob/master/beeline/src/test/org/apache/hive/beeline/TestBeeLineHistory.java


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11782: [FLINK-15801] fix Timestamp extractor created from properties does no…

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11782: [FLINK-15801] fix Timestamp 
extractor created from properties does no…
URL: https://github.com/apache/flink/pull/11782#issuecomment-614731103
 
 
   
   ## CI report:
   
   * 3ba08d20930dc7f1bfa750c2e11323ffbd67 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160578468) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7607)
 
   * de8e893e40c189d6aaa3de4330df4c3065b8ae62 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160663230) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7619)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts

2020-04-16 Thread GitBox
flinkbot edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli 
command history in sql client and make it available after restarts
URL: https://github.com/apache/flink/pull/11765#issuecomment-614430999
 
 
   
   ## CI report:
   
   * 22eef4c43cd4fd0a6ddffb3cbef00a2fe522dac6 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160495175) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7557)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

