[GitHub] [flink] xintongsong commented on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args
xintongsong commented on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args URL: https://github.com/apache/flink/pull/11545#issuecomment-615058632 Rebased onto #11577. A few changes during the rebasing: - Pass dynamic properties to `BashJavaUtils` for JM of standalone job clusters. (JM of standalone session cluster does not support dynamic properties ATM.) - Add test case for getting JM resource parameters from `BashJavaUtils` with dynamic properties. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Resolved] (FLINK-17199) Use NeverReturnExpired strategy for states in blink streaming join.
[ https://issues.apache.org/jira/browse/FLINK-17199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benchao Li resolved FLINK-17199. Resolution: Duplicate > Use NeverReturnExpired strategy for states in blink streaming join. > --- > > Key: FLINK-17199 > URL: https://issues.apache.org/jira/browse/FLINK-17199 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.9.2, 1.10.0 >Reporter: Benchao Li >Priority: Major > > Currently we use {{ReturnExpiredIfNotCleanedUp}} for states in blink > streaming join operators, which is not very intuitive for users to understand > the behavior. > IMO, {{NeverReturnExpired}} is more straightforward, and we should change > to it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17199) Use NeverReturnExpired strategy for states in blink streaming join.
[ https://issues.apache.org/jira/browse/FLINK-17199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085456#comment-17085456 ] Benchao Li commented on FLINK-17199: Yes. Thanks for the reminder.
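The user-visible difference between the two TTL visibility strategies discussed above can be sketched with a self-contained toy model. This is not Flink's `StateTtlConfig` implementation; all class and method names below are invented for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of the two TTL state-visibility strategies. With
 * RETURN_EXPIRED_IF_NOT_CLEANED_UP an expired value stays readable until
 * cleanup removes it; with NEVER_RETURN_EXPIRED it is hidden immediately.
 */
class TtlStateModel {
    enum Visibility { RETURN_EXPIRED_IF_NOT_CLEANED_UP, NEVER_RETURN_EXPIRED }

    private final Map<String, Long> expiryTime = new HashMap<>();
    private final Map<String, String> values = new HashMap<>();
    private final Visibility visibility;

    TtlStateModel(Visibility visibility) {
        this.visibility = visibility;
    }

    void put(String key, String value, long nowMillis, long ttlMillis) {
        values.put(key, value);
        expiryTime.put(key, nowMillis + ttlMillis);
    }

    /** Returns the stored value, applying the configured visibility strategy. */
    String get(String key, long nowMillis) {
        if (!values.containsKey(key)) {
            return null;
        }
        boolean expired = nowMillis >= expiryTime.get(key);
        if (expired && visibility == Visibility.NEVER_RETURN_EXPIRED) {
            // Expired entries are never visible, even before cleanup runs.
            return null;
        }
        // RETURN_EXPIRED_IF_NOT_CLEANED_UP: stale value visible until cleanup.
        return values.get(key);
    }
}
```

The point of the issue is exactly this contrast: with `NeverReturnExpired` the moment of expiration, not the moment of cleanup, determines what a join sees, which is the easier behavior to reason about.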
[jira] [Updated] (FLINK-17200) Add connectors to ClickHouse
[ https://issues.apache.org/jira/browse/FLINK-17200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jinhai updated FLINK-17200: --- Description: ClickHouse is a powerful OLAP query engine and supports real-time mini-batch data writing. We can add Flink connectors to ClickHouse [website: https://clickhouse.tech/|https://clickhouse.tech/] > Add connectors to ClickHouse > > > Key: FLINK-17200 > URL: https://issues.apache.org/jira/browse/FLINK-17200 > Project: Flink > Issue Type: New Feature >Reporter: jinhai >Priority: Major > > ClickHouse is a powerful OLAP query engine and supports real-time mini-batch > data writing. > We can add Flink connectors to ClickHouse > [website: https://clickhouse.tech/|https://clickhouse.tech/]
[GitHub] [flink] xintongsong commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager
xintongsong commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#discussion_r410006159
## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod); } /** * Request new pod if pending pods cannot satisfy pending slot requests. */ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int pendingWorkerNum = getNumPendingWorkersFor(workerResourceSpec); + int requiredTaskManagers = getRequiredResources().get(workerResourceSpec); - while (requiredTaskManagers > numPendingPodRequests) { - requestKubernetesPod(); + while (requiredTaskManagers-- > pendingWorkerNum) { + requestKubernetesPod(workerResourceSpec); } } private void removePodIfTerminated(KubernetesPod pod) { if (pod.isTerminated()) { kubeClient.stopPod(pod.getName());
Review comment: It seems there are two issues. 1. `numPendingPodRequests`/`PendingWorkerCounter` may go out of sync if a pod is stopped in `onError` before `onAdded` is called. 2. Whether we should remove the pod or not when `onError` is called. I would suggest the following. - Try to resolve (1) in this PR, based on the current behavior that the pod will be removed when `onError` is called. We can check whether the pod was requested in the current attempt by looking it up in `podWorkerResources`, and whether `onAdded` has been called by looking it up in `workerNodes`. If the pod is requested in the current attempt and `onAdded` is not
[jira] [Commented] (FLINK-17201) Implement Streaming ClickHouseSink
[ https://issues.apache.org/jira/browse/FLINK-17201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085454#comment-17085454 ] liufangliang commented on FLINK-17201: -- Amazing > Implement Streaming ClickHouseSink > -- > > Key: FLINK-17201 > URL: https://issues.apache.org/jira/browse/FLINK-17201 > Project: Flink > Issue Type: New Feature >Reporter: jinhai >Priority: Major >
[jira] [Created] (FLINK-17203) Add metrics for ClickHouse sink
jinhai created FLINK-17203: -- Summary: Add metrics for ClickHouse sink Key: FLINK-17203 URL: https://issues.apache.org/jira/browse/FLINK-17203 Project: Flink Issue Type: New Feature Reporter: jinhai
[jira] [Created] (FLINK-17202) Add SQL for ClickHouse connector
jinhai created FLINK-17202: -- Summary: Add SQL for ClickHouse connector Key: FLINK-17202 URL: https://issues.apache.org/jira/browse/FLINK-17202 Project: Flink Issue Type: New Feature Reporter: jinhai
[GitHub] [flink] zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager
zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#discussion_r410001005
## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod); } /** * Request new pod if pending pods cannot satisfy pending slot requests. */ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int pendingWorkerNum = getNumPendingWorkersFor(workerResourceSpec); + int requiredTaskManagers = getRequiredResources().get(workerResourceSpec); - while (requiredTaskManagers > numPendingPodRequests) { - requestKubernetesPod(); + while (requiredTaskManagers-- > pendingWorkerNum) { + requestKubernetesPod(workerResourceSpec); } } private void removePodIfTerminated(KubernetesPod pod) { if (pod.isTerminated()) { kubeClient.stopPod(pod.getName());
Review comment: > Then isn't `numPendingPodRequests` already possibly go out of sync before this PR? Do we observe any cases? Yes, I noticed this problem when I dived into `KubernetesResourceManager` yesterday; it is possible that we do not receive an `ADD` event (we met this kind of problem in our K8s production env, but not in Flink so far) even if the Pod has been deployed successfully on the K8s cluster. > @zhengcanbin Would it be possible that a pod is never successfully started, that we received `onError` but never `onAdded`, `onModified` or `onDeleted`? Yeah. It could possibly happen if something goes wrong in the
[GitHub] [flink] zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager
zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#discussion_r409991098
## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod); } /** * Request new pod if pending pods cannot satisfy pending slot requests. */ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int pendingWorkerNum = getNumPendingWorkersFor(workerResourceSpec); + int requiredTaskManagers = getRequiredResources().get(workerResourceSpec); - while (requiredTaskManagers > numPendingPodRequests) { - requestKubernetesPod(); + while (requiredTaskManagers-- > pendingWorkerNum) { + requestKubernetesPod(workerResourceSpec); } } private void removePodIfTerminated(KubernetesPod pod) { if (pod.isTerminated()) { kubeClient.stopPod(pod.getName());
Review comment: > So this means that `KubernetesResourceManager.onError` will only be called if `onAdded` has been called before? I guess this is also a question for @wangyang0918. There is no guarantee for this; a case in K8s client-go: `ERROR` could be thrown in https://github.com/kubernetes/kubernetes/blob/343c1e7636fe5c75cdd378c0b170b26935806de5/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go#L121 `ERROR` is an HTTP exception that rarely happens and we shouldn't rely on it to handle Pod failure. Once a Pod is terminated, whether or not it exited normally, it is expected we would receive a `MODIFIED` instead of
[jira] [Created] (FLINK-17201) Implement Streaming ClickHouseSink
jinhai created FLINK-17201: -- Summary: Implement Streaming ClickHouseSink Key: FLINK-17201 URL: https://issues.apache.org/jira/browse/FLINK-17201 Project: Flink Issue Type: New Feature Reporter: jinhai
[jira] [Created] (FLINK-17200) Add connectors to ClickHouse
jinhai created FLINK-17200: -- Summary: Add connectors to ClickHouse Key: FLINK-17200 URL: https://issues.apache.org/jira/browse/FLINK-17200 Project: Flink Issue Type: New Feature Reporter: jinhai
[jira] [Commented] (FLINK-17199) Use NeverReturnExpired strategy for states in blink streaming join.
[ https://issues.apache.org/jira/browse/FLINK-17199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085446#comment-17085446 ] Jark Wu commented on FLINK-17199: - I think we will change this behavior in FLINK-16581?
[GitHub] [flink] flinkbot edited a comment on issue #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
flinkbot edited a comment on issue #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink. URL: https://github.com/apache/flink/pull/11768#issuecomment-614482552 ## CI report: * a3aa6b6cab6c0a7e0e26c0cf3ef0e38f265e8799 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160507586) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7573) * b796de76daa408c58f6aa13391c93f1a1e88bb57 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160674334) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7624) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[jira] [Assigned] (FLINK-16104) Translate "Streaming Aggregation" page of "Table API & SQL" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-16104: --- Assignee: ChaojianZhang > Translate "Streaming Aggregation" page of "Table API & SQL" into Chinese > - > > Key: FLINK-16104 > URL: https://issues.apache.org/jira/browse/FLINK-16104 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: ChaojianZhang >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/tuning/streaming_aggregation_optimization.html > The markdown file is located in > {{flink/docs/dev/table/tuning/streaming_aggregation_optimization.zh.md}}
[jira] [Comment Edited] (FLINK-17177) Handle ERROR event correctly in KubernetesResourceManager#onError
[ https://issues.apache.org/jira/browse/FLINK-17177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085442#comment-17085442 ] Canbin Zheng edited comment on FLINK-17177 at 4/17/20, 5:22 AM: {quote}I post the {{WatchEvent}} in K8s here[1]. I do not find that the "Error" type means "HTTP error". So could you share some information about how the "Error" type is introduced by an HTTP layer error? [1]. [https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#watchevent-v1-meta] {quote} One case in K8s client-go is [https://github.com/kubernetes/kubernetes/blob/343c1e7636fe5c75cdd378c0b170b26935806de5/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go#L121] Also, the K8s server could probably send an {{ERROR}} event if something goes wrong in the HTTP stream. > Handle ERROR event correctly in KubernetesResourceManager#onError > - > > Key: FLINK-17177 > URL: https://issues.apache.org/jira/browse/FLINK-17177 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.10.0, 1.10.1 >Reporter: Canbin Zheng >Priority: Major > Fix For: 1.11.0 > > > Currently, once we receive an *ERROR* event that is sent from the K8s API > server via the K8s {{Watcher}}, then {{KubernetesResourceManager#onError}} > will handle it by calling > {{KubernetesResourceManager#removePodIfTerminated}}. This may be incorrect > since the *ERROR* event indicates an exception in the HTTP layer that is > caused by the K8s Server, which means the previously created {{Watcher}} may > no longer be available and we'd better re-create the {{Watcher}} immediately.
[jira] [Commented] (FLINK-17177) Handle ERROR event correctly in KubernetesResourceManager#onError
[ https://issues.apache.org/jira/browse/FLINK-17177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085442#comment-17085442 ] Canbin Zheng commented on FLINK-17177: -- {quote}I post the {{WatchEvent}} in K8s here[1]. I do not find that the "Error" type means "HTTP error". So could you share some information about how the "Error" type is introduced by an HTTP layer error? [1]. [https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#watchevent-v1-meta] {quote} One case in K8s client-go is [https://github.com/kubernetes/kubernetes/blob/343c1e7636fe5c75cdd378c0b170b26935806de5/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go#L121] Also, the K8s server could probably send an {{ERROR}} event if something goes wrong in the HTTP stream.
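The fix direction described in FLINK-17177 (treat an *ERROR* watch event as an HTTP-stream problem and rebuild the watcher, rather than treating it as a pod termination) can be sketched as follows. The interfaces here are invented for illustration; this is not the Flink or fabric8 client API.

```java
/**
 * Hypothetical sketch: on an ERROR watch event, close the (possibly broken)
 * watch and open a fresh one, instead of stopping any pods.
 */
class WatchSupervisor {
    interface Watch { void close(); }
    interface WatchFactory { Watch open(); }

    private final WatchFactory factory;
    private Watch currentWatch;
    int recreations = 0; // exposed so the example can be observed

    WatchSupervisor(WatchFactory factory) {
        this.factory = factory;
        this.currentWatch = factory.open();
    }

    /** ERROR event handler: do not touch pods; rebuild the watch instead. */
    void onWatchError() {
        currentWatch.close();          // the old HTTP stream may be broken
        currentWatch = factory.open(); // fresh watch starts a new list/watch cycle
        recreations++;
    }
}
```

The key design point matches the issue text: pod lifecycle decisions should be driven by `ADDED`/`MODIFIED`/`DELETED` events, while `ERROR` only affects the health of the watch connection itself.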
[GitHub] [flink] xintongsong commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager
xintongsong commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#discussion_r409997707
## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod); } /** * Request new pod if pending pods cannot satisfy pending slot requests. */ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int pendingWorkerNum = getNumPendingWorkersFor(workerResourceSpec); + int requiredTaskManagers = getRequiredResources().get(workerResourceSpec); - while (requiredTaskManagers > numPendingPodRequests) { - requestKubernetesPod(); + while (requiredTaskManagers-- > pendingWorkerNum) { + requestKubernetesPod(workerResourceSpec); } } private void removePodIfTerminated(KubernetesPod pod) { if (pod.isTerminated()) { kubeClient.stopPod(pod.getName());
Review comment: It seems I was wrong about `onError` always called after `onAdded`. Then isn't `numPendingPodRequests` already possibly go out of sync before this PR? Do we observe any cases? @zhengcanbin Would it be possible that a pod is never successfully started, that we received `onError` but never `onAdded`, `onModified` or `onDeleted`?
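The bookkeeping problem debated in this thread (keeping a pending-pod counter consistent whether a pod fails before or after `onAdded`) can be sketched with a simplified model. The field names `podWorkerResources` and `workerNodes` mirror those mentioned in the review; everything else here is hypothetical and is not the actual `KubernetesResourceManager` code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Simplified pending-pod bookkeeping: the pending count per worker spec is
 * decremented exactly once per pod, on whichever of onAdded/onError fires
 * first for it.
 */
class PendingPodBookkeeping {
    private final Map<String, String> podWorkerResources = new HashMap<>(); // podName -> spec
    private final Set<String> workerNodes = new HashSet<>();                // pods seen in onAdded
    private final Map<String, Integer> pendingBySpec = new HashMap<>();

    void requestPod(String podName, String spec) {
        podWorkerResources.put(podName, spec);
        pendingBySpec.merge(spec, 1, Integer::sum);
    }

    void onAdded(String podName) {
        String spec = podWorkerResources.get(podName);
        if (spec != null && workerNodes.add(podName)) {
            pendingBySpec.merge(spec, -1, Integer::sum); // no longer pending
        }
    }

    /** A pod failed; decrement pending only if onAdded never fired for it. */
    void onError(String podName) {
        String spec = podWorkerResources.remove(podName);
        if (spec != null && !workerNodes.remove(podName)) {
            pendingBySpec.merge(spec, -1, Integer::sum);
        }
    }

    int pending(String spec) {
        return pendingBySpec.getOrDefault(spec, 0);
    }
}
```

The `podWorkerResources` lookup filters out pods from previous attempts, and the `workerNodes` membership test prevents double-decrementing when `onError` arrives after `onAdded`, which is the invariant the reviewers are trying to establish.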
[GitHub] [flink] zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager
zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#discussion_r409991098 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. 
Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod); } /** * Request new pod if pending pods cannot satisfy pending slot requests. 
*/ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int pendingWorkerNum = getNumPendingWorkersFor(workerResourceSpec); + int requiredTaskManagers = getRequiredResources().get(workerResourceSpec); - while (requiredTaskManagers > numPendingPodRequests) { - requestKubernetesPod(); + while (requiredTaskManagers-- > pendingWorkerNum) { + requestKubernetesPod(workerResourceSpec); } } private void removePodIfTerminated(KubernetesPod pod) { if (pod.isTerminated()) { kubeClient.stopPod(pod.getName()); Review comment: > So this means that `KubernetesResourceManager.onError` will only be called if `onAdded` has been called before? I guess this is also a question for @wangyang0918. There is no guarantee for this; one case in the K8s client-go: an `ERROR` could be thrown in https://github.com/kubernetes/kubernetes/blob/343c1e7636fe5c75cdd378c0b170b26935806de5/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go#L121 An `ERROR` is an HTTP-level exception that rarely happens, and we shouldn't rely on it to handle Pod failures. Once a Pod is terminated, whether or not it exited normally, we are expected to receive a `MODIFIED` event.
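The per-spec bookkeeping that this diff introduces (replacing the single `numPendingPodRequests` counter) can be sketched as a simplified model. This is an illustrative Python sketch, not the actual Java `KubernetesResourceManager`; the class and method names below are invented for the sketch:

```python
# Simplified model of per-WorkerResourceSpec pending-worker bookkeeping
# (illustrative only, not the actual KubernetesResourceManager code).
class PendingWorkerTracker:
    def __init__(self):
        self._pending = {}  # worker_resource_spec -> number of pending pod requests

    def notify_new_worker_requested(self, spec):
        # Mirrors notifyNewWorkerRequested: bump the pending count for this spec.
        self._pending[spec] = self._pending.get(spec, 0) + 1
        return self._pending[spec]

    def notify_worker_started(self, spec):
        # Called when a requested pod is observed as added/running.
        if self._pending.get(spec, 0) > 0:
            self._pending[spec] -= 1

    def num_pending(self, spec):
        return self._pending.get(spec, 0)

    def request_pods_if_required(self, spec, required, request_fn):
        # Mirrors requestKubernetesPodIfRequired: only request pods for the
        # gap between required workers and already-pending requests.
        for _ in range(max(0, required - self.num_pending(spec))):
            request_fn(spec)
            self.notify_new_worker_requested(spec)
```

The key property, matching the review discussion, is that requests and started/terminated notifications for the same spec must stay paired, or the pending count drifts out of sync.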
[GitHub] [flink] flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508 ## CI report: * 7cbe2380ac7246cf703397d9a78d002c0491a959 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160664905) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7621) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to the total number of slots
flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to the total number of slots URL: https://github.com/apache/flink/pull/11615#issuecomment-607717339 ## CI report: * 2db316d977d0e790de8fa98b27dd219b68abb136 UNKNOWN * 0b0111ba893c9ecc7632394fc59e455f8d5c9db7 UNKNOWN * cf7728e0b162ed6bb27c75aa2d02ca9be7e2a6c4 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160663186) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7618)
[GitHub] [flink] flinkbot edited a comment on issue #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
flinkbot edited a comment on issue #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink. URL: https://github.com/apache/flink/pull/11768#issuecomment-614482552 ## CI report: * a3aa6b6cab6c0a7e0e26c0cf3ef0e38f265e8799 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160507586) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7573) * b796de76daa408c58f6aa13391c93f1a1e88bb57 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception
flinkbot edited a comment on issue #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception URL: https://github.com/apache/flink/pull/11010#issuecomment-581769412 ## CI report: * 1130280dfe557a0268fed248089fb32d15f832d9 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160664763) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7620)
[jira] [Created] (FLINK-17199) Use NeverReturnExpired strategy for states in blink streaming join.
Benchao Li created FLINK-17199: -- Summary: Use NeverReturnExpired strategy for states in blink streaming join. Key: FLINK-17199 URL: https://issues.apache.org/jira/browse/FLINK-17199 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Affects Versions: 1.10.0, 1.9.2 Reporter: Benchao Li Currently we use {{ReturnExpiredIfNotCleanedUp}} for the states in blink streaming join operators, which is not very intuitive for users to understand the behavior. IMO, {{NeverReturnExpired}} is more straightforward, and we should switch to it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
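The two strategies named in the ticket correspond to Flink's state-TTL visibility settings. The semantic difference can be sketched with a toy model; this is illustrative Python, not the actual Flink state backend, and `TtlState` and its parameters are invented for the sketch:

```python
import time

class TtlState:
    """Toy model of TTL'd keyed state with the two visibility strategies."""

    def __init__(self, ttl_seconds, never_return_expired, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._never_return_expired = never_return_expired
        self._clock = clock
        self._entries = {}  # key -> (value, write_timestamp)

    def put(self, key, value):
        self._entries[key] = (value, self._clock())

    def get(self, key):
        if key not in self._entries:
            return None
        value, ts = self._entries[key]
        expired = self._clock() - ts >= self._ttl
        if expired and self._never_return_expired:
            # NeverReturnExpired: expired state is invisible even if the
            # background cleanup has not removed the entry yet.
            return None
        # ReturnExpiredIfNotCleanedUp: expired-but-uncollected state is
        # still visible to reads until cleanup runs.
        return value
```

With `NeverReturnExpired`, whether a read sees a value no longer depends on cleanup timing, which is the intuitiveness argument the ticket makes.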
[GitHub] [flink] zhijiangW commented on a change in pull request #11687: [FLINK-16536][network][checkpointing] Implement InputChannel state recovery for unaligned checkpoint
zhijiangW commented on a change in pull request #11687: [FLINK-16536][network][checkpointing] Implement InputChannel state recovery for unaligned checkpoint URL: https://github.com/apache/flink/pull/11687#discussion_r409995125 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ## @@ -213,19 +215,23 @@ public SingleInputGate( } @Override - public void setup() throws IOException, InterruptedException { + public void setup() throws IOException { Review comment: I agree that it helps to reduce the steps for managing the input gate's lifecycle. But I see another potential concern with integrating it into `setup`. The previous assumption was that output recovery should execute earlier than input recovery, so that it can occupy more floating buffers from the global pool and thereby speed up the recovery process. Currently, output recovery is executed by the task thread during `StreamTask#beforeInvoke`. If we integrate input recovery into the `setup` process, it would run before output recovery and claim more floating buffers in a buffer-constrained environment. I also considered integrating output recovery into `ResultPartition#setup`, but since output recovery is executed by the task thread, it would block the subsequent operations, so I worry that this could also introduce risks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add Cython support for operations
dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add Cython support for operations URL: https://github.com/apache/flink/pull/11784#discussion_r409994413 ## File path: flink-python/pyflink/fn_execution/fast_operations.pxd ## @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# cython: language_level=3 + +cimport libc.stdint + +from apache_beam.utils.windowed_value cimport WindowedValue +from apache_beam.runners.worker.operations cimport Operation +from apache_beam.coders.coder_impl cimport StreamCoderImpl, CoderImpl, OutputStream, InputStream +from pyflink.fn_execution.fast_coder_impl cimport InputStreamAndFunctionWrapper Review comment: unused import This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add Cython support for operations
dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add Cython support for operations URL: https://github.com/apache/flink/pull/11784#discussion_r409991523 ## File path: flink-python/pyflink/fn_execution/coders.py ## @@ -177,6 +187,9 @@ def __init__(self, elem_coder): def _create_impl(self): return coder_impl.ArrayCoderImpl(self._elem_coder.get_impl()) +def get_impl(self): Review comment: If `get_impl` is already defined in the base class, there is no need to redefine it in the child class. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
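The review point can be illustrated with a minimal inheritance example (generic Python; the class names here are illustrative, not the actual PyFlink coder classes):

```python
class Coder:
    def get_impl(self):
        # Base-class implementation shared by all subclasses: dispatch to the
        # subclass-specific factory method.
        return self._create_impl()

    def _create_impl(self):
        raise NotImplementedError

class ArrayCoder(Coder):
    # No need to override get_impl here: the inherited base-class method
    # already calls this subclass's _create_impl via dynamic dispatch.
    def _create_impl(self):
        return "array-coder-impl"
```

Redefining `get_impl` in the subclass with an identical body would only duplicate code without changing behavior.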
[GitHub] [flink] dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add Cython support for operations
dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add Cython support for operations URL: https://github.com/apache/flink/pull/11784#discussion_r409992838 ## File path: flink-python/pyflink/fn_execution/fast_operations.pyx ## @@ -0,0 +1,242 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# cython: language_level = 3 +# cython: infer_types = True +# cython: profile=True +# cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True + +import datetime + +import cloudpickle +from apache_beam.runners.worker import bundle_processor +from apache_beam.runners.worker import operation_specs + +from pyflink.fn_execution import flink_fn_execution_pb2 +from pyflink.metrics.metricbase import GenericMetricGroup +from pyflink.serializers import PickleSerializer +from pyflink.table import FunctionContext +from pyflink.table.udf import DelegatingScalarFunction, DelegationTableFunction + +SCALAR_FUNCTION_URN = "flink:transform:scalar_function:v1" +TABLE_FUNCTION_URN = "flink:transform:table_function:v1" + +cdef class StatelessFunctionOperation(Operation): +""" +Base class of stateless function operation that will execute ScalarFunction or TableFunction for +each input element. 
+""" + +def __init__(self, name, spec, counter_factory, sampler, consumers): +super(StatelessFunctionOperation, self).__init__(name, spec, counter_factory, sampler) +self.consumer = consumers['output'][0] +self._value_coder_impl = self.consumer.windowed_coder.wrapped_value_coder.get_impl() +value_coder = self._value_coder_impl._value_coder +from pyflink.fn_execution.coder_impl import ArrowCoderImpl +if isinstance(value_coder, ArrowCoderImpl): +self._is_python_coder = True +else: +self._is_python_coder = False + +self.variable_dict = {} +self.user_defined_funcs = [] +self._func_num = 0 +self._constant_num = 0 +self.func = self.generate_func(self.spec.serialized_fn.udfs) +self._metric_enabled = self.spec.serialized_fn.metric_enabled +self.base_metric_group = None +if self._metric_enabled: +self.base_metric_group = GenericMetricGroup(None, None) +for user_defined_func in self.user_defined_funcs: +user_defined_func.open(FunctionContext(self.base_metric_group)) + +cpdef start(self): +with self.scoped_start_state: +super(StatelessFunctionOperation, self).start() + +cpdef finish(self): +with self.scoped_finish_state: +super(StatelessFunctionOperation, self).finish() +self._update_gauge(self.base_metric_group) + +cpdef teardown(self): +with self.scoped_finish_state: +for user_defined_func in self.user_defined_funcs: +user_defined_func.close() + +cpdef process(self, WindowedValue o): +cdef InputStreamAndFunctionWrapper wrapper +with self.scoped_process_state: +output_stream = self.consumer.output_stream +if self._is_python_coder: +self._value_coder_impl.encode_to_stream(self.func(o.value), output_stream, True) +else: +wrapper = InputStreamAndFunctionWrapper(self.func, o.value) +self._value_coder_impl.encode_to_stream(wrapper, output_stream, True) +output_stream.maybe_flush() + +cpdef monitoring_infos(self, transform_id): Review comment: progress_metrics method is missing This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add Cython support for operations
dianfu commented on a change in pull request #11784: [FLINK-17120][python] Add Cython support for operations URL: https://github.com/apache/flink/pull/11784#discussion_r409994309 ## File path: flink-python/pyflink/fn_execution/fast_operations.pxd ## @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# cython: language_level=3 + +cimport libc.stdint + +from apache_beam.utils.windowed_value cimport WindowedValue Review comment: unused import This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11781: [FLINK-16770][checkpointing] Revert commits of FLINK-16945 and FLINK-14971
flinkbot edited a comment on issue #11781: [FLINK-16770][checkpointing] Revert commits of FLINK-16945 and FLINK-14971 URL: https://github.com/apache/flink/pull/11781#issuecomment-614694494 ## CI report: * c6ea0d7e853fb1b8f244f5ce756fe07348580cf6 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160567250) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7606)
[GitHub] [flink] WeiZhong94 commented on issue #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
WeiZhong94 commented on issue #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink. URL: https://github.com/apache/flink/pull/11768#issuecomment-615041372 @kl0u Hi Kostas, the change to `LocalExecutor` has been included in this PR, and it seems all tests have passed. Please take a look. :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
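For context, `pipeline.jars` is a semicolon-separated list of jar URLs. A helper that merges new entries into such a value might look like the following; this is an illustrative sketch under that assumption, not the actual PyFlink implementation, and `merge_pipeline_jars` is a hypothetical name:

```python
def merge_pipeline_jars(existing, new_jars):
    """Merge jar URLs into a 'pipeline.jars'-style semicolon-separated
    string, preserving order and dropping duplicates (illustrative only)."""
    seen = []
    for url in (existing.split(";") if existing else []) + list(new_jars):
        url = url.strip()
        if url and url not in seen:
            seen.append(url)
    return ";".join(seen)
```

Keeping the merge idempotent means repeatedly setting the option with overlapping jar lists does not bloat the configuration value.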
[GitHub] [flink] flinkbot edited a comment on issue #11782: [FLINK-15801] fix Timestamp extractor created from properties does no…
flinkbot edited a comment on issue #11782: [FLINK-15801] fix Timestamp extractor created from properties does no… URL: https://github.com/apache/flink/pull/11782#issuecomment-614731103 ## CI report: * de8e893e40c189d6aaa3de4330df4c3065b8ae62 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160663230) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7619)
[GitHub] [flink] flinkbot edited a comment on issue #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager
flinkbot edited a comment on issue #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#issuecomment-595267888 ## CI report: * edd2f98c2a607655aef8823024753fccebe29a7d UNKNOWN * 260401deb92878bc2304b23a6988afe812439a7a Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160543406) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7596)
[GitHub] [flink] flinkbot edited a comment on issue #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception
flinkbot edited a comment on issue #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception URL: https://github.com/apache/flink/pull/11010#issuecomment-581769412 ## CI report: * 1130280dfe557a0268fed248089fb32d15f832d9 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160664763) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7620)
[GitHub] [flink] wangyang0918 commented on issue #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception
wangyang0918 commented on issue #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception URL: https://github.com/apache/flink/pull/11010#issuecomment-615040585 @tillrohrmann Could you help to review this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on issue #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
WeiZhong94 commented on issue #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink. URL: https://github.com/apache/flink/pull/11768#issuecomment-615039935 @dianfu Thanks for your review! I have addressed your comments in the latest commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager
zhengcanbin commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#discussion_r409991098 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. 
Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod); } /** * Request new pod if pending pods cannot satisfy pending slot requests. 
*/ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int pendingWorkerNum = getNumPendingWorkersFor(workerResourceSpec); + int requiredTaskManagers = getRequiredResources().get(workerResourceSpec); - while (requiredTaskManagers > numPendingPodRequests) { - requestKubernetesPod(); + while (requiredTaskManagers-- > pendingWorkerNum) { + requestKubernetesPod(workerResourceSpec); } } private void removePodIfTerminated(KubernetesPod pod) { if (pod.isTerminated()) { kubeClient.stopPod(pod.getName()); Review comment: > So this means that `KubernetesResourceManager.onError` will only be called if `onAdded` has been called before? I guess this is also a question for @wangyang0918. There is no guarantee for this, `ERROR` could be thrown in https://github.com/kubernetes/kubernetes/blob/343c1e7636fe5c75cdd378c0b170b26935806de5/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go#L121 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this
[GitHub] [flink] wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager
wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#discussion_r409989580 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. 
Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod); } /** * Request new pod if pending pods cannot satisfy pending slot requests. 
*/ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int pendingWorkerNum = getNumPendingWorkersFor(workerResourceSpec); + int requiredTaskManagers = getRequiredResources().get(workerResourceSpec); - while (requiredTaskManagers > numPendingPodRequests) { - requestKubernetesPod(); + while (requiredTaskManagers-- > pendingWorkerNum) { + requestKubernetesPod(workerResourceSpec); } } private void removePodIfTerminated(KubernetesPod pod) { if (pod.isTerminated()) { kubeClient.stopPod(pod.getName()); Review comment: I posted the `WatchEvent` API from the K8s documentation [1]. From the definition, I do not find a guarantee that the `Error` event is always returned after `Added`. We already have a [ticket](https://issues.apache.org/jira/browse/FLINK-17177) to track how to handle the `onError` event in `KubernetesResourceManager` correctly. I think we could keep the current behavior and try to find a solution in FLINK-17177. Moreover, I need to dig into the K8s ApiServer implementation and check when the `onError` `WatchEvent` could be returned. [1].
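The ordering concern above can be sketched with a minimal, hypothetical watcher callback. This is an illustration only, not Flink's actual `FlinkKubeClient` callback API: a handler that tolerates `Error` arriving without a preceding `Added` simply ignores pods it has never seen, instead of assuming bookkeeping state exists for them.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: a pod watcher that must tolerate an Error event
// arriving without a preceding Added event, since the Kubernetes
// WatchEvent contract does not guarantee ordering between them.
public class PodWatcherSketch {
    private final Set<String> knownPods = new HashSet<>();

    public void onAdded(String podName) {
        knownPods.add(podName);
    }

    public void onError(String podName) {
        // Guard against Error-before-Added: only release resources for
        // pods we have actually seen; otherwise ignore the event.
        if (knownPods.remove(podName)) {
            stopPod(podName);
        }
    }

    protected void stopPod(String podName) {
        // In the real resource manager this would call kubeClient.stopPod(...)
    }

    public boolean isKnown(String podName) {
        return knownPods.contains(podName);
    }
}
```

The guard keeps the watcher idempotent regardless of event order, which is the kind of behavior FLINK-17177 will have to pin down precisely.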
[GitHub] [flink] zhuzhurk commented on a change in pull request #11783: [FLINK-17181][runtime] Drop generic Types in SchedulingTopology Interface
zhuzhurk commented on a change in pull request #11783: [FLINK-17181][runtime] Drop generic Types in SchedulingTopology Interface URL: https://github.com/apache/flink/pull/11783#discussion_r409988049 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/topology/BaseTopology.java ## @@ -30,7 +30,7 @@ * * @return topologically sorted iterable over all vertices */ - Iterable getVertices(); + Iterable getVertices(); Review comment: I think we need to do similar changes for: `PipelinedRegion#getVertices()` `PipelinedRegion#getConsumedResults()` `Topology#getAllPipelinedRegions()` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #11783: [FLINK-17181][runtime] Drop generic Types in SchedulingTopology Interface
zhuzhurk commented on a change in pull request #11783: [FLINK-17181][runtime] Drop generic Types in SchedulingTopology Interface URL: https://github.com/apache/flink/pull/11783#discussion_r409988394 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java ## @@ -174,9 +174,9 @@ private boolean isRegionOfVertexFinished(final ExecutionVertexID executionVertex public static class Factory implements PartitionReleaseStrategy.Factory { @Override - public PartitionReleaseStrategy createInstance(final SchedulingTopology schedulingStrategy) { + public PartitionReleaseStrategy createInstance(final SchedulingTopology schedulingStrategy) { - final Set>> distinctRegions = + final Set> distinctRegions = Review comment: Now it can be simplified to `Set>`.
[GitHub] [flink] zhuzhurk commented on a change in pull request #11783: [FLINK-17181][runtime] Drop generic Types in SchedulingTopology Interface
zhuzhurk commented on a change in pull request #11783: [FLINK-17181][runtime] Drop generic Types in SchedulingTopology Interface URL: https://github.com/apache/flink/pull/11783#discussion_r409988449 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java ## @@ -98,15 +98,15 @@ public RestartPipelinedRegionFailoverStrategy( // private void buildFailoverRegions() { - final Set>> distinctRegions = + final Set> distinctRegions = Review comment: Now it can be simplified to `Set>`.
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#discussion_r409266063 ## File path: flink-python/src/main/java/org/apache/flink/client/python/PythonFunctionFactoryUtil.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.python; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; + +import py4j.GatewayServer; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.client.python.PythonEnvUtils.getGatewayServer; +import static org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient; +import static org.apache.flink.client.python.PythonEnvUtils.setGatewayServer; +import static org.apache.flink.client.python.PythonEnvUtils.shutdownPythonProcess; + +/** + * Utils for PythonFunctionFactory. 
+ */ +public class PythonFunctionFactoryUtil { + + private static final long CHECK_INTERVAL = 100; + + private static final long TIMEOUT_MILLIS = 3000; Review comment: Seems too small for me, change to 1? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
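The `CHECK_INTERVAL` / `TIMEOUT_MILLIS` constants under discussion presumably drive a poll-until-ready wait. A generic sketch of that pattern (names and structure are my own, not the PR's actual implementation) illustrates why a 3000 ms budget is tight for launching a Python process:

```java
import java.util.function.BooleanSupplier;

// Generic poll-until-ready wait loop: re-check a condition every
// checkIntervalMillis until it holds or timeoutMillis elapses.
// Launching an interpreter can easily exceed a few seconds on a loaded
// machine, hence the suggestion to enlarge the timeout.
public class PollingWait {
    public static boolean waitUntil(BooleanSupplier condition,
                                    long timeoutMillis,
                                    long checkIntervalMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return false; // timed out without the condition holding
            }
            try {
                Thread.sleep(checkIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false; // treat interruption as failure
            }
        }
        return true;
    }
}
```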
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#discussion_r409261220 ## File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java ## @@ -247,4 +257,90 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List comm return process; } + + static void shutdownPythonProcess(Process pythonProcess, long timeoutMillis) { Review comment: This method seems quite generic and is not specific to Python. What about moving it to PythonFunctionFactoryUtil where it's used?
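A `shutdownPythonProcess(Process, long)` helper like the one discussed would typically follow the graceful-then-forceful pattern built into `java.lang.Process`. The sketch below is my own illustration of that pattern, not the PR's code; the demo method assumes a Unix `sleep` binary is available:

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class ProcessShutdown {
    /** Ask the process to exit, escalating to a forced kill after the timeout. */
    public static void shutdown(Process process, long timeoutMillis) {
        process.destroy(); // polite request first (SIGTERM on Unix)
        try {
            if (!process.waitFor(timeoutMillis, TimeUnit.MILLISECONDS)) {
                process.destroyForcibly(); // still alive after timeout: SIGKILL
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            process.destroyForcibly();
        }
    }

    /** Demo (Unix only): spawn a long sleep, shut it down, report whether it died. */
    public static boolean demoShutdown(long timeoutMillis) {
        try {
            Process p = new ProcessBuilder("sleep", "30").start();
            shutdown(p, timeoutMillis);
            p.waitFor(5, TimeUnit.SECONDS);
            return !p.isAlive();
        } catch (IOException | InterruptedException e) {
            return false;
        }
    }
}
```

Since nothing here touches Python specifics, the reviewer's point stands: the helper can live wherever its single caller lives.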
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#discussion_r409958033 ## File path: flink-python/pyflink/table/tests/test_table_environment_api.py ## @@ -412,6 +419,13 @@ def test_table_environment_with_blink_planner(self): self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n']) +def test_sql_ddl(self): Review comment: Seems that this could be removed as it already exists in the parent class
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#discussion_r409959968 ## File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java ## @@ -247,4 +256,63 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List comm return process; } + + /** +* Py4J both supports Java to Python RPC and Python to Java RPC. The GatewayServer object is +* the entry point of Java to Python RPC. Since the Py4j Python client will only be launched +* only once, the GatewayServer object needs to be reused. +*/ + private static GatewayServer gatewayServer = null; + + /** +* Creates a GatewayServer run in a daemon thread. +* +* @return The created GatewayServer +*/ + static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException { + if (gatewayServer != null) { + return gatewayServer; + } + CompletableFuture gatewayServerFuture = new CompletableFuture<>(); + Thread thread = new Thread(() -> { + int freePort = NetUtils.getAvailablePort(); + GatewayServer server = new GatewayServer.GatewayServerBuilder() + .gateway(new Gateway(new ConcurrentHashMap(), new CallbackClient(freePort))) + .javaPort(0) + .build(); + gatewayServerFuture.complete(server); + server.start(true); + }); + thread.setName("py4j-gateway"); + thread.setDaemon(true); + thread.start(); + thread.join(); + gatewayServer = gatewayServerFuture.get(); + return gatewayServer; + } + + static Process launchPy4jPythonClient( + GatewayServer gatewayServer, + ReadableConfig config, + List commands, + String entryPointScript, + String tmpDir) throws IOException { + PythonEnvironment pythonEnv = PythonEnvUtils.preparePythonEnvironment( + config, entryPointScript, tmpDir); + // set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python process. 
+ pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort())); + // set env variable PYFLINK_CALLBACK_PORT for creating callback server in python process. + pythonEnv.systemEnv.put("PYFLINK_CALLBACK_PORT", String.valueOf(gatewayServer.getCallbackClient().getPort())); + // start the python process. + return PythonEnvUtils.startPythonProcess(pythonEnv, commands); + } + + static GatewayServer getGatewayServer() { + return gatewayServer; + } + + static void removeGatewayServer() { Review comment: Could we place all the methods about gateway server, e.g. startGatewayServer/getGatewayServer together? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#discussion_r409972882 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java ## @@ -18,13 +18,34 @@ package org.apache.flink.table.functions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.FunctionLanguage; + +import java.lang.reflect.InvocationTargetException; + /** * A util to instantiate {@link FunctionDefinition} in the default way. */ public class FunctionDefinitionUtil { public static FunctionDefinition createFunctionDefinition(String name, String className) { - // Currently only handles Java class-based functions + return createJavaFunctionDefinition(name, className); + } + + public static FunctionDefinition createFunctionDefinition( + String name, + String className, + FunctionLanguage functionLanguage, + ReadableConfig config) { + switch (functionLanguage) { Review comment: It complains in my IDE that `switch has too few case labels`. Could you resolve this warning? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
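One common way to resolve the "switch has too few case labels" inspection is to rewrite a two-branch `switch` as an `if`/`else`. The sketch below is a simplified, hypothetical rework of the dispatch in the snippet above; the enum values and return strings are placeholders, not Flink's actual `FunctionDefinitionUtil`:

```java
// Hypothetical two-branch language dispatch as if/else instead of a
// switch with a single meaningful case. JAVA and SCALA share the
// class-based instantiation path; only PYTHON diverges.
public class FunctionDispatch {
    public enum FunctionLanguage { JAVA, SCALA, PYTHON }

    public static String createFunctionDefinition(String name, FunctionLanguage language) {
        if (language == FunctionLanguage.PYTHON) {
            return "python-function:" + name;
        }
        return "java-function:" + name;
    }
}
```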
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#discussion_r409970974 ## File path: flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.python; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.util.FileUtils; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.util.Map; +import java.util.UUID; + +import static org.apache.flink.python.PythonOptions.PYTHON_FILES; + +/** + * Tests for PythonFunctionFactory. + */ +public class PythonFunctionFactoryTest { + + private String tmpdir = ""; + private BatchTableEnvironment flinkTableEnv; + private StreamTableEnvironment blinkTableEnv; + private Table flinkSourceTable; + private Table blinkSourceTable; + + @Before + public void prepareEnvironment() throws Exception { + tmpdir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()).getAbsolutePath(); + new File(tmpdir).mkdir(); + File pyFilePath = new File(tmpdir, "test1.py"); + try (OutputStream out = new FileOutputStream(pyFilePath)) { + String code = "" + + "from pyflink.table.udf import udf\n" + + "from pyflink.table import DataTypes\n" + + "@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())\n" + + "def func1(str):\n" + + "return str + str\n"; + out.write(code.getBytes()); + } + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + flinkTableEnv = BatchTableEnvironment.create(env); + flinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, pyFilePath.getAbsolutePath()); + 
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + blinkTableEnv = StreamTableEnvironment.create( + sEnv, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()); + blinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, pyFilePath.getAbsolutePath()); + flinkSourceTable = flinkTableEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str"); + blinkSourceTable = blinkTableEnv.fromDataStream(sEnv.fromElements("1", "2", "3")).as("str"); + } + + @After + public void cleanEnvironment() throws Exception { + FileUtils.deleteDirectory(new File(tmpdir)); + } + + @Test + public void testPythonFunctionFactory() { + // flink temporary catalog + flinkTableEnv.sqlUpdate("create temporary function func1 as 'test1.func1' language python"); + verifyPlan(flinkSourceTable.select("func1(str)"), flinkTableEnv); + + // flink temporary system + flinkTableEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python"); + verifyPlan(flinkSourceTable.select("func1(str)"), flinkTableEnv); + + // blink temporary catalog + blinkTableEnv.sqlUpdate("create temporary function func1 as 'test1.func1' language python"); +
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#discussion_r409973891 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java ## @@ -259,10 +259,17 @@ private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) { return new CreateTempSystemFunctionOperation( unresolvedIdentifier.getObjectName(), sqlCreateFunction.getFunctionClassName().getValueAs(String.class), - sqlCreateFunction.isIfNotExists() + sqlCreateFunction.isIfNotExists(), + parseLanguage(sqlCreateFunction.getFunctionLanguage()) ); } else { FunctionLanguage language = parseLanguage(sqlCreateFunction.getFunctionLanguage()); + if (language == FunctionLanguage.PYTHON && !sqlCreateFunction.isTemporary()) { + throw new ValidationException(String.format( Review comment: fix the warning here
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#discussion_r409971668 ## File path: flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.python; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.util.FileUtils; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.util.Map; +import java.util.UUID; + +import static org.apache.flink.python.PythonOptions.PYTHON_FILES; + +/** + * Tests for PythonFunctionFactory. + */ +public class PythonFunctionFactoryTest { + + private String tmpdir = ""; + private BatchTableEnvironment flinkTableEnv; + private StreamTableEnvironment blinkTableEnv; + private Table flinkSourceTable; + private Table blinkSourceTable; + + @Before + public void prepareEnvironment() throws Exception { + tmpdir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()).getAbsolutePath(); + new File(tmpdir).mkdir(); + File pyFilePath = new File(tmpdir, "test1.py"); + try (OutputStream out = new FileOutputStream(pyFilePath)) { + String code = "" + + "from pyflink.table.udf import udf\n" + + "from pyflink.table import DataTypes\n" + + "@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())\n" + + "def func1(str):\n" + + "return str + str\n"; + out.write(code.getBytes()); + } + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + flinkTableEnv = BatchTableEnvironment.create(env); + flinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, pyFilePath.getAbsolutePath()); + 
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + blinkTableEnv = StreamTableEnvironment.create( + sEnv, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()); + blinkTableEnv.getConfig().getConfiguration().set(PYTHON_FILES, pyFilePath.getAbsolutePath()); + flinkSourceTable = flinkTableEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str"); + blinkSourceTable = blinkTableEnv.fromDataStream(sEnv.fromElements("1", "2", "3")).as("str"); + } + + @After + public void cleanEnvironment() throws Exception { + FileUtils.deleteDirectory(new File(tmpdir)); + } + + @Test + public void testPythonFunctionFactory() { + // flink temporary catalog + flinkTableEnv.sqlUpdate("create temporary function func1 as 'test1.func1' language python"); + verifyPlan(flinkSourceTable.select("func1(str)"), flinkTableEnv); + + // flink temporary system + flinkTableEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python"); + verifyPlan(flinkSourceTable.select("func1(str)"), flinkTableEnv); + + // blink temporary catalog + blinkTableEnv.sqlUpdate("create temporary function func1 as 'test1.func1' language python"); +
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#discussion_r409959350 ## File path: flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java ## @@ -247,4 +256,63 @@ static Process startPythonProcess(PythonEnvironment pythonEnv, List comm return process; } + + /** +* Py4J both supports Java to Python RPC and Python to Java RPC. The GatewayServer object is +* the entry point of Java to Python RPC. Since the Py4j Python client will only be launched +* only once, the GatewayServer object needs to be reused. +*/ + private static GatewayServer gatewayServer = null; + + /** +* Creates a GatewayServer run in a daemon thread. +* +* @return The created GatewayServer +*/ + static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException { + if (gatewayServer != null) { + return gatewayServer; + } + CompletableFuture gatewayServerFuture = new CompletableFuture<>(); + Thread thread = new Thread(() -> { + int freePort = NetUtils.getAvailablePort(); + GatewayServer server = new GatewayServer.GatewayServerBuilder() + .gateway(new Gateway(new ConcurrentHashMap(), new CallbackClient(freePort))) + .javaPort(0) + .build(); + gatewayServerFuture.complete(server); + server.start(true); + }); + thread.setName("py4j-gateway"); + thread.setDaemon(true); + thread.start(); + thread.join(); + gatewayServer = gatewayServerFuture.get(); + return gatewayServer; + } + + static Process launchPy4jPythonClient( + GatewayServer gatewayServer, + ReadableConfig config, + List commands, + String entryPointScript, + String tmpDir) throws IOException { + PythonEnvironment pythonEnv = PythonEnvUtils.preparePythonEnvironment( + config, entryPointScript, tmpDir); + // set env variable PYFLINK_GATEWAY_PORT for connecting of python gateway in python process. 
+ pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort())); + // set env variable PYFLINK_CALLBACK_PORT for creating callback server in python process. + pythonEnv.systemEnv.put("PYFLINK_CALLBACK_PORT", String.valueOf(gatewayServer.getCallbackClient().getPort())); + // start the python process. + return PythonEnvUtils.startPythonProcess(pythonEnv, commands); + } + + static GatewayServer getGatewayServer() { + return gatewayServer; + } + + static void removeGatewayServer() { + gatewayServer.shutdown(); Review comment: Should check if gatewayServer is null This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#discussion_r409973118 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java ## @@ -33,8 +54,36 @@ public static FunctionDefinition createFunctionDefinition(String name, String cl String.format("Failed instantiating '%s'", className), e); } + return createFunctionDefinitionInternal(name, (UserDefinedFunction) func); + } + + private static FunctionDefinition createPythonFunctionDefinition( + String name, + String fullyQualifiedName, + ReadableConfig config) { + Object func; + try { + Class pythonFunctionFactory = Class.forName( + "org.apache.flink.client.python.PythonFunctionFactory", + true, + Thread.currentThread().getContextClassLoader()); + func = pythonFunctionFactory.getMethod( + "getPythonFunction", + String.class, + ReadableConfig.class) + .invoke(null, fullyQualifiedName, config); + Review comment: remove the empty line This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#discussion_r409970315 ## File path: flink-python/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java ## @@ -82,4 +81,8 @@ public static void main(String[] args) throws IOException { System.exit(1); } } + + public static boolean ping() { Review comment: This method is not needed anymore and should be removed.
[GitHub] [flink] dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
dianfu commented on a change in pull request #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#discussion_r409972150 ## File path: flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.python; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.util.FileUtils; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.util.Map; +import java.util.UUID; + +import static org.apache.flink.python.PythonOptions.PYTHON_FILES; + +/** + * Tests for PythonFunctionFactory. + */ +public class PythonFunctionFactoryTest { + + private String tmpdir = ""; + private BatchTableEnvironment flinkTableEnv; + private StreamTableEnvironment blinkTableEnv; + private Table flinkSourceTable; + private Table blinkSourceTable; + + @Before Review comment: Change to BeforeClass? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts
flinkbot edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts URL: https://github.com/apache/flink/pull/11765#issuecomment-614430999 ## CI report: * 22eef4c43cd4fd0a6ddffb3cbef00a2fe522dac6 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160495175) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7557) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to the total number of slots
flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to the total number of slots URL: https://github.com/apache/flink/pull/11615#issuecomment-607717339 ## CI report: * 2db316d977d0e790de8fa98b27dd219b68abb136 UNKNOWN * 0b0111ba893c9ecc7632394fc59e455f8d5c9db7 UNKNOWN * cf7728e0b162ed6bb27c75aa2d02ca9be7e2a6c4 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160663186) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7618)
[GitHub] [flink] flinkbot edited a comment on issue #11351: [FLINK-16404][runtime] Avoid caching buffers for blocked input channels before barrier alignment
flinkbot edited a comment on issue #11351: [FLINK-16404][runtime] Avoid caching buffers for blocked input channels before barrier alignment URL: https://github.com/apache/flink/pull/11351#issuecomment-596351676 ## CI report: * 715889a35cfcc3aaf1b17f39dadaa86f755cc75d UNKNOWN * 548b22258f5e87fd53b45c7f4bb6de40bfd4e6d2 UNKNOWN * 9179aab74eeaf80ea30c0894f3e5a0171338baed UNKNOWN * cd385c55e9dc111a061e19e2387f2ae9ce21369e UNKNOWN * e869d82f1aa639c02af4a82c5026f939bf4e8f6c UNKNOWN * f86372054c1c4724b8dabc8d06e369475e64ac29 UNKNOWN * a0c7ff983988f27f5e47f4c71c8fb1ef28f8a24a UNKNOWN * f2ba8a55cb8e441a1377b2cc00957e13e7445e47 UNKNOWN * 57564cc22bdc5fb9055a9896fce66bc0305b4e31 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160566977) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7605)
[GitHub] [flink] zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy
zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy URL: https://github.com/apache/flink/pull/11770#discussion_r409983090 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; +import org.apache.flink.util.IterableUtils; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link SchedulingStrategy} instance which schedules tasks in granularity of pipelined regions. + */ +public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(false); + + /** Result partitions are correlated if they have the same result id. */ + private final Map>> correlatedResultPartitions = new HashMap<>(); + + private final Map>> partitionConsumerRegions = new HashMap<>(); + + public PipelinedRegionSchedulingStrategy( + final SchedulerOperations schedulerOperations, + final SchedulingTopology schedulingTopology) { + + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + + init(); + } + + private void init() { + for (SchedulingPipelinedRegion region : schedulingTopology.getAllPipelinedRegions()) { + for (SchedulingResultPartition partition : region.getConsumedResults()) { + // sanity check Review comment: ok. 
will remove it.
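The `init()` method quoted in the review above builds a reverse-lookup map with `computeIfAbsent`. A minimal, self-contained sketch of that pattern, with hypothetical `String` ids standing in for Flink's partition and region types:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ConsumerIndexSketch {

    /**
     * Indexes "region consumes partition" edges into a map from partition id
     * to the set of consuming regions, mirroring how init() populates
     * partitionConsumerRegions. The String ids are stand-ins, not Flink types.
     */
    static Map<String, Set<String>> indexConsumers(List<String[]> edges) {
        Map<String, Set<String>> partitionConsumerRegions = new HashMap<>();
        for (String[] edge : edges) {
            String regionId = edge[0];
            String partitionId = edge[1];
            // computeIfAbsent creates the value set on first access,
            // avoiding an explicit containsKey/put dance.
            partitionConsumerRegions
                    .computeIfAbsent(partitionId, pid -> new HashSet<>())
                    .add(regionId);
        }
        return partitionConsumerRegions;
    }
}
```

The same `computeIfAbsent` idiom also backs `correlatedResultPartitions` in the quoted code, keyed by result id instead of partition id.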
[GitHub] [flink] zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy
zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy URL: https://github.com/apache/flink/pull/11770#discussion_r409983049 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.java ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Unit tests for {@link PipelinedRegionSchedulingStrategy}. 
+ */ +public class PipelinedRegionSchedulingStrategyTest extends TestLogger { + + private TestingSchedulerOperations testingSchedulerOperation; + + private int parallelism = 2; + + private TestingSchedulingTopology testingSchedulingTopology; + + private List source; + + private List map; + + private List sink; + + @Before + public void setUp() { + testingSchedulerOperation = new TestingSchedulerOperations(); + + buildTopology(); + } + + private void buildTopology() { + testingSchedulingTopology = new TestingSchedulingTopology(); + + source = testingSchedulingTopology.addExecutionVertices().withParallelism(parallelism).finish(); + map = testingSchedulingTopology.addExecutionVertices().withParallelism(parallelism).finish(); + sink = testingSchedulingTopology.addExecutionVertices().withParallelism(parallelism).finish(); + + testingSchedulingTopology.connectPointwise(source, map) + .withResultPartitionState(ResultPartitionState.CREATED) + .withResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED) + .finish(); + testingSchedulingTopology.connectAllToAll(map, sink) + .withResultPartitionState(ResultPartitionState.CREATED) + .withResultPartitionType(ResultPartitionType.BLOCKING) + .finish(); + + testingSchedulingTopology.generatePipelinedRegions(); + } + + @Test + public void testStartScheduling() { + startScheduling(testingSchedulingTopology); + + final List> expectedScheduledVertices = new ArrayList<>(); + expectedScheduledVertices.add(Arrays.asList(source.get(0), map.get(0))); + expectedScheduledVertices.add(Arrays.asList(source.get(1), map.get(1))); + assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices); + } + + @Test + public void testRestartTasks() { + final PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(testingSchedulingTopology); + + final Set verticesToRestart = Stream.of(source, map, sink) + .flatMap(List::stream) + .map(TestingSchedulingExecutionVertex::getId) + .collect(Collectors.toSet()); + + 
schedulingStrategy.restartTasks(verticesToRestart); + + final List> expectedScheduledVertices = new ArrayList<>(); + expectedScheduledVertices.add(Arrays.asList(source.get(0), map.get(0))); + expectedScheduledVertices.add(Arrays.asList(source.get(1), map.get(1))); + assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices); + } + + @Test + public void
[GitHub] [flink] zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy
zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy URL: https://github.com/apache/flink/pull/11770#discussion_r409982245 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java ## @@ -60,4 +61,18 @@ deploymentOptionRetriever.apply(executionVertexID))) .collect(Collectors.toList()); } + + static List> sortPipelinedRegionsInTopologicalOrder( + final SchedulingTopology topology, + final Set> regions) { + + final Set> deduplicator = new HashSet<>(); + return IterableUtils.toStream(topology.getVertices()) + .map(SchedulingExecutionVertex::getId) + .map(topology::getPipelinedRegionOfVertex) + .filter(regions::contains) + .filter(region -> !deduplicator.contains(region)) + .filter(deduplicator::add) Review comment: Good idea! Will use `distinct()`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
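The accepted suggestion replaces the external `HashSet` deduplicator with `Stream.distinct()`. A minimal sketch of the two patterns, with plain `String` elements standing in for the pipelined regions; note that in the original snippet the `!deduplicator.contains(...)` filter is redundant, since `Set.add` already returns `false` for elements seen before, and `distinct()` likewise keeps only the first occurrence while preserving encounter order:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class DedupeSketch {

    // Pattern from the PR: a stateful filter backed by an external HashSet.
    static List<String> dedupeWithSet(List<String> regions) {
        Set<String> deduplicator = new HashSet<>();
        return regions.stream()
                .filter(deduplicator::add) // add() is false for duplicates
                .collect(Collectors.toList());
    }

    // Reviewer's suggestion: distinct() does the same for an ordered stream,
    // without the side-effecting lambda.
    static List<String> dedupeWithDistinct(List<String> regions) {
        return regions.stream()
                .distinct()
                .collect(Collectors.toList());
    }
}
```

Both variants return the regions in first-seen order, which is what the topological sort in `sortPipelinedRegionsInTopologicalOrder` relies on.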
[GitHub] [flink] zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy
zhuzhurk commented on a change in pull request #11770: [FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy URL: https://github.com/apache/flink/pull/11770#discussion_r409982461 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; +import org.apache.flink.util.IterableUtils; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link SchedulingStrategy} instance which schedules tasks in granularity of pipelined regions. + */ +public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(false); + + /** Result partitions are correlated if they have the same result id. 
*/ + private final Map>> correlatedResultPartitions = new HashMap<>(); + + private final Map>> partitionConsumerRegions = new HashMap<>(); + + public PipelinedRegionSchedulingStrategy( + final SchedulerOperations schedulerOperations, + final SchedulingTopology schedulingTopology) { + + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + + init(); + } + + private void init() { + for (SchedulingPipelinedRegion region : schedulingTopology.getAllPipelinedRegions()) { + for (SchedulingResultPartition partition : region.getConsumedResults()) { + // sanity check + checkState(partition.getResultType() == ResultPartitionType.BLOCKING); + + partitionConsumerRegions.computeIfAbsent(partition.getId(), pid -> new HashSet<>()).add(region); + correlatedResultPartitions.computeIfAbsent(partition.getResultId(), rid -> new HashSet<>()).add(partition); + } + } + } + + @Override + public void startScheduling() { + final Set> sourceRegions = IterableUtils + .toStream(schedulingTopology.getAllPipelinedRegions()) + .filter(region -> !region.getConsumedResults().iterator().hasNext()) + .collect(Collectors.toSet()); + maybeScheduleRegions(sourceRegions); + } + + @Override + public void restartTasks(final Set verticesToRestart) { + final Set> regionsToRestart = verticesToRestart.stream() + .map(schedulingTopology::getPipelinedRegionOfVertex) + .collect(Collectors.toSet()); + maybeScheduleRegions(regionsToRestart); + } + + @Override + public void onExecutionStateChange(final ExecutionVertexID executionVertexId, final ExecutionState executionState) { + if (executionState == ExecutionState.FINISHED) { + final Set> finishedPartitions = IterableUtils + .toStream(schedulingTopology.getVertex(executionVertexId).getProducedResults()) +
[jira] [Resolved] (FLINK-16771) NPE when filtering by decimal column
[ https://issues.apache.org/jira/browse/FLINK-16771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee resolved FLINK-16771. -- Resolution: Fixed release-1.10: 47d6781de53a0cc237f457ae4f9c74c25ea590c6 > NPE when filtering by decimal column > > > Key: FLINK-16771 > URL: https://issues.apache.org/jira/browse/FLINK-16771 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: Rui Li >Assignee: Rui Li >Priority: Major > Labels: pull-request-available > Fix For: 1.10.1, 1.11.0 > > Time Spent: 40m > Remaining Estimate: 0h > > The following SQL can trigger the issue: > {code} > create table foo (d decimal(15,8)); > insert into foo values (cast('123.123' as decimal(15,8))); > select * from foo where d>cast('123456789.123' as decimal(15,8)); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] JingsongLi merged pull request #11736: [FLINK-16771][table-planner-blink] NPE when filtering by decimal column
JingsongLi merged pull request #11736: [FLINK-16771][table-planner-blink] NPE when filtering by decimal column URL: https://github.com/apache/flink/pull/11736
[GitHub] [flink] flinkbot edited a comment on issue #11775: [FLINK-17132][metrics] Use the latest version of simpleclient_pushgateway to solve the "Failed to push metrics to PushGateway" problem
flinkbot edited a comment on issue #11775: [FLINK-17132][metrics] Use the latest version of simpleclient_pushgateway to solve the "Failed to push metrics to PushGateway" problem URL: https://github.com/apache/flink/pull/11775#issuecomment-614586674 ## CI report: * 5178c66b32381b67a41cb2a4e36a7498f241502c Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160543598) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7598)
[jira] [Comment Edited] (FLINK-15795) KubernetesClusterDescriptor seems to report unreachable JobManager interface URL
[ https://issues.apache.org/jira/browse/FLINK-15795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085390#comment-17085390 ] Canbin Zheng edited comment on FLINK-15795 at 4/17/20, 3:55 AM: [~fly_in_gis] I noticed this problem before, it is in the plan of FLINK-17196. cc [~trohrmann] Could it be a subtask of FLINK-17196? was (Author: felixzheng): I noticed this problem before, it is in the plan of FLINK-17196, could it be a subtask of FLINK-17196 > KubernetesClusterDescriptor seems to report unreachable JobManager interface > URL > > > Key: FLINK-15795 > URL: https://issues.apache.org/jira/browse/FLINK-15795 > Project: Flink > Issue Type: Sub-task > Components: Deployment / Kubernetes >Affects Versions: 1.10.0 >Reporter: Till Rohrmann >Priority: Minor > Fix For: 1.10.1, 1.11.0 > > > The {{KubernetesClusterDescriptor}} reports the {{JobManager}} web interface > URL when deploying the cluster. In my case (using almost the standard > configuration), it reported an unreachable URL which consisted of the K8s > cluster's endpoint IP. This might be a configuration problem of the K8s > cluster but at the same time, the descriptor deployed a LoadBalancer (default > configuration) through which one could reach the web UI. In this case, I > would have wished that the {{LoadBalancer's}} IP and port is reported instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17177) Handle ERROR event correctly in KubernetesResourceManager#onError
[ https://issues.apache.org/jira/browse/FLINK-17177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085412#comment-17085412 ] Yang Wang commented on FLINK-17177: --- {code:java} Object is: If Type is Added or Modified: the new state of the object. If Type is Deleted: the state of the object immediately before deletion. If Type is Error: Status is recommended; other types may make sense depending on context. {code} I post the {{WatchEvent}} in K8s here[1]. I do not find the "Error" type means "HTTP error". So could share some information about how the "Error" type is introduced by HTTP layer error? [1]. [https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#watchevent-v1-meta] > Handle ERROR event correctly in KubernetesResourceManager#onError > - > > Key: FLINK-17177 > URL: https://issues.apache.org/jira/browse/FLINK-17177 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.10.0, 1.10.1 >Reporter: Canbin Zheng >Priority: Major > Fix For: 1.11.0 > > > Currently, once we receive an *ERROR* event that is sent from the K8s API > server via the K8s {{Watcher}}, then {{KubernetesResourceManager#onError}} > will handle it by calling the > {{KubernetesResourceManager#removePodIfTerminated}}. This may be incorrect > since the *ERROR* event indicates an exception in the HTTP layer that is > caused by the K8s Server, which means the previously created {{Watcher}} may > be no longer available and we'd better re-create the {{Watcher}} immediately. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17198) DDL and DML compatibility for Hive connector
Rui Li created FLINK-17198: -- Summary: DDL and DML compatibility for Hive connector Key: FLINK-17198 URL: https://issues.apache.org/jira/browse/FLINK-17198 Project: Flink Issue Type: New Feature Components: Connectors / Hive, Table SQL / Client Reporter: Rui Li Fix For: 1.11.0
[jira] [Created] (FLINK-17197) ContinuousFileReaderOperator might shade the real exception when processing records
Caizhi Weng created FLINK-17197: --- Summary: ContinuousFileReaderOperator might shade the real exception when processing records Key: FLINK-17197 URL: https://issues.apache.org/jira/browse/FLINK-17197 Project: Flink Issue Type: Improvement Components: Connectors / FileSystem Affects Versions: 1.11.0 Reporter: Caizhi Weng In {{ContinuousFileReaderOperator}} we have the following code: {code:java} private final transient RunnableWithException processRecordAction = () -> { try { processRecord(); } catch (Exception e) { switchState(ReaderState.CLOSED); throw e; } }; {code} The reader's state is forced to be {{CLOSED}} when an exception occurs. But this {{switchState}} method might throw another exception saying that reader's current state can't be switched directly to {{CLOSED}}. This new exception will shade the real exception thrown from {{processRecord}}. To trigger this situation, add the following test method to any blink planner IT case class (for example {{org.apache.flink.table.planner.runtime.batch.sql.CalcITCase}}): {code:java} @Test def testCsv(): Unit = { conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1) val tableSource = TestTableSourceSinks.getPersonCsvTableSource tEnv.registerTableSource("MyTable", tableSource) val t = tEnv.sqlQuery("SELECT * FROM MyTable") TableUtils.collectToList(t) } {code} We then need to change {{org.apache.flink.api.java.Utils.CollectHelper}} to make it a failing sink: {code:java} @Override public void writeRecord(T record) throws IOException { throw new RuntimeException("writeRecordFails"); // accumulator.add(record, serializer); } {code} Run the test case and the following exception stack will occur: {code} org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:659) at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77) at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:51) at org.apache.flink.table.planner.utils.TestingTableEnvironment.execute(TableTestBase.scala:1054) at org.apache.flink.table.api.TableUtils.collectToList(TableUtils.java:85) at org.apache.flink.table.planner.runtime.batch.sql.CalcITCase.testCsv(CalcITCase.scala:73) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) at
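One common way to avoid the exception shading described in FLINK-17197 is to attach the cleanup failure to the original exception via `Throwable.addSuppressed`, so the `processRecord` failure stays the primary error. A minimal sketch under that assumption (the method names mirror the snippet above, but the class and bodies are hypothetical, not the actual operator code):

```java
public class ExceptionShadingSketch {

    /** Runs the action; on failure, attempts cleanup without masking the cause. */
    static Exception runAndCapture() {
        try {
            processRecord();
            return null;
        } catch (Exception e) {
            try {
                switchStateToClosed(); // cleanup that may itself fail
            } catch (Exception cleanupFailure) {
                // Keep the original failure primary; record the cleanup failure.
                e.addSuppressed(cleanupFailure);
            }
            return e;
        }
    }

    static void processRecord() throws Exception {
        throw new Exception("writeRecordFails"); // the "real" exception
    }

    static void switchStateToClosed() throws Exception {
        throw new Exception("cannot switch state to CLOSED"); // would otherwise shade it
    }
}
```

With this shape, the stack trace reported to the user leads with `writeRecordFails`, and the illegal state transition appears as a suppressed exception instead of replacing it.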
[GitHub] [flink] flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args
flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args URL: https://github.com/apache/flink/pull/11545#issuecomment-604848813 ## CI report: * 46b64a4339ed27315eee1eb2d8d877c8dfaae993 UNKNOWN * 0e3973318602ca3081a5d5a4401cd5f2df4fa4b3 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160482936) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7548) * dd69c50599e8e8709ceb2fb055797f94534c0aa4 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160667141) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7623)
[jira] [Updated] (FLINK-17177) Handle ERROR event correctly in KubernetesResourceManager#onError
[ https://issues.apache.org/jira/browse/FLINK-17177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Wang updated FLINK-17177: -- Attachment: (was: image-2020-04-17-11-40-41-869.png)
[jira] [Updated] (FLINK-17177) Handle ERROR event correctly in KubernetesResourceManager#onError
[ https://issues.apache.org/jira/browse/FLINK-17177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Wang updated FLINK-17177: -- Attachment: image-2020-04-17-11-40-41-869.png > Handle ERROR event correctly in KubernetesResourceManager#onError > - > > Key: FLINK-17177 > URL: https://issues.apache.org/jira/browse/FLINK-17177 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.10.0, 1.10.1 >Reporter: Canbin Zheng >Priority: Major > Fix For: 1.11.0 > > > Currently, once we receive an *ERROR* event that is sent from the K8s API > server via the K8s {{Watcher}}, then {{KubernetesResourceManager#onError}} > will handle it by calling the > {{KubernetesResourceManager#removePodIfTerminated}}. This may be incorrect > since the *ERROR* event indicates an exception in the HTTP layer that is > caused by the K8s Server, which means the previously created {{Watcher}} may > be no longer available and we'd better re-create the {{Watcher}} immediately. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] zenfenan edited a comment on issue #11759: FLINK-17142: Bump ORC dependency version to 1.5.6
zenfenan edited a comment on issue #11759: FLINK-17142: Bump ORC dependency version to 1.5.6 URL: https://github.com/apache/flink/pull/11759#issuecomment-615020177 @zentol I checked whether NOTICE or LICENSE has to be updated, but I don't think it is needed. I'd appreciate it if you, @JingsongLi, or @kl0u could also check. @all If everything checks out, can we close this one? Thanks :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager
wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#discussion_r409966657 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); Review comment: @tillrohrmann Sorry to jump in here. I want to add some additional explanations. Why did I name it `createTaskManagerPod` instead of `startTaskManagerPod`? * We usually use `create` in K8s commands (e.g. `kubectl create -f taskmanager.yaml`). The start process is done internally and automatically by the K8s `kubelet`. Why does `createTaskManagerPod` not return a `Pod`? * Of course we could return the created `Pod`. However, it is not in a final or stable state, since K8s will keep updating the `Pod` (e.g. its IP address and status). So I do not return the `Pod`, to avoid it being used incorrectly. For `KubernetesJobManagerFactory#createJobManagerComponent` and `KubernetesTaskManagerFactory#createTaskManagerComponent`, maybe we could rename them to `KubernetesJobManagerFactory#buildJobManagerSpecification` and `KubernetesTaskManagerFactory#buildTaskManagerPod`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11784: [FLINK-17120][python] Add Cython support for operations
flinkbot edited a comment on issue #11784: [FLINK-17120][python] Add Cython support for operations URL: https://github.com/apache/flink/pull/11784#issuecomment-615011156 ## CI report: * 1ee639f83adcf607b722609782be88207de5fe49 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160666122) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7622) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args
flinkbot edited a comment on issue #11545: [FLINK-16742][runtime][dist] Extend and use BashJavaUtils to start JM JVM process and pass JVM memory args URL: https://github.com/apache/flink/pull/11545#issuecomment-604848813 ## CI report: * 46b64a4339ed27315eee1eb2d8d877c8dfaae993 UNKNOWN * 0e3973318602ca3081a5d5a4401cd5f2df4fa4b3 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160482936) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7548) * dd69c50599e8e8709ceb2fb055797f94534c0aa4 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
flinkbot edited a comment on issue #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity. URL: https://github.com/apache/flink/pull/11320#issuecomment-595174225 ## CI report: * 8e8aca52ebd0560d829ef3945884817668976804 UNKNOWN * 40f6f68a12a74aacf1100dfb81b5bed081cd9a4a UNKNOWN * 7b2346b7cdc46a7c3b490c6c9a8ce6e1727fe10b Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160592479) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7610) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager
wangyang0918 commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#discussion_r409968753 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -189,15 +180,17 @@ public boolean stopWorker(final KubernetesWorkerNode worker) { public void onAdded(List pods) { runAsync(() -> { for (KubernetesPod pod : pods) { - if (numPendingPodRequests > 0) { - numPendingPodRequests--; + WorkerResourceSpec workerResourceSpec = podWorkerResources.get(pod.getName()); + final int pendingNum = getNumPendingWorkersFor(workerResourceSpec); + if (pendingNum > 0) { Review comment: Recently I also realized that `onAdded` may be called more than once for the same `Pod`, e.g. when a network problem causes the pod watcher to reconnect, so it would be good to check for existence first. Otherwise, `numPendingPodRequests` may be decremented incorrectly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
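The existence check suggested in this review can be sketched with a small, self-contained helper. The class and method names below are hypothetical and this is not Flink's actual implementation: a set of already-seen pod names guards the pending counter, so a duplicate ADD event delivered after a watcher reconnect cannot decrement it twice.

```java
import java.util.HashSet;
import java.util.Set;

public class PodAddedHandler {

    private final Set<String> knownPods = new HashSet<>();
    private int numPendingPodRequests;

    public PodAddedHandler(int numPendingPodRequests) {
        this.numPendingPodRequests = numPendingPodRequests;
    }

    // Returns true if the pod was newly registered. A replayed ADD event for a
    // pod we have already seen (e.g. after the watcher reconnects) returns
    // false and leaves the pending counter untouched.
    public boolean onAdded(String podName) {
        if (!knownPods.add(podName)) {
            return false; // duplicate event: do not decrement again
        }
        if (numPendingPodRequests > 0) {
            numPendingPodRequests--;
        }
        return true;
    }

    public int getNumPendingPodRequests() {
        return numPendingPodRequests;
    }
}
```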
[jira] [Commented] (FLINK-15795) KubernetesClusterDescriptor seems to report unreachable JobManager interface URL
[ https://issues.apache.org/jira/browse/FLINK-15795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085390#comment-17085390 ] Canbin Zheng commented on FLINK-15795: -- I noticed this problem before; it is in the plan of FLINK-17196. Could it be a subtask of FLINK-17196? > KubernetesClusterDescriptor seems to report unreachable JobManager interface > URL > > > Key: FLINK-15795 > URL: https://issues.apache.org/jira/browse/FLINK-15795 > Project: Flink > Issue Type: Sub-task > Components: Deployment / Kubernetes >Affects Versions: 1.10.0 >Reporter: Till Rohrmann >Priority: Minor > Fix For: 1.10.1, 1.11.0 > > > The {{KubernetesClusterDescriptor}} reports the {{JobManager}} web interface > URL when deploying the cluster. In my case (using almost the standard > configuration), it reported an unreachable URL which consisted of the K8s > cluster's endpoint IP. This might be a configuration problem of the K8s > cluster but at the same time, the descriptor deployed a LoadBalancer (default > configuration) through which one could reach the web UI. In this case, I > would have wished that the {{LoadBalancer's}} IP and port is reported instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-17185) Value of Row[] parameter of UDF is null in blink planner
[ https://issues.apache.org/jira/browse/FLINK-17185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zck closed FLINK-17185. --- Resolution: Invalid > Value of Row[] parameter of UDF is null in blink planner > > > Key: FLINK-17185 > URL: https://issues.apache.org/jira/browse/FLINK-17185 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.10.0 >Reporter: zck >Priority: Major > > source_ddl: > CREATE TABLE sourceTable ( > event_time_line array `rule_name` VARCHAR, > `count` VARCHAR > )> > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.startup-mode' = 'earliest-offset', > 'connector.topic' = 'topic_test_1', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'update-mode' = 'append', > 'format.type' = 'json', > 'format.derive-schema' = 'true' > ); > > > public class MyUDTF extends TableFunction { > /** > * > * *@param rows null under the blink planner, although the array has a length; under the default planner the values are present* > */ > public void eval(Row [] rows) { > collector.collect(Row.of(1, 1)); > } > } -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-17176) Slow down Pod re-creation in KubernetesResourceManager#PodCallbackHandler
[ https://issues.apache.org/jira/browse/FLINK-17176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085383#comment-17085383 ] Canbin Zheng edited comment on FLINK-17176 at 4/17/20, 3:08 AM: Of course, they should share the same retry logic. I think you are right, this ticket could be a subtask of {color:#4a6ee0}FLINK-17127{color} was (Author: felixzheng): Of course, they should share the same retry logic, and this ticket could be a subtask of {color:#4a6ee0}FLINK-17127{color} > Slow down Pod re-creation in KubernetesResourceManager#PodCallbackHandler > - > > Key: FLINK-17176 > URL: https://issues.apache.org/jira/browse/FLINK-17176 > Project: Flink > Issue Type: Improvement > Components: Deployment / Kubernetes >Affects Versions: 1.10.0 >Reporter: Canbin Zheng >Priority: Major > Fix For: 1.11.0 > > > In the native K8s setups, there are some cases in which we do not control the > speed of Pod re-creation, which risks flooding the K8s API > Server in the {{PodCallbackHandler}} implementation of > {{KubernetesResourceManager}}. > Here are steps to reproduce this kind of problem: > # Mount the {{/opt/flink/log}} in the Container of TaskManager to a path on > the K8s nodes via HostPath, making sure that the path exists but the > TaskManager process has no write permission. We can achieve this via the > [user-specified pod template > support|https://issues.apache.org/jira/browse/FLINK-15656] or just hardcode > it for testing only. > # Launch a session cluster > # Submit a new job to the session cluster; as expected, we can observe that > the Pod constantly fails quickly while launching the main Container, and then > {{KubernetesResourceManager#onModified}} is invoked to re-create a new > Pod immediately, without any speed control. 
> To sum up, once the {{KubernetesResourceManager}} receives the Pod *ADD* > event and that Pod is terminated before successfully registering into the > {{KubernetesResourceManager}}, the {{KubernetesResourceManager}} will send > another creation request to K8s API Server immediately. -- This message was sent by Atlassian Jira (v8.3.4#803005)
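One way to implement the requested speed control is a capped exponential backoff between consecutive re-creation attempts. The sketch below is a hypothetical, Flink-independent illustration of that idea, not the actual fix:

```java
public class PodRecreateBackoff {

    private final long initialDelayMs;
    private final long maxDelayMs;

    public PodRecreateBackoff(long initialDelayMs, long maxDelayMs) {
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    // Delay before the n-th consecutive re-creation attempt (n starts at 0):
    // the delay doubles each time and is capped at maxDelayMs, so a
    // crash-looping pod can never flood the K8s API server with requests.
    public long delayForAttempt(int attempt) {
        long delay = initialDelayMs << Math.min(attempt, 30); // cap shift to avoid overflow
        return Math.min(delay, maxDelayMs);
    }
}
```

A successful TaskManager registration would reset the attempt counter, so healthy pods are still re-created promptly.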
[jira] [Created] (FLINK-17196) Rework the implementation of Fabric8FlinkKubeClient#getRestEndpoint
Canbin Zheng created FLINK-17196: Summary: Rework the implementation of Fabric8FlinkKubeClient#getRestEndpoint Key: FLINK-17196 URL: https://issues.apache.org/jira/browse/FLINK-17196 Project: Flink Issue Type: Improvement Components: Deployment / Kubernetes Affects Versions: 1.10.0, 1.10.1 Reporter: Canbin Zheng Fix For: 1.11.0 This is the umbrella issue for the rework of the implementation of {{Fabric8FlinkKubeClient#getRestEndpoint}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-17184) Flink webui does not display received records/bytes metric when graph contains single operator
[ https://issues.apache.org/jira/browse/FLINK-17184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zck closed FLINK-17184. --- > Flink webui does not display received records/bytes metric when graph > contains single operator > --- > > Key: FLINK-17184 > URL: https://issues.apache.org/jira/browse/FLINK-17184 > Project: Flink > Issue Type: Bug > Components: Project Website >Affects Versions: 1.10.0 >Reporter: zck >Priority: Major > Attachments: image-2020-04-16-18-49-11-103.png > > > sink_ddl > source_ddl > query_sql : insert into tableB select * form tableA > The web UI shows no metric data. > !image-2020-04-16-18-49-11-103.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on issue #11784: [FLINK-17120][python] Add Cython support for operations
flinkbot commented on issue #11784: [FLINK-17120][python] Add Cython support for operations URL: https://github.com/apache/flink/pull/11784#issuecomment-615011156 ## CI report: * 1ee639f83adcf607b722609782be88207de5fe49 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts
flinkbot edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts URL: https://github.com/apache/flink/pull/11765#issuecomment-614430999 ## CI report: * 22eef4c43cd4fd0a6ddffb3cbef00a2fe522dac6 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160495175) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7557) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11466: [FLINK-15400][connectors / elasticsearch] elasticsearch table sink support dynamic index.
flinkbot edited a comment on issue #11466: [FLINK-15400][connectors / elasticsearch] elasticsearch table sink support dynamic index. URL: https://github.com/apache/flink/pull/11466#issuecomment-601789375 ## CI report: * 4c252220928052025f30b4dc4c63c0479d7c20cd UNKNOWN * c8f8eb3c22bfdd4f82204d281b6f7c5ebbefb769 Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/160662143) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7616) * c015b198415d8077f1782e949036cb07bbaa044c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL.
flinkbot edited a comment on issue #11749: [FLINK-16669][python][table] Support Python UDF in SQL function DDL. URL: https://github.com/apache/flink/pull/11749#issuecomment-613901508 ## CI report: * d2603c1936ff28ab745c711224857bba0693edd8 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160553016) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7601) * 7cbe2380ac7246cf703397d9a78d002c0491a959 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160664905) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7621) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception
flinkbot edited a comment on issue #11010: [FLINK-15836][k8s] Throw fatal error in KubernetesResourceManager when the pods watcher is closed with exception URL: https://github.com/apache/flink/pull/11010#issuecomment-581769412 ## CI report: * 2a53bac20fd0c35c38575280744968501d14 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160493463) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7556) * 1130280dfe557a0268fed248089fb32d15f832d9 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160664763) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7620) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-17195) Worker threads for source function have wrong ClassLoader
Teng Fei Liao created FLINK-17195: - Summary: Worker threads for source function have wrong ClassLoader Key: FLINK-17195 URL: https://issues.apache.org/jira/browse/FLINK-17195 Project: Flink Issue Type: Bug Components: Runtime / State Backends, Table SQL / Planner Affects Versions: 1.10.0 Reporter: Teng Fei Liao Hey, in one of my SourceFunctions, I'm calling ServiceLoader.load(MyDefiniedService.class), which internally fetches the ClassLoader by calling Thread.currentThread().getContextClassLoader(). It seems like whichever ClassLoader this fetches is unaware of any jars/code that I've shipped. My application code has not changed, and this came up in my attempt to switch from the old planner to the blink planner. Would appreciate any help I can get on this, thanks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
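A minimal sketch of a common workaround for this class of problem, assuming the root cause is that the worker thread's context ClassLoader does not see the user-code jars: `ServiceLoader` has a two-argument overload that accepts an explicit `ClassLoader`, so one can pass the loader of a class that is known to live in user code. The helper below is hypothetical and not part of Flink:

```java
import java.util.ServiceLoader;

public class ServiceLoaderExample {

    // ServiceLoader.load(Class) resolves providers through
    // Thread.currentThread().getContextClassLoader(). On a worker thread whose
    // context ClassLoader was never pointed at the user-code jars, providers
    // shipped with the job are invisible. Passing the ClassLoader of a class
    // known to live in user code side-steps that dependency.
    public static <S> ServiceLoader<S> loadWithUserCodeClassLoader(Class<S> service) {
        ClassLoader userCodeClassLoader = ServiceLoaderExample.class.getClassLoader();
        return ServiceLoader.load(service, userCodeClassLoader);
    }
}
```

Whether this applies here depends on which ClassLoader actually loaded the SourceFunction under the blink planner; alternatively, the worker thread could set its context ClassLoader to the user-code loader before calling the single-argument `ServiceLoader.load`.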
[jira] [Commented] (FLINK-15795) KubernetesClusterDescriptor seems to report unreachable JobManager interface URL
[ https://issues.apache.org/jira/browse/FLINK-15795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085379#comment-17085379 ] Yang Wang commented on FLINK-15795: --- [~felixzheng] I am posting yesterday's discussion [1] here since this ticket is related. If you want to completely separate the {{getRestEndpoint}} logic for {{LoadBalancer}} and {{NodePort}}, this ticket should be taken into account, just to make sure that we print a proper and accessible JobManager URL in the Flink client log. Given that the rest port is configured to 8081: * For LoadBalancer, it should be {{LB_IP:8081}}. If we do not get a LB_IP, it would be great if a warning log shows up. * For NodePort, it should be {{K8S_MASTER:NodePort}} or {{K8s_NODE:NodePort}}. [1]. https://github.com/apache/flink/pull/11715#discussion_r409295087 > KubernetesClusterDescriptor seems to report unreachable JobManager interface > URL > > > Key: FLINK-15795 > URL: https://issues.apache.org/jira/browse/FLINK-15795 > Project: Flink > Issue Type: Sub-task > Components: Deployment / Kubernetes >Affects Versions: 1.10.0 >Reporter: Till Rohrmann >Priority: Minor > Fix For: 1.10.1, 1.11.0 > > > The {{KubernetesClusterDescriptor}} reports the {{JobManager}} web interface > URL when deploying the cluster. In my case (using almost the standard > configuration), it reported an unreachable URL which consisted of the K8s > cluster's endpoint IP. This might be a configuration problem of the K8s > cluster but at the same time, the descriptor deployed a LoadBalancer (default > configuration) through which one could reach the web UI. In this case, I > would have wished that the {{LoadBalancer's}} IP and port is reported instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
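The selection rule Yang Wang describes (prefer the LoadBalancer address when one was provisioned, otherwise fall back to a node address plus NodePort) can be sketched as a small helper. Names and signature here are illustrative only, not Flink's actual `Fabric8FlinkKubeClient` API:

```java
public class RestEndpointResolver {

    // Prefer the LoadBalancer ingress address with the configured rest port
    // (e.g. 8081); when no LB address was assigned, fall back to a reachable
    // node (or the K8s master) combined with the service's NodePort.
    public static String resolve(String loadBalancerIp, int restPort,
                                 String nodeAddress, int nodePort) {
        if (loadBalancerIp != null && !loadBalancerIp.isEmpty()) {
            return loadBalancerIp + ":" + restPort;
        }
        // A real implementation should also log a warning here, since the
        // LoadBalancer was requested but no external IP was provisioned.
        return nodeAddress + ":" + nodePort;
    }
}
```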
[GitHub] [flink] godfreyhe edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts
godfreyhe edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts URL: https://github.com/apache/flink/pull/11765#issuecomment-615007657 I think we can learn from https://github.com/oncewang/hive/blob/master/beeline/src/test/org/apache/hive/beeline/TestBeeLineHistory.java. There are some tests based on `Terminal` in `CliClientTest`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] godfreyhe commented on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts
godfreyhe commented on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts URL: https://github.com/apache/flink/pull/11765#issuecomment-615007657 I think we can learn from https://github.com/oncewang/hive/blob/master/beeline/src/test/org/apache/hive/beeline/TestBeeLineHistory.java
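For reference, the restart-surviving history discussed in FLINK-8864 boils down to appending each executed statement to a file and re-reading that file on startup. The SQL client itself builds on JLine's `Terminal`/history support (as the `CliClientTest` remark suggests); the JDK-only class below is a minimal illustrative sketch of the idea, not the actual implementation:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;

// Minimal sketch of file-backed CLI command history: each command is appended
// to a history file, and the file is re-read on the next startup.
class FileBackedHistory {
    private final Path historyFile;

    FileBackedHistory(Path historyFile) {
        this.historyFile = historyFile;
    }

    /** Append one executed command to the on-disk history. */
    void append(String command) {
        try {
            Files.write(historyFile, (command + System.lineSeparator()).getBytes(),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Load previously executed commands, oldest first; empty if no file yet. */
    List<String> load() {
        try {
            return Files.exists(historyFile)
                    ? Files.readAllLines(historyFile)
                    : Collections.emptyList();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A real integration would feed the loaded lines into the line reader's history buffer so up-arrow recall works across restarts; the file format above (one command per line) is an assumption for illustration.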
[GitHub] [flink] flinkbot edited a comment on issue #11782: [FLINK-15801] fix Timestamp extractor created from properties does no…
flinkbot edited a comment on issue #11782: [FLINK-15801] fix Timestamp extractor created from properties does no… URL: https://github.com/apache/flink/pull/11782#issuecomment-614731103 ## CI report: * 3ba08d20930dc7f1bfa750c2e11323ffbd67 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160578468) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7607) * de8e893e40c189d6aaa3de4330df4c3065b8ae62 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160663230) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7619) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts
flinkbot edited a comment on issue #11765: [FLINK-8864][sql-client] Add cli command history in sql client and make it available after restarts URL: https://github.com/apache/flink/pull/11765#issuecomment-614430999 ## CI report: * 22eef4c43cd4fd0a6ddffb3cbef00a2fe522dac6 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160495175) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7557) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build