[GitHub] [flink] flinkbot edited a comment on pull request #13690: [FLINK-16595][YARN]support more HDFS nameServices in yarn mode when security enabled. Is…

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13690:
URL: https://github.com/apache/flink/pull/13690#issuecomment-712304240


   
   ## CI report:
   
   * 67f01dfa8c82c0575564f7d20961f36a3d0c623b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7951) Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7971)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #13680: [FLINK-19553] [ Runtime / Web Frontend] The format of checkpoint Completion Time and Failure Time should be changed from HH:mm:ss to

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13680:
URL: https://github.com/apache/flink/pull/13680#issuecomment-711592132


   
   ## CI report:
   
   * f9085414f0bcc7644d623260bdf50758046f95fd Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7887)
   * a040eaf5fdc1858602290024370747ee7d2c8e13 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7979)
   
   




[jira] [Commented] (FLINK-19483) PyFlink Table end-to-end test failed with "FileExistsError: [Errno 17] File exists: '/home/vsts/work/1/s/flink-python/dev/.conda/pkgs'"

2020-10-20 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218121#comment-17218121
 ] 

Dian Fu commented on FLINK-19483:
-

Fixed in master via 2a9af7f6275166c1671bd495dc092b9845b960da

> PyFlink Table end-to-end test failed with "FileExistsError: [Errno 17] File 
> exists: '/home/vsts/work/1/s/flink-python/dev/.conda/pkgs'"
> ---
>
> Key: FLINK-19483
> URL: https://issues.apache.org/jira/browse/FLINK-19483
> Project: Flink
> Issue Type: Bug
> Components: API / Python, Tests
> Affects Versions: 1.12.0
> Reporter: Dian Fu
> Assignee: Huang Xingbo
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7130=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529
> {code}
> 2020-09-30T17:13:14.7489481Z Collecting package metadata (current_repodata.json): ...working... failed
> 2020-09-30T17:13:14.7699351Z 
> 2020-09-30T17:13:14.765Z # >> ERROR REPORT <<
> 2020-09-30T17:13:14.7700398Z 
> 2020-09-30T17:13:14.7700782Z Traceback (most recent call last):
> 2020-09-30T17:13:14.7702095Z   File "/home/vsts/work/1/s/flink-python/dev/.conda/lib/python3.7/site-packages/conda/gateways/disk/update.py", line 107, in touch
> 2020-09-30T17:13:14.7702736Z     mkdir_p_sudo_safe(dirpath)
> 2020-09-30T17:13:14.7703608Z   File "/home/vsts/work/1/s/flink-python/dev/.conda/lib/python3.7/site-packages/conda/gateways/disk/__init__.py", line 84, in mkdir_p_sudo_safe
> 2020-09-30T17:13:14.7704221Z     os.mkdir(path)
> 2020-09-30T17:13:14.7704992Z FileExistsError: [Errno 17] File exists: '/home/vsts/work/1/s/flink-python/dev/.conda/pkgs'
> 2020-09-30T17:13:14.7705512Z 
> 2020-09-30T17:13:14.7705956Z During handling of the above exception, another exception occurred:
> 2020-09-30T17:13:14.7706402Z 
> 2020-09-30T17:13:14.7706789Z Traceback (most recent call last):
> 2020-09-30T17:13:14.7707615Z   File "/home/vsts/work/1/s/flink-python/dev/.conda/lib/python3.7/site-packages/conda/core/subdir_data.py", line 185, in _load
> 2020-09-30T17:13:14.7708341Z     mtime = getmtime(self.cache_path_json)
> 2020-09-30T17:13:14.7709527Z   File "/home/vsts/work/1/s/flink-python/dev/.conda/lib/python3.7/site-packages/conda/core/subdir_data.py", line 153, in cache_path_json
> 2020-09-30T17:13:14.7710340Z     return self.cache_path_base + '.json'
> 2020-09-30T17:13:14.7711227Z   File "/home/vsts/work/1/s/flink-python/dev/.conda/lib/python3.7/site-packages/conda/core/subdir_data.py", line 144, in cache_path_base
> 2020-09-30T17:13:14.7711832Z     create_cache_dir(),
> 2020-09-30T17:13:14.7712821Z   File "/home/vsts/work/1/s/flink-python/dev/.conda/lib/python3.7/site-packages/conda/core/subdir_data.py", line 645, in create_cache_dir
> 2020-09-30T17:13:14.7715308Z     cache_dir = join(PackageCacheData.first_writable(context.pkgs_dirs).pkgs_dir, 'cache')
> 2020-09-30T17:13:14.7715986Z   File "/home/vsts/work/1/s/flink-python/dev/.conda/lib/python3.7/site-packages/conda/core/package_cache_data.py", line 162, in first_writable
> 2020-09-30T17:13:14.7716407Z     created = create_package_cache_directory(package_cache.pkgs_dir)
> 2020-09-30T17:13:14.7717084Z   File "/home/vsts/work/1/s/flink-python/dev/.conda/lib/python3.7/site-packages/conda/gateways/disk/create.py", line 435, in create_package_cache_directory
> 2020-09-30T17:13:14.7717522Z     touch(join(pkgs_dir, PACKAGE_CACHE_MAGIC_FILE), mkdir=True, sudo_safe=sudo_safe)
> 2020-09-30T17:13:14.7718150Z   File "/home/vsts/work/1/s/flink-python/dev/.conda/lib/python3.7/site-packages/conda/gateways/disk/update.py", line 125, in touch
> 2020-09-30T17:13:14.7718694Z     raise NotWritableError(path, e.errno, caused_by=e)
> 2020-09-30T17:13:14.7719040Z conda.exceptions.NotWritableError: The current user does not have write permissions to a required path.
> 2020-09-30T17:13:14.7719797Z   path: /home/vsts/work/1/s/flink-python/dev/.conda/pkgs/urls.txt
> 2020-09-30T17:13:14.7720054Z   uid: 1001
> 2020-09-30T17:13:14.7720217Z   gid: 118
> 2020-09-30T17:13:14.7720375Z 
> 2020-09-30T17:13:14.7720625Z If you feel that permissions on this path are set incorrectly, you can manually
> 2020-09-30T17:13:14.7720898Z change them by executing
> 2020-09-30T17:13:14.7721072Z 
> 2020-09-30T17:13:14.7721513Z   $ sudo chown 1001:118 /home/vsts/work/1/s/flink-python/dev/.conda/pkgs/urls.txt
> 
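
The first traceback in the quoted log fails inside `os.mkdir` with `FileExistsError`. The following is a minimal sketch only: it is not conda's code and not the fix merged for FLINK-19483, and the log does not say why the path already existed. It shows that a plain `os.mkdir` on an existing path raises that error, while `os.makedirs(..., exist_ok=True)` tolerates an existing directory but still fails when the path exists as a regular file. The helper names `ensure_dir` and `demo` and the temporary paths are assumptions made for illustration.

```python
import os
import tempfile


def ensure_dir(path: str) -> None:
    """Create `path` as a directory, tolerating an already existing directory."""
    # exist_ok=True suppresses FileExistsError only if `path` is a directory.
    os.makedirs(path, exist_ok=True)


def demo() -> dict:
    """Run three scenarios in a scratch directory and record each outcome."""
    results = {}
    with tempfile.TemporaryDirectory() as root:
        pkgs = os.path.join(root, "pkgs")  # hypothetical stand-in for .conda/pkgs
        os.mkdir(pkgs)

        # Scenario 1: a second plain os.mkdir on the same path fails.
        try:
            os.mkdir(pkgs)
            results["plain_mkdir_twice"] = "ok"
        except FileExistsError:
            results["plain_mkdir_twice"] = "FileExistsError"

        # Scenario 2: the tolerant helper accepts the existing directory.
        ensure_dir(pkgs)
        results["ensure_dir_twice"] = "ok"

        # Scenario 3: the path exists but is a regular file, not a directory.
        blocker = os.path.join(root, "blocker")
        with open(blocker, "w"):
            pass
        try:
            ensure_dir(blocker)
            results["ensure_dir_on_file"] = "ok"
        except FileExistsError:
            results["ensure_dir_on_file"] = "FileExistsError"
    return results


if __name__ == "__main__":
    for scenario, outcome in demo().items():
        print(f"{scenario}: {outcome}")
```

Calling `demo()` returns a dict that maps each scenario name to its outcome, so the three cases can be compared side by side.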

[jira] [Closed] (FLINK-19483) PyFlink Table end-to-end test failed with "FileExistsError: [Errno 17] File exists: '/home/vsts/work/1/s/flink-python/dev/.conda/pkgs'"

2020-10-20 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-19483.
---
Resolution: Fixed

> PyFlink Table end-to-end test failed with "FileExistsError: [Errno 17] File 
> exists: '/home/vsts/work/1/s/flink-python/dev/.conda/pkgs'"
> ---
>
> Key: FLINK-19483
> URL: https://issues.apache.org/jira/browse/FLINK-19483
> Project: Flink
> Issue Type: Bug
> Components: API / Python, Tests
> Affects Versions: 1.12.0
> Reporter: Dian Fu
> Assignee: Huang Xingbo
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>

[GitHub] [flink] dianfu merged pull request #13702: [FLINK-19483][python][e2] Remove conda install zip

2020-10-20 Thread GitBox


dianfu merged pull request #13702:
URL: https://github.com/apache/flink/pull/13702


   







[GitHub] [flink] flinkbot edited a comment on pull request #13680: [FLINK-19553] [ Runtime / Web Frontend] The format of checkpoint Completion Time and Failure Time should be changed from HH:mm:ss to

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13680:
URL: https://github.com/apache/flink/pull/13680#issuecomment-711592132


   
   ## CI report:
   
   * f9085414f0bcc7644d623260bdf50758046f95fd Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7887)
   * a040eaf5fdc1858602290024370747ee7d2c8e13 UNKNOWN
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #13702: [FLINK-19483][python][e2] Remove conda install zip

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13702:
URL: https://github.com/apache/flink/pull/13702#issuecomment-712809420


   
   ## CI report:
   
   * d2a85a4e1d5b90aca82b49573e0cf544df13b1e2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7937) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7972)
   
   




[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-20 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r508997664



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
+import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+/**
+ * Represent {@link KubernetesLeaderElector} in kubernetes. {@link LeaderElector#run()} is a blocking call. It should be
+ * run in the IO executor, not the main thread. The lifecycle is bound to single leader election. Once the leadership
+ * is revoked, as well as the {@link LeaderCallbackHandler#notLeader()} is called, the {@link LeaderElector#run()} will
+ * finish. To start another round of election, we need to trigger again.
+ */
+public class KubernetesLeaderElector extends LeaderElector {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderElector.class);
+   protected static final String LOCK_IDENTITY = UUID.randomUUID().toString();

Review comment:
   IIUC, each Flink cluster has its own cluster-id, so the ConfigMap names 
differ between clusters, for example:
   * k8s-ha-app1-restserver-leader
   * k8s-ha-app2-restserver-leader
   
   For multiple JobManagers in one Flink cluster (session mode), each JobManager 
has a separate ConfigMap bound to its job id:
   * k8s-ha-app1--jobmanager-leader
   * k8s-ha-app1-0002-jobmanager-leader
   
   In both situations, I think a single `LOCK_IDENTITY` would work. But following 
xintong's suggestion, I will make it configurable via 
`KubernetesLeaderElectionConfiguration`.









[jira] [Updated] (FLINK-19746) Why delete support for "as" function

2020-10-20 Thread appleyuchi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

appleyuchi updated FLINK-19746:
---
Description: 
I know the solution to the following question, so I'm not asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/].

When I use the function "as("x","y","z")." in my code,
there are two available *preparation code* options for "create table":

||relevant code||running log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));|user,product,amount is not a field of type PojoType. Expected: amount, product, user}|NO|

Question:
Why is *the support for "as" REMOVED* when the *newest officially recommended 
createTemporaryView* is used?

  was:
I know the solution to the following question, so I'm not asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/].

When I use the function "as("x","y","z")." in my code,
there are two available *preparation code* options for "create table":

||relevant code||running log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));|user,product,amount is not a field of type PojoType. Expected: amount, product, user}|NO|

Question:
Why is the support for "as" deleted when the *newest officially recommended 
createTemporaryView* is used?

> Why delete support for "as" function
> 
>
> Key: FLINK-19746
> URL: https://issues.apache.org/jira/browse/FLINK-19746
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.11.0
> Reporter: appleyuchi
> Priority: Major
>
> I know the solution to the following question, so I'm not asking for support 
> to debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/].
> When I use the function "as("x","y","z")." in my code,
> there are two available *preparation code* options for "create table":
> ||relevant code||running log||Deprecated?||
> |tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
> |tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));|user,product,amount is not a field of type PojoType. Expected: amount, product, user}|NO|
> Question:
> Why is *the support for "as" REMOVED* when the *newest officially recommended 
> createTemporaryView* is used?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19746) Why delete support for "as" function

2020-10-20 Thread appleyuchi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

appleyuchi updated FLINK-19746:
---
Description: 
I know the solution to the following question, so I'm not asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/].

When I use the function "as("x","y","z")." in my code,
there are two available *preparation code* options for "create table":

||relevant code||running log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));|user,product,amount is not a field of type PojoType. Expected: amount, product, user}|NO|

Question:
Why is the support for "as" deleted when the *newest officially recommended 
createTemporaryView* is used?

  was:
I know the solution to the following question, so I'm not asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/].

When I use the function "as("x","y","z")." in my code,
there are two available solutions for "create table":

||relevant code||running log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));|user,product,amount is not a field of type PojoType. Expected: amount, product, user}|NO|

Question:
Why is the support for "as" deleted when the *newest officially recommended 
createTemporaryView* is used?

> Why delete support for "as" function
> 
>
> Key: FLINK-19746
> URL: https://issues.apache.org/jira/browse/FLINK-19746
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.11.0
> Reporter: appleyuchi
> Priority: Major
>
> I know the solution to the following question, so I'm not asking for support 
> to debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/].
> When I use the function "as("x","y","z")." in my code,
> there are two available *preparation code* options for "create table":
> ||relevant code||running log||Deprecated?||
> |tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
> |tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));|user,product,amount is not a field of type PojoType. Expected: amount, product, user}|NO|
> Question:
> Why is the support for "as" deleted when the *newest officially recommended 
> createTemporaryView* is used?





[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-20 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r508995602



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Leader election service for multiple JobManagers. The active JobManager is elected using Kubernetes.
+ * The current leader's address as well as its leader session ID is published via Kubernetes ConfigMap.
+ * Note that the contending lock and leader storage are using the same ConfigMap. And every component(e.g.
+ * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionService extends AbstractLeaderElectionService {
+
+   private final FlinkKubeClient kubeClient;
+
+   private final Executor executor;
+
+   private final String configMapName;
+
+   private final KubernetesLeaderElector leaderElector;
+
+   private KubernetesWatch kubernetesWatch;
+
+   // Labels will be used to clean up the ha related ConfigMaps.
+   private Map configMapLabels;
+
+   KubernetesLeaderElectionService(
+           FlinkKubeClient kubeClient,
+           Executor executor,
+           KubernetesLeaderElectionConfiguration leaderConfig) {
+
+       this.kubeClient = checkNotNull(kubeClient, "Kubernetes client should not be null.");
+       this.executor = checkNotNull(executor, "Executor should not be null.");
+       this.configMapName = leaderConfig.getConfigMapName();
+       this.leaderElector = kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
+       this.leaderContender = null;
+       this.configMapLabels = KubernetesUtils.getConfigMapLabels(
+           leaderConfig.getClusterId(), LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+   }
+
+   @Override
+   public void internalStart(LeaderContender contender) {
+       CompletableFuture.runAsync(leaderElector::run, executor);
+       kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new ConfigMapCallbackHandlerImpl());
+   }
+
+   @Override
+   public void internalStop() {
+       if (kubernetesWatch != null) {
+           kubernetesWatch.close();
+       }
+   }
+
+   @Override
+   protected void writeLeaderInformation() {
+       try {
+           kubeClient.checkAndUpdateConfigMap(
+               configMapName,
+               configMap -> {
+                   if (leaderElector.hasLeadership(configMap)) {
+                       // Get the updated ConfigMap with new leader information
+   

[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-20 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r508995324



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##
@@ -0,0 +1,219 @@

[jira] [Updated] (FLINK-19746) Why delete support for "as" function

2020-10-20 Thread appleyuchi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

appleyuchi updated FLINK-19746:
---
Description: 
I know the solution to the following question, so I'm not asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/].


When I use the function "as("x","y","z")." in my code,
there are two available solutions for "create table":

||relevant code||running log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));|user,product,amount is not a field of type PojoType. 
Expected: amount, product, user}|NO|


Question:
Why was support for "as" removed when the *newest officially recommended 
createTemporaryView* is used?











  was:
I know the solution for my following question,so I'm not  asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function "as("x","y","z")." in my code,
two available solutions for "create table":

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for "as" is deleted when the *newest officially recommended 
createTemporaryView* is used?












> Why delete support for "as" function
> 
>
> Key: FLINK-19746
> URL: https://issues.apache.org/jira/browse/FLINK-19746
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.0
>Reporter: appleyuchi
>Priority: Major
>
> I know the solution to the following question, so I'm not asking for support 
> to debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/].
> When I use the function "as("x","y","z")." in my code,
> there are two available solutions for "create table":
> ||relevant code||running log||Deprecated?||
> |tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
> |tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));|user,product,amount is not a field of type PojoType. 
> Expected: amount, product, user}|NO|
> Question:
> Why was support for "as" removed when the *newest officially recommended 
> createTemporaryView* is used?
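
One possible reading of the log in the table: `$("user,product,amount")` is treated as a reference to a single field literally named `user,product,amount`, whereas the deprecated string overload split the list on commas. In Flink 1.11 the Expression-based `createTemporaryView` takes one expression per field, so `$("user"), $("product"), $("amount")` would presumably be the equivalent call. The sketch below is a plain-Java toy model of that lookup, not Flink code; `FieldReferenceSketch`, `resolve` and `splitFieldList` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of resolving field references against a POJO type; not the Flink API. */
class FieldReferenceSketch {

    // Sorted set, so the "Expected" list is printed in alphabetical order.
    private final Set<String> pojoFields;

    FieldReferenceSketch(String... fields) {
        this.pojoFields = new TreeSet<>(Arrays.asList(fields));
    }

    /** Each reference must name exactly one existing field; no comma splitting happens here. */
    List<String> resolve(String... references) {
        List<String> resolved = new ArrayList<>();
        for (String reference : references) {
            if (!pojoFields.contains(reference)) {
                throw new IllegalArgumentException(
                    reference + " is not a field of the type. Expected: "
                        + String.join(", ", pojoFields));
            }
            resolved.add(reference);
        }
        return resolved;
    }

    /** Mimics what a string-based overload would do: split a comma-separated field list. */
    static String[] splitFieldList(String commaSeparated) {
        return Arrays.stream(commaSeparated.split(","))
            .map(String::trim)
            .toArray(String[]::new);
    }
}
```

In this model, `resolve("user,product,amount")` fails with a message of the same shape as the log, while `resolve(splitFieldList("user,product,amount"))` succeeds because every reference names one field.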



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19746) Why delete support for "as" function

2020-10-20 Thread appleyuchi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

appleyuchi updated FLINK-19746:
---
Description: 
I know the solution for my following question,so I'm not  asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function "as("x","y","z")." in my code,
two available solutions for "create table":

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for "as" is deleted when the *newest officially recommended 
createTemporaryView* is used?











  was:
I know the solution for my following question,so I'm not  asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function "as("x","y","z")." in my code,
two available solutions for "create table":

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for "as" is deleted when the* newest officially recommended 
createTemporaryView* is used?












> Why delete support for "as" function
> 
>
> Key: FLINK-19746
> URL: https://issues.apache.org/jira/browse/FLINK-19746
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.0
>Reporter: appleyuchi
>Priority: Major
>
> I know the solution for my following question,so I'm not  asking for support 
> to debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]
> when I use the function "as("x","y","z")." in my code,
> two available solutions for "create table":
> ||relevant code||log||Deprecated?||
> |tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
> |tEnv.createTemporaryView("Orders", 
> orderA,$("user,product,amount"));|user,product,amount is not a field of type 
> PojoType. 
> Expected: amount, product, user}|NO|
> Question:
> why the support for "as" is deleted when the *newest officially recommended 
> createTemporaryView* is used?





[jira] [Updated] (FLINK-19746) Why delete support for "as" function

2020-10-20 Thread appleyuchi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

appleyuchi updated FLINK-19746:
---
Description: 
I know the solution for my following question,so I'm not  asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function "as("x","y","z")." in my code,
two available solutions for "create table":

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for "as" is deleted when the* newest officially recommended 
createTemporaryView* is used?











  was:
I know the solution for my following question,so I'm not  asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function "as("x","y","z")." in my code,
two available solutions for "create table":

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for "as" is deleted when createTemporaryView is used?












> Why delete support for "as" function
> 
>
> Key: FLINK-19746
> URL: https://issues.apache.org/jira/browse/FLINK-19746
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.0
>Reporter: appleyuchi
>Priority: Major
>
> I know the solution for my following question,so I'm not  asking for support 
> to debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]
> when I use the function "as("x","y","z")." in my code,
> two available solutions for "create table":
> ||relevant code||log||Deprecated?||
> |tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
> |tEnv.createTemporaryView("Orders", 
> orderA,$("user,product,amount"));|user,product,amount is not a field of type 
> PojoType. 
> Expected: amount, product, user}|NO|
> Question:
> why the support for "as" is deleted when the* newest officially recommended 
> createTemporaryView* is used?





[jira] [Updated] (FLINK-19746) Why delete support for "as" function

2020-10-20 Thread appleyuchi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

appleyuchi updated FLINK-19746:
---
Description: 
I know the solution for my following question,so I'm not  asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function "as("x","y","z")." in my code,
two available solutions for "create table":

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for as is deleted when createTemporaryView is used?











  was:
I know the solution for my following question,so I'm not  asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function "as("x","y","z")." in my code,
two available solutions for "create table":

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for AS is deleted when createTemporaryView is used?












> Why delete support for "as" function
> 
>
> Key: FLINK-19746
> URL: https://issues.apache.org/jira/browse/FLINK-19746
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.0
>Reporter: appleyuchi
>Priority: Major
>
> I know the solution for my following question,so I'm not  asking for support 
> to debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]
> when I use the function "as("x","y","z")." in my code,
> two available solutions for "create table":
> ||relevant code||log||Deprecated?||
> |tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
> |tEnv.createTemporaryView("Orders", 
> orderA,$("user,product,amount"));|user,product,amount is not a field of type 
> PojoType. 
> Expected: amount, product, user}|NO|
> Question:
> why the support for as is deleted when createTemporaryView is used?





[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-20 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r508994663



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Leader election service for multiple JobManagers. The active JobManager is elected using Kubernetes.
+ * The current leader's address as well as its leader session ID is published via Kubernetes ConfigMap.
+ * Note that the contending lock and leader storage are using the same ConfigMap. And every component(e.g.
+ * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionService extends AbstractLeaderElectionService {
+
+   private final FlinkKubeClient kubeClient;
+
+   private final Executor executor;
+
+   private final String configMapName;
+
+   private final KubernetesLeaderElector leaderElector;
+
+   private KubernetesWatch kubernetesWatch;
+
+   // Labels will be used to clean up the ha related ConfigMaps.
+   private Map<String, String> configMapLabels;
+
+   KubernetesLeaderElectionService(
+           FlinkKubeClient kubeClient,
+           Executor executor,
+           KubernetesLeaderElectionConfiguration leaderConfig) {
+
+       this.kubeClient = checkNotNull(kubeClient, "Kubernetes client should not be null.");
+       this.executor = checkNotNull(executor, "Executor should not be null.");
+       this.configMapName = leaderConfig.getConfigMapName();
+       this.leaderElector = kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
+       this.leaderContender = null;
+       this.configMapLabels = KubernetesUtils.getConfigMapLabels(
+           leaderConfig.getClusterId(), LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+   }
+
+   @Override
+   public void internalStart(LeaderContender contender) {
+       CompletableFuture.runAsync(leaderElector::run, executor);
+       kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new ConfigMapCallbackHandlerImpl());
+   }
+
+   @Override
+   public void internalStop() {
+       if (kubernetesWatch != null) {
+           kubernetesWatch.close();
+       }
+   }
+
+   @Override
+   protected void writeLeaderInformation() {
+       try {
+           kubeClient.checkAndUpdateConfigMap(
+               configMapName,
+               configMap -> {
+                   if (leaderElector.hasLeadership(configMap)) {
+                       // Get the updated ConfigMap with new leader information
+   

[jira] [Updated] (FLINK-19746) Why delete support for "as" function

2020-10-20 Thread appleyuchi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

appleyuchi updated FLINK-19746:
---
Description: 
I know the solution for my following question,so I'm not  asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function "as("x","y","z")." in my code,
two available solutions for "create table":

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for "as" is deleted when createTemporaryView is used?











  was:
I know the solution for my following question,so I'm not  asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function "as("x","y","z")." in my code,
two available solutions for "create table":

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for as is deleted when createTemporaryView is used?












> Why delete support for "as" function
> 
>
> Key: FLINK-19746
> URL: https://issues.apache.org/jira/browse/FLINK-19746
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.0
>Reporter: appleyuchi
>Priority: Major
>
> I know the solution for my following question,so I'm not  asking for support 
> to debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]
> when I use the function "as("x","y","z")." in my code,
> two available solutions for "create table":
> ||relevant code||log||Deprecated?||
> |tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
> |tEnv.createTemporaryView("Orders", 
> orderA,$("user,product,amount"));|user,product,amount is not a field of type 
> PojoType. 
> Expected: amount, product, user}|NO|
> Question:
> why the support for "as" is deleted when createTemporaryView is used?





[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-20 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r508994167



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##
@@ -219,6 +230,71 @@ public KubernetesWatch watchPodsAndDoCallback(
                .watch(new KubernetesPodsWatcher(podCallbackHandler)));
    }
 
+   @Override
+   public CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap) {
+       final String configMapName = configMap.getName();
+       return CompletableFuture.runAsync(
+           () -> this.internalClient.configMaps().inNamespace(namespace).create(configMap.getInternalResource()),
+           kubeClientExecutorService)
+           .whenComplete((ignored, throwable) -> {
+               if (throwable != null) {
+                   throw new FlinkRuntimeException("Failed to create ConfigMap " + configMapName, throwable);
+               }
+           });
+   }
+
+   @Override
+   public Optional<KubernetesConfigMap> getConfigMap(String name) {
+       final ConfigMap configMap = this.internalClient.configMaps().inNamespace(namespace).withName(name).get();
+       return configMap == null ? Optional.empty() : Optional.of(new KubernetesConfigMap(configMap));
+   }
+
+   @Override
+   public CompletableFuture<Boolean> checkAndUpdateConfigMap(
+           String configMapName,
+           FunctionWithException<KubernetesConfigMap, Optional<KubernetesConfigMap>, ?> function) {
+       return FutureUtils.retry(
+           () -> CompletableFuture.supplyAsync(
+               () -> getConfigMap(configMapName)
+                   .map(FunctionUtils.uncheckedFunction(configMap -> {
+                       final boolean updated = function.apply(configMap).map(
+                           updatedConfigMap -> {
+                               this.internalClient.configMaps()
+                                   .inNamespace(namespace)
+                                   .createOrReplace(updatedConfigMap.getInternalResource());
+                               return true;
+                           }).orElse(false);
+                       if (!updated) {
+                           LOG.warn("Trying to update ConfigMap {} to {} without checking pass, ignoring.",
+                               configMap.getName(), configMap.getData());
+                       }
+                       return updated;
+                   }))
+                   .orElseThrow(
+                       () -> new FlinkRuntimeException("ConfigMap " + configMapName + " not exists.")),

Review comment:
   If we agree not to handle external deletion/update, then we do not need to 
retry when the ConfigMap does not exist.
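
A minimal sketch of the behaviour suggested in this comment: transient write failures are retried, while a missing ConfigMap fails immediately. It uses an in-memory map rather than the fabric8 client or `FutureUtils.retry`; `ConfigMapStoreSketch`, `NotExistException` and `injectTransientFailures` are hypothetical names introduced only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical in-memory stand-in for a ConfigMap store; not the Flink or fabric8 API. */
class ConfigMapStoreSketch {

    /** Signals a missing ConfigMap; deliberately never retried. */
    static class NotExistException extends RuntimeException {
        NotExistException(String name) {
            super("ConfigMap " + name + " does not exist.");
        }
    }

    private final Map<String, Map<String, String>> store = new HashMap<>();
    private int transientFailuresToInject;
    private int attempts;

    void put(String name, Map<String, String> data) {
        store.put(name, new HashMap<>(data));
    }

    Map<String, String> read(String name) {
        return store.get(name);
    }

    /** Test hook: the next {@code count} writes throw a simulated transient error. */
    void injectTransientFailures(int count) {
        transientFailuresToInject = count;
    }

    int attempts() {
        return attempts;
    }

    /**
     * Applies {@code function} to the current data and writes the result if one is returned.
     * Transient write failures are retried up to {@code maxAttempts}; a missing entry is not.
     */
    boolean checkAndUpdate(
            String name,
            Function<Map<String, String>, Optional<Map<String, String>>> function,
            int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IllegalStateException lastFailure = null;
        for (int i = 0; i < maxAttempts; i++) {
            attempts++;
            Map<String, String> current = store.get(name);
            if (current == null) {
                // Fail fast: retrying cannot bring back an externally deleted entry.
                throw new NotExistException(name);
            }
            Optional<Map<String, String>> updated = function.apply(new HashMap<>(current));
            if (!updated.isPresent()) {
                return false; // the check did not pass, so nothing is written
            }
            try {
                write(name, updated.get());
                return true;
            } catch (IllegalStateException e) {
                lastFailure = e; // treated as transient: try again
            }
        }
        throw lastFailure;
    }

    private void write(String name, Map<String, String> data) {
        if (transientFailuresToInject > 0) {
            transientFailuresToInject--;
            throw new IllegalStateException("simulated transient write failure");
        }
        store.put(name, data);
    }
}
```

Keeping the "not exists" case outside the retried section means a caller sees the failure after a single attempt instead of after the full retry budget.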









[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-20 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r508993402



##
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalServiceTest.java
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link KubernetesLeaderRetrievalService}.
+ */
+public class KubernetesLeaderRetrievalServiceTest extends KubernetesHighAvailabilityTestBase {

Review comment:
   As with the Yarn MiniCluster, starting a minikube is not simple. 
So I prefer to add unit tests for the contract testing first and to handle the 
integration tests together with the E2E tests. WDYT?









[jira] [Updated] (FLINK-19746) Why delete support for "as" function

2020-10-20 Thread appleyuchi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

appleyuchi updated FLINK-19746:
---
Description: 
I know the solution for my following question,so I'm not  asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function "as("x","y","z")." in my code,
two available solutions for "create table":

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for AS is deleted when createTemporaryView is used?











  was:
I know the solution for my following question,so I'm not  asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function "as("x","y","z")." in my code,
two available solutions:

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for AS is deleted when createTemporaryView is used?












> Why delete support for "as" function
> 
>
> Key: FLINK-19746
> URL: https://issues.apache.org/jira/browse/FLINK-19746
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.0
>Reporter: appleyuchi
>Priority: Major
>
> I know the solution for my following question,so I'm not  asking for support 
> to debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]
> when I use the function "as("x","y","z")." in my code,
> two available solutions for "create table":
> ||relevant code||log||Deprecated?||
> |tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
> |tEnv.createTemporaryView("Orders", 
> orderA,$("user,product,amount"));|user,product,amount is not a field of type 
> PojoType. 
> Expected: amount, product, user}|NO|
> Question:
> why the support for AS is deleted when createTemporaryView is used?





[GitHub] [flink] flinkbot edited a comment on pull request #13714: [FLINK-19718][hive] HiveTableSourceITCase.testStreamPartitionRead is not stable

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13714:
URL: https://github.com/apache/flink/pull/13714#issuecomment-713300898


   
   ## CI report:
   
   * da5593d4b8197d75c0a0aa5d92eb794bc0fe0ec6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7978)
   
   
   







[jira] [Updated] (FLINK-19746) Why delete support for "as" function

2020-10-20 Thread appleyuchi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

appleyuchi updated FLINK-19746:
---
Description: 
I know the solution for my following question,so I'm not  asking for support to 
debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function "as("x","y","z")." in my code,
two available solutions:

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for AS is deleted when createTemporaryView is used?











  was:
I know the solution for my following question,So it's not  asking for support 
to debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function *as* in my code,
two available solutions:

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for AS is deleted when createTemporaryView is used?












> Why delete support for "as" function
> 
>
> Key: FLINK-19746
> URL: https://issues.apache.org/jira/browse/FLINK-19746
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.0
>Reporter: appleyuchi
>Priority: Major
>
> I know the solution for my following question,so I'm not  asking for support 
> to debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]
> when I use the function "as("x","y","z")." in my code,
> two available solutions:
> ||relevant code||log||Deprecated?||
> |tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
> |tEnv.createTemporaryView("Orders", 
> orderA,$("user,product,amount"));|user,product,amount is not a field of type 
> PojoType. 
> Expected: amount, product, user}|NO|
> Question:
> why the support for AS is deleted when createTemporaryView is used?





[jira] [Updated] (FLINK-19746) Why delete support for "as" function

2020-10-20 Thread appleyuchi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

appleyuchi updated FLINK-19746:
---
Description: 
I know the solution for my following question,So it's not  asking for support 
to debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]


when I use the function *as* in my code,
two available solutions:

||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for AS is deleted when createTemporaryView is used?











  was:
I know the solution for my following question,
So it's not  asking for support to debug code.


[completed code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]:


when I use the function *as* in my code,


||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for AS is deleted when createTemporaryView is used?












> Why delete support for "as" function
> 
>
> Key: FLINK-19746
> URL: https://issues.apache.org/jira/browse/FLINK-19746
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.0
>Reporter: appleyuchi
>Priority: Major
>
> I know the solution for my following question,So it's not  asking for support 
> to debug [this code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]
> when I use the function *as* in my code,
> two available solutions:
> ||relevant code||log||Deprecated?||
> |tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
> |tEnv.createTemporaryView("Orders", 
> orderA,$("user,product,amount"));|user,product,amount is not a field of type 
> PojoType. 
> Expected: amount, product, user}|NO|
> Question:
> why the support for AS is deleted when createTemporaryView is used?





[GitHub] [flink] pyscala commented on pull request #13680: [FLINK-19553] [ Runtime / Web Frontend] The format of checkpoint Completion Time and Failure Time should be changed from HH:mm:ss to yyyy-MM-

2020-10-20 Thread GitBox


pyscala commented on pull request #13680:
URL: https://github.com/apache/flink/pull/13680#issuecomment-713305996


   
![image](https://user-images.githubusercontent.com/20840332/96675769-3e8b2900-139e-11eb-99ea-c624457d0b1b.png)
   @Myasuka Done







[jira] [Updated] (FLINK-19746) Why delete support for "as" function

2020-10-20 Thread appleyuchi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

appleyuchi updated FLINK-19746:
---
Description: 
I know the solution for my following question,
So it's not  asking for support to debug code.


[completed code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]:


when I use the function *as* in my code,


||relevant code||log||Deprecated?||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|NO|


Question:
why the support for AS is deleted when createTemporaryView is used?











  was:
I know the solution for my following question,
So it's not  asking for support to debug code.


[completed code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]:


when I use the function *as* in my code,


||relevant code||log||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|




---

registerDataStream is deprecated,
so the best way is to use createTemporaryView.

Question:
why the support for AS is deleted when createTemporaryView is used?












> Why delete support for "as" function
> 
>
> Key: FLINK-19746
> URL: https://issues.apache.org/jira/browse/FLINK-19746
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.0
>Reporter: appleyuchi
>Priority: Major
>
> I already know the workaround for the following question,
> so this is not a request for help debugging code.
> [complete code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]:
> When I use the *as* function in my code:
> ||relevant code||log||Deprecated?||
> |tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|YES|
> |tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));|user,product,amount is not a field of type PojoType. Expected: amount, product, user}|NO|
> Question:
> Why was support for AS removed when createTemporaryView is used?
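The error message suggests that `$("user,product,amount")` is read as one field reference whose name is the whole string, whereas the deprecated string-based overload appears to have split the list on commas. If that reading is right, passing one expression per field (`$("user"), $("product"), $("amount")`) would be the equivalent call. The following is a plain-Java sketch of that difference with no Flink dependency; `resolve` and `splitFields` are hypothetical helpers that only mimic the two behaviours and are not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for field resolution; none of this is Flink API.
class FieldResolutionSketch {

    // Field names of the POJO, taken from the error message in the issue.
    static final List<String> POJO_FIELDS = Arrays.asList("amount", "product", "user");

    // Mimics a single field reference: the whole string is treated as one name.
    static String resolve(String reference) {
        if (!POJO_FIELDS.contains(reference)) {
            throw new IllegalArgumentException(
                reference + " is not a field. Expected: " + String.join(", ", POJO_FIELDS));
        }
        return reference;
    }

    // Mimics the string-based overload: split on commas, then resolve each name.
    static List<String> splitFields(String commaSeparated) {
        List<String> resolved = new ArrayList<>();
        for (String name : commaSeparated.split(",")) {
            resolved.add(resolve(name.trim()));
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Splitting first resolves three separate fields.
        System.out.println(splitFields("user,product,amount"));
        try {
            // Treating the list as one reference fails, as in the reported log.
            resolve("user,product,amount");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```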





[jira] [Updated] (FLINK-19746) Why delete support for "as" function

2020-10-20 Thread appleyuchi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

appleyuchi updated FLINK-19746:
---
Description: 
I know the solution for my following question,
So it's not  asking for support to debug code.


[completed code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]:


when I use the function *as* in my code,


||relevant code||log||
|tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|
|tEnv.createTemporaryView("Orders", 
orderA,$("user,product,amount"));|user,product,amount is not a field of type 
PojoType. 
Expected: amount, product, user}|




---

registerDataStream is deprecated,
so the best way is to use createTemporaryView.

Question:
why the support for AS is deleted when createTemporaryView is used?











  was:
*I know the solution for my following question,
So it's not  asking for support.*

completed code:
https://paste.ubuntu.com/p/Mw5tJSmDCs/

when I use the function *as* in my code,

I can only use.

//tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));
tEnv.registerDataStream("Orders", orderA,"user,product,amount");

I can Not use:

tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));
 //   tEnv.registerDataStream("Orders", orderA,"user,product,amount");

error log:
user,product,amount is not a field of type PojoType. Expected: amount, product, user}


---

registerDataStream is deprecated,
so the best way is to use createTemporaryView.

Question:
why the support for AS is deleted when createTemporaryView is used?












> Why delete support for "as" function
> 
>
> Key: FLINK-19746
> URL: https://issues.apache.org/jira/browse/FLINK-19746
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.0
>Reporter: appleyuchi
>Priority: Major
>
> I know the solution for my following question,
> So it's not  asking for support to debug code.
> [completed code|https://paste.ubuntu.com/p/Mw5tJSmDCs/]:
> when I use the function *as* in my code,
> ||relevant code||log||
> |tEnv.registerDataStream("Orders", orderA,"user,product,amount");|ok|
> |tEnv.createTemporaryView("Orders", 
> orderA,$("user,product,amount"));|user,product,amount is not a field of type 
> PojoType. 
> Expected: amount, product, user}|
> ---
> registerDataStream is deprecated,
> so the best way is to use createTemporaryView.
> Question:
> why the support for AS is deleted when createTemporaryView is used?





[jira] [Updated] (FLINK-19746) Why delete support for "as" function

2020-10-20 Thread appleyuchi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

appleyuchi updated FLINK-19746:
---
Affects Version/s: 1.11.0
  Description: 
*I know the solution for my following question,
So it's not  asking for support.*

completed code:
https://paste.ubuntu.com/p/Mw5tJSmDCs/

when I use the function *as* in my code,

I can only use.

//tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));
tEnv.registerDataStream("Orders", orderA,"user,product,amount");

I can Not use:

tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));
 //   tEnv.registerDataStream("Orders", orderA,"user,product,amount");

error log:
user,product,amount is not a field of type PojoType. Expected: amount, product, user}


---

registerDataStream is deprecated,
so the best way is to use createTemporaryView.

Question:
why the support for AS is deleted when createTemporaryView is used?










  Summary: Why delete support for "as" function  (was: Why delete 
for)

> Why delete support for "as" function
> 
>
> Key: FLINK-19746
> URL: https://issues.apache.org/jira/browse/FLINK-19746
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.0
>Reporter: appleyuchi
>Priority: Major
>
> *I know the solution for my following question,
> So it's not  asking for support.*
> completed code:
> https://paste.ubuntu.com/p/Mw5tJSmDCs/
> when I use the function *as* in my code,
> I can only use.
> //tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));
> tEnv.registerDataStream("Orders", orderA,"user,product,amount");
> I can Not use:
> tEnv.createTemporaryView("Orders", orderA,$("user,product,amount"));
>  //   tEnv.registerDataStream("Orders", orderA,"user,product,amount");
> error log:
> user,product,amount is not a field of type PojoType Integer, product: String, user: Long]>. Expected: amount, product, user}
> ---
> registerDataStream is deprecated,
> so the best way is to use createTemporaryView.
> Question:
> why the support for AS is deleted when createTemporaryView is used?





[jira] [Created] (FLINK-19746) Why delete for

2020-10-20 Thread appleyuchi (Jira)
appleyuchi created FLINK-19746:
--

 Summary: Why delete for
 Key: FLINK-19746
 URL: https://issues.apache.org/jira/browse/FLINK-19746
 Project: Flink
  Issue Type: Bug
Reporter: appleyuchi








[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-20 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r508989034



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.AbstractHaServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.jobmanager.StandaloneJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.NAME_SEPARATOR;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * High availability service for Kubernetes.
+ */
+public class KubernetesHaServices extends AbstractHaServices {
+
+   private final String clusterId;
+
+   /** Kubernetes client. */
+   private final FlinkKubeClient kubeClient;
+
+   private static final String RESOURCE_MANAGER_NAME = "resourcemanager";
+
+   private static final String DISPATCHER_NAME = "dispatcher";
+
+   private static final String JOB_MANAGER_NAME = "jobmanager";
+
+   private static final String REST_SERVER_NAME = "restserver";
+
+   private static final String LEADER_SUFFIX = "leader";
+
+   KubernetesHaServices(
+   FlinkKubeClient kubeClient,
+   Executor executor,
+   Configuration config,
+   BlobStoreService blobStoreService) {
+
+   super(executor, config, blobStoreService);
+   this.kubeClient = checkNotNull(kubeClient);
+   this.clusterId = 
checkNotNull(config.get(KubernetesConfigOptions.CLUSTER_ID));
+   }
+
+   @Override
+   public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+   return new StandaloneCheckpointRecoveryFactory();
+   }
+
+   @Override
+   public JobGraphStore getJobGraphStore() {
+   return new StandaloneJobGraphStore();
+   }
+
+   @Override
+   public RunningJobsRegistry getRunningJobsRegistry() {
+   return new StandaloneRunningJobsRegistry();
+   }
+
+   @Override
+   public LeaderElectionService createLeaderElectionService(String 
leaderName) {
+   return new KubernetesLeaderElectionService(
+   kubeClient,
+   executor,
+   
KubernetesLeaderElectionConfiguration.fromConfiguration(leaderName, 
configuration));
+   }
+
+   @Override
+   public LeaderRetrievalService createLeaderRetrievalService(String 
leaderName) {
+   return new KubernetesLeaderRetrievalService(kubeClient, 
leaderName);
+   }
+
+   @Override
+   public void internalClose() throws Exception {
+   kubeClient.close();

Review comment:
   Yes, you are right. I will keep two separate `flinkKubeClient` instances, 
one for `KubernetesHaService` and one for `KubernetesResourceDriver`.






[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-20 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r508988440



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##
@@ -104,6 +106,67 @@ KubernetesWatch watchPodsAndDoCallback(
Map<String, String> labels,
WatchCallbackHandler podCallbackHandler);
 
+   /**
+* Create the ConfigMap with specified content. If the ConfigMap 
already exists, a FlinkRuntimeException will be
+* thrown.
+*
+* @param configMap ConfigMap.
+*
+* @return Return the ConfigMap create future.
+*/
+   CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap);
+
+   /**
+* Get the ConfigMap with specified name.
+*
+* @param name ConfigMap name.
+*
+* @return Return the ConfigMap, or empty if the ConfigMap does not 
exist.
+*/
+   Optional<KubernetesConfigMap> getConfigMap(String name);
+
+   /**
+* Update an existing ConfigMap with the data. Benefit from <a href="https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions">
+* resource version</a> and combined with {@link #getConfigMap(String)}, we could perform a get-check-and-update
+* transactional operation. Since concurrent modification could happen 
on a same ConfigMap,
+* the update operation may fail. We need to retry internally. The max 
retry attempts could be
+* configured via {@link 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions#KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES}.
+*
+* @param configMapName ConfigMap to be replaced with.
+* @param function  Function to be applied to the obtained 
ConfigMap and get a new updated one. If the returned

Review comment:
   If the `updateFunction` throws an exception, we convert it to an 
unchecked exception and retry. Since each retry attempt fetches the latest 
ConfigMap, the `updateFunction` could then execute successfully.
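The get-check-and-update loop described in the Javadoc and in this comment can be sketched against an in-memory store. This is a minimal illustration under stated assumptions, not the Flink or fabric8 implementation: `VersionedStoreSketch`, `Versioned`, and the convention that an empty `Optional` means "no update needed" are all hypothetical, and a compare-and-set on an `AtomicReference` stands in for the Kubernetes resource-version check.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// In-memory stand-in for a versioned resource; hypothetical, not the Flink or fabric8 API.
class VersionedStoreSketch {

    static final class Versioned {
        final long resourceVersion;
        final String data;

        Versioned(long resourceVersion, String data) {
            this.resourceVersion = resourceVersion;
            this.data = data;
        }
    }

    private final AtomicReference<Versioned> current =
        new AtomicReference<>(new Versioned(0L, ""));

    Versioned get() {
        return current.get();
    }

    // Succeeds only if nobody replaced the value since 'expected' was read.
    boolean replace(Versioned expected, String newData) {
        return current.compareAndSet(
            expected, new Versioned(expected.resourceVersion + 1, newData));
    }

    // Get-check-and-update: re-read and re-apply the function on every attempt.
    boolean checkAndUpdate(Function<Versioned, Optional<String>> function, int maxRetries) {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            Versioned latest = get();
            try {
                Optional<String> updated = function.apply(latest);
                if (!updated.isPresent()) {
                    return false; // the check decided that no update is needed
                }
                if (replace(latest, updated.get())) {
                    return true;
                }
                // Conflict: someone else updated first, so loop and read again.
            } catch (RuntimeException e) {
                // Treat a failing function like a conflict and retry on the latest value.
            }
        }
        throw new IllegalStateException("Update failed after " + maxRetries + " attempts");
    }
}
```

Because every attempt starts from a fresh read, a function that failed or lost the race on stale data gets another chance on the newest version, which is the behaviour the comment relies on.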









[GitHub] [flink] flinkbot commented on pull request #13714: [FLINK-19718][hive] HiveTableSourceITCase.testStreamPartitionRead is not stable

2020-10-20 Thread GitBox


flinkbot commented on pull request #13714:
URL: https://github.com/apache/flink/pull/13714#issuecomment-713300898


   
   ## CI report:
   
   * da5593d4b8197d75c0a0aa5d92eb794bc0fe0ec6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-20 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r508986718



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/KubeClientFactory.java
##
@@ -71,7 +73,12 @@ public static FlinkKubeClient 
fromConfiguration(Configuration flinkConfig) {
 
final KubernetesClient client = new 
DefaultKubernetesClient(config);
 
-   return new Fabric8FlinkKubeClient(flinkConfig, client, 
KubeClientFactory::createThreadPoolForAsyncIO);
+   if (flinkKubeClient == null) {
+   flinkKubeClient = new Fabric8FlinkKubeClient(
+   flinkConfig, client, 
KubeClientFactory::createThreadPoolForAsyncIO);
+   }

Review comment:
   Currently, in the JobManager process, we have two `flinkKubeClient`s. 
One is for `KubernetesResourceDriver`, and the other is for 
`KubernetesHaService`.









[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-20 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r508986296



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static 
org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Leader election service for multiple JobManagers. The active JobManager is 
elected using Kubernetes.
+ * The current leader's address as well as its leader session ID is published 
via Kubernetes ConfigMap.
+ * Note that the contending lock and leader storage are using the same 
ConfigMap. And every component(e.g.
+ * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will 
have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionService extends 
AbstractLeaderElectionService {
+
+   private final FlinkKubeClient kubeClient;
+
+   private final Executor executor;
+
+   private final String configMapName;
+
+   private final KubernetesLeaderElector leaderElector;
+
+   private KubernetesWatch kubernetesWatch;
+
+   // Labels will be used to clean up the ha related ConfigMaps.
+   private Map<String, String> configMapLabels;
+
+   KubernetesLeaderElectionService(
+   FlinkKubeClient kubeClient,
+   Executor executor,
+   KubernetesLeaderElectionConfiguration leaderConfig) {
+
+   this.kubeClient = checkNotNull(kubeClient, "Kubernetes client 
should not be null.");
+   this.executor = checkNotNull(executor, "Executor should not be 
null.");
+   this.configMapName = leaderConfig.getConfigMapName();
+   this.leaderElector = 
kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
+   this.leaderContender = null;
+   this.configMapLabels = KubernetesUtils.getConfigMapLabels(
+   leaderConfig.getClusterId(), 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+   }
+
+   @Override
+   public void internalStart(LeaderContender contender) {
+   CompletableFuture.runAsync(leaderElector::run, executor);
+   kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new 
ConfigMapCallbackHandlerImpl());
+   }
+
+   @Override
+   public void internalStop() {
+   if (kubernetesWatch != null) {
+   kubernetesWatch.close();
+   }
+   }
+
+   @Override
+   protected void writeLeaderInformation() {
+   try {
+   kubeClient.checkAndUpdateConfigMap(
+   configMapName,
+   configMap -> {
+   if 
(leaderElector.hasLeadership(configMap)) {
+   // Get the updated ConfigMap 
with new leader information
+   

[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

2020-10-20 Thread GitBox


wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r508985458



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
##
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.UUID;
+
+/**
+ * Abstract class for leader election service based on distributed 
coordination system(e.g. Zookeeper, Kubernetes, etc.).
+ */
+public abstract class AbstractLeaderElectionService implements 
LeaderElectionService {
+
+   protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+   protected final Object lock = new Object();

Review comment:
   I will try to use composition for the abstraction. As you said, it 
could help us better control the `lock` and make the contract clearer.
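One way composition could keep the `lock` under the control of a single class is sketched below. All names (`LeaderElectionDriverSketch`, `ComposedLeaderElectionServiceSketch`, and their methods) are hypothetical and chosen for illustration; this is not the design from the pull request.

```java
import java.util.UUID;

// Hypothetical backend-specific part (e.g. ZooKeeper or Kubernetes); it never sees the lock.
interface LeaderElectionDriverSketch {
    void writeLeaderInformation(UUID sessionId, String address);
}

// Hypothetical service that owns the lock and delegates storage to the driver.
final class ComposedLeaderElectionServiceSketch {

    // The lock is private, so only this class decides what runs under it.
    private final Object lock = new Object();
    private final LeaderElectionDriverSketch driver;
    private UUID issuedSessionId; // guarded by lock

    ComposedLeaderElectionServiceSketch(LeaderElectionDriverSketch driver) {
        this.driver = driver;
    }

    // Called when leadership is granted; publishes a fresh session ID through the driver.
    UUID onGrantLeadership(String address) {
        synchronized (lock) {
            issuedSessionId = UUID.randomUUID();
            driver.writeLeaderInformation(issuedSessionId, address);
            return issuedSessionId;
        }
    }

    // Called when leadership is revoked; forgets the issued session ID.
    void onRevokeLeadership() {
        synchronized (lock) {
            issuedSessionId = null;
        }
    }

    boolean hasLeadership(UUID sessionId) {
        synchronized (lock) {
            return sessionId != null && sessionId.equals(issuedSessionId);
        }
    }
}
```

With an abstract base class, subclasses can reach a `protected` lock and take it in ways the base class cannot anticipate. Here the driver only implements a narrow interface, so the locking contract lives in one place.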









[GitHub] [flink] flinkbot commented on pull request #13714: [FLINK-19718][hive] HiveTableSourceITCase.testStreamPartitionRead is not stable

2020-10-20 Thread GitBox


flinkbot commented on pull request #13714:
URL: https://github.com/apache/flink/pull/13714#issuecomment-713290085


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit da5593d4b8197d75c0a0aa5d92eb794bc0fe0ec6 (Wed Oct 21 
04:18:08 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   







[jira] [Updated] (FLINK-19706) Add WARN logs when hive table partition has existed before commit in `MetastoreCommitPolicy`

2020-10-20 Thread Lsw_aka_laplace (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lsw_aka_laplace updated FLINK-19706:

Summary: Add WARN logs when hive table partition has existed before commit 
in `MetastoreCommitPolicy` (was: Introduce `Repeated Partition Commit 
Check` in `org.apache.flink.table.filesystem.PartitionCommitPolicy` )

> Add WARN logs when hive table partition has existed before commit in 
> `MetastoreCommitPolicy`   
> ---
>
> Key: FLINK-19706
> URL: https://issues.apache.org/jira/browse/FLINK-19706
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Connectors / Hive, Table SQL / 
> Runtime
>Reporter: Lsw_aka_laplace
>Assignee: Lsw_aka_laplace
>Priority: Minor
> Attachments: image-2020-10-19-16-47-39-354.png, 
> image-2020-10-19-16-57-02-661.png, image-2020-10-19-17-00-27-255.png, 
> image-2020-10-19-17-03-21-558.png, image-2020-10-19-18-16-35-083.png
>
>
> Hi all,
>       Recently we have been using Hive streaming writes to accelerate the 
> data sync of our Hive-based data warehouse, and we eventually made it work. 
>        For production use, we added many metrics, logs, and checks to help 
> us analyze runtime information and fix unexpected problems. Among these, we 
> found that checking for repeated partition commits is the most important 
> one, so we would like to contribute it back to the community.
>      If this proposal makes sense, I am happy to present my design and 
> implementation.
>  
> Looking forward to ANY opinion~
>  
>  
> UPDATE 
>  
> One of our users (who builds Flink jobs on our own platform) raised some 
> requests. One of them is that once a partition is committed, the data in 
> that partition is regarded as frozen, i.e. complete. Committing a partition 
> acts as a kind of guarantee (although we all know it is hard to make it a 
> promise) that the partition is complete. We certainly take many measures to 
> ensure that a partition commit means the partition is complete. So if a 
> partition is committed two or more times, then for us something must be 
> wrong or our measures are insufficient. On the other hand, a repeated commit 
> also tells us to take compensating action to avoid data loss or incomplete data.
>  
> So first of all, it is important to know that a certain partition has been 
> committed repeatedly, so that we can do the following as soon as possible:
>    1. analyze the cause 
>    2. perform some trade-off operations
>    3. improve our code/measures
>  
> --- Design and Implementation ---
> There are basically two approaches; both have been used in our production environment.
> Approach 1
> Add a check to the commit policy and call it before the partition commit.
> !image-2020-10-19-16-47-39-354.png|width=576,height=235!
> //{color:#ffab00}Newly posted, see here{color}
> !image-2020-10-19-18-16-35-083.png|width=725,height=313!
>  1.1 As the picture shows, add `checkPartitionExists` and implement it in 
> the subclass
>   !image-2020-10-19-17-03-21-558.png|width=1203,height=88!
>  1.2 Call `checkPartitionExists` before the partition commit
> --- 
> Approach 2
> Build a bounded cache of committed partitions and check it every time before 
> a partition commit 
> (this cache is actually supposed to be operator state)
> !image-2020-10-19-16-57-02-661.png|width=1298,height=57!
>   2.1 Build a cache
> !image-2020-10-19-17-00-27-255.png|width=1235,height=116!
>   2.2 Check before commit 
>  
>  
> --- UPDATE ---
> After discussion with [~lzljs3620320], `Repeated partition check` seems a 
> little misleading in semantics, so only some WARN logs will be added to 
> `MetastoreCommitPolicy` to make repeated commits visible.
>  
>  
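The bounded-cache idea from Approach 2, combined with the WARN log that was finally agreed on, could look roughly like the sketch below. It is a plain-Java illustration under assumptions: `CommittedPartitionCacheSketch` and `recordCommit` are hypothetical names, the cache is a local map rather than operator state, and `System.err` stands in for a real logger.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink; illustrates the bounded-cache idea only.
class CommittedPartitionCacheSketch {

    private final Map<String, Boolean> committed;

    CommittedPartitionCacheSketch(final int capacity) {
        // Insertion-ordered map that evicts the oldest entry once capacity is exceeded.
        this.committed = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Records a commit; returns true and prints a warning if the partition was seen before.
    boolean recordCommit(String partition) {
        boolean repeated = committed.put(partition, Boolean.TRUE) != null;
        if (repeated) {
            System.err.println("WARN: partition " + partition + " was committed before");
        }
        return repeated;
    }
}
```

Because the cache is bounded, a partition that was committed long ago may already have been evicted, so a repeated commit can go unnoticed. Checking the metastore for an existing partition, as in Approach 1, does not have that limitation.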





[jira] [Updated] (FLINK-19718) HiveTableSourceITCase.testStreamPartitionRead is not stable on Azure

2020-10-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19718:
---
Labels: pull-request-available  (was: )

> HiveTableSourceITCase.testStreamPartitionRead is not stable on Azure
> 
>
> Key: FLINK-19718
> URL: https://issues.apache.org/jira/browse/FLINK-19718
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: Jingsong Lee
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Here are some instances:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7845=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7875=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> {code}
> 2020-10-19T09:17:36.2004157Z [INFO] Results:
> 2020-10-19T09:17:36.2004505Z [INFO] 
> 2020-10-19T09:17:36.2007981Z [ERROR] Failures: 
> 2020-10-19T09:17:36.2010669Z [ERROR]   
> HiveTableSourceITCase.testStreamPartitionRead:537 
> expected:<[+I(0,0,2020-05-06 00:00:00), +I(1,1,2020-05-06 00:10:00), 
> +I(1,1_copy,2020-05-06 00:10:00), +I(2,2,2020-05-06 00:20:00), 
> +I(2,2_copy,2020-05-06 00:20:00), +I(3,3,2020-05-06 00:30:00), 
> +I(3,3_copy,2020-05-06 00:30:00), +I(4,4,2020-05-06 00:40:00), 
> +I(4,4_copy,2020-05-06 00:40:00), +I(5,5,2020-05-06 00:50:00), 
> +I(5,5_copy,2020-05-06 00:50:00)]> but was:<[]>
> 2020-10-19T09:17:36.2011985Z [INFO] 
> 2020-10-19T09:17:36.2012582Z [ERROR] Tests run: 80, Failures: 1, Errors: 0, 
> Skipped: 3
> 2020-10-19T09:17:36.2012976Z [INFO] 
> 2020-10-19T09:17:36.2137222Z [INFO] 
> 
> 2020-10-19T09:17:36.2140971Z [INFO] Reactor Summary:
> 2020-10-19T09:17:36.2141558Z [INFO] 
> 2020-10-19T09:17:36.2141987Z [INFO] Flink : Tools : Force Shading 
> .. SUCCESS [  1.346 s]
> 2020-10-19T09:17:36.2142534Z [INFO] Flink : Test utils : 
> ... SUCCESS [  1.845 s]
> 2020-10-19T09:17:36.2143098Z [INFO] Flink : Test utils : Junit 
> . SUCCESS [  3.265 s]
> 2020-10-19T09:17:36.2190677Z [INFO] Flink : Queryable state : 
> .. SUCCESS [  0.077 s]
> 2020-10-19T09:17:36.2191261Z [INFO] Flink : FileSystems : Azure FS Hadoop 
> .. SUCCESS [ 12.600 s]
> 2020-10-19T09:17:36.2191821Z [INFO] Flink : Examples : 
> . SUCCESS [  0.249 s]
> 2020-10-19T09:17:36.2192380Z [INFO] Flink : Examples : Batch 
> ... SUCCESS [  1.919 s]
> {code}





[GitHub] [flink] JingsongLi opened a new pull request #13714: [FLINK-19718][hive] HiveTableSourceITCase.testStreamPartitionRead is not stable

2020-10-20 Thread GitBox


JingsongLi opened a new pull request #13714:
URL: https://github.com/apache/flink/pull/13714


   
   
   ## What is the purpose of the change
   
   Here are some instances:
   
   
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7845=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
   
   
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7875=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
   
   
   ## Brief change log
   
   Use `TableResult.collect` to assert the results.
   The previous `TestingAppendSink` has a thread-safety problem.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no







[jira] [Updated] (FLINK-19745) Supplement micro-benchmark for bounded blocking partition in remote channel case

2020-10-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19745:
---
Labels: pull-request-available  (was: )

> Supplement micro-benchmark for bounded blocking partition in remote channel 
> case
> 
>
> Key: FLINK-19745
> URL: https://issues.apache.org/jira/browse/FLINK-19745
> Project: Flink
>  Issue Type: Task
>  Components: Benchmarks, Runtime / Network
>Reporter: Zhijiang
>Assignee: Zhijiang
>Priority: Major
>  Labels: pull-request-available
>
> The current `BlockingPartitionBenchmark` for batch jobs only measures the 
> scenario where the producer and consumer are deployed in the same 
> processor, which corresponds to the local input channel on the consumer side. 
> We want to add another common scenario that measures the effect of 
> reading data via network shuffle, which corresponds to the remote input 
> channel on the consumer side.





[GitHub] [flink-benchmarks] zhijiangW opened a new pull request #4: [FLINK-19745][network] Supplement micro-benchmark for bounded blocking partition in remote channel case

2020-10-20 Thread GitBox


zhijiangW opened a new pull request #4:
URL: https://github.com/apache/flink-benchmarks/pull/4


   







[jira] [Updated] (FLINK-19706) Introduce `Repeated Partition Commit Check` in `org.apache.flink.table.filesystem.PartitionCommitPolicy`

2020-10-20 Thread Lsw_aka_laplace (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lsw_aka_laplace updated FLINK-19706:

Description: 
Hi all,

Recently we have been working on using Hive Streaming Writing to accelerate 
the data sync of our Hive-based data warehouse, and we eventually made it 
work.

For production use, we added a lot of metrics, logs, and measures to help 
us analyze runtime information and fix unexpected problems. Among these, we 
found that checking for repeated partition commits is the most important one. 
So we would like to contribute it back to the community.

If this proposal makes sense, I am happy to present my design and 
implementation.

 

Looking forward to ANY opinion~

 

 

UPDATE 

 

One of our users (who builds his own Flink jobs on our platform) raised some 
requests. One of them is that once a partition is committed, the data in 
that partition is regarded as frozen or complete. Committing a partition 
works like a guarantee (although we all know it is hard to make it a 
promise) that the partition is complete, and we have taken a lot of measures 
to make "partition commit" mean "complete". So if a partition is committed 
twice or more, something must be wrong or our measures are insufficient. 
A repeated commit also tells us that we need to do something to make up for 
it and avoid data loss or incomplete data.

 

So first of all, it is important to know that a certain partition has been 
committed repeatedly, so that we can do the following things ASAP:

   1. analyze the reason or the cause 

   2. do some trade-off operations

   3. improve our code/measures

 

--- Design and Implementation ---

There are basically two approaches; both have been used in our production 
environment.

Approach 1

Add a check method to CommitPolicy that is called before the partition commit.

!image-2020-10-19-16-47-39-354.png|width=576,height=235!

//{color:#ffab00}Newly posted, see here{color}

!image-2020-10-19-18-16-35-083.png|width=725,height=313!

 1.1 As the picture shows, add `checkPartitionExists` and implement it in the 
subclass

  !image-2020-10-19-17-03-21-558.png|width=1203,height=88!

 1.2 Call `checkPartitionExists` before the partition commit

--- 

Approach 2

Build a bounded cache of committed partitions and check it every time before 
a partition commit.

(Actually, this cache is supposed to be operator state.)

!image-2020-10-19-16-57-02-661.png|width=1298,height=57!

  2.1 build a cache

!image-2020-10-19-17-00-27-255.png|width=1235,height=116!

  2.2 check before commit 
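
The bounded-cache idea in Approach 2 can be illustrated with a small, self-contained Java sketch. This is not the implementation shown in the attached screenshots and it is not Flink API: the class `CommittedPartitionCache`, the method `markCommitted`, and the capacity value are hypothetical names chosen for illustration, and the cache lives in plain memory here rather than in operator state.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink): remembers recently committed partitions. */
final class CommittedPartitionCache {

    private final Map<String, Boolean> recentlyCommitted;

    CommittedPartitionCache(final int maxEntries) {
        if (maxEntries <= 0) {
            throw new IllegalArgumentException("maxEntries must be positive");
        }
        // An access-ordered LinkedHashMap drops the least recently used entry
        // as soon as the map grows beyond maxEntries, which keeps it bounded.
        this.recentlyCommitted = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Records the partition; returns true if it was already present in the cache. */
    boolean markCommitted(String partition) {
        return recentlyCommitted.put(partition, Boolean.TRUE) != null;
    }

    public static void main(String[] args) {
        CommittedPartitionCache cache = new CommittedPartitionCache(1000);
        String[] commits = {"dt=2020-10-19", "dt=2020-10-20", "dt=2020-10-19"};
        for (String partition : commits) {
            // Step 2.2: check the cache right before the (omitted) commit call.
            if (cache.markCommitted(partition)) {
                System.out.println("WARN: partition committed repeatedly: " + partition);
            }
        }
    }
}
```

Because the cache is bounded, a repeated commit is only detected while the partition is still among the most recently seen entries; an older partition that has been evicted would be treated as new.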

 

 

--- UPDATE ---

After discussing with [~lzljs3620320], `Repeated partition check` seems a 
little misleading in semantics, so only some WARN logs will be added to 
`MetastoreCommitPolicy` to flag repeated commits.

 

 


[jira] [Commented] (FLINK-19706) Introduce `Repeated Partition Commit Check` in `org.apache.flink.table.filesystem.PartitionCommitPolicy`

2020-10-20 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218094#comment-17218094
 ] 

Jingsong Lee commented on FLINK-19706:
--

If we do a strict check, I recommend doing it in the late-event handling.

> Introduce `Repeated Partition Commit Check` in 
> `org.apache.flink.table.filesystem.PartitionCommitPolicy` 
> -
>
> Key: FLINK-19706
> URL: https://issues.apache.org/jira/browse/FLINK-19706
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Connectors / Hive, Table SQL / 
> Runtime
>Reporter: Lsw_aka_laplace
>Priority: Minor
> Attachments: image-2020-10-19-16-47-39-354.png, 
> image-2020-10-19-16-57-02-661.png, image-2020-10-19-17-00-27-255.png, 
> image-2020-10-19-17-03-21-558.png, image-2020-10-19-18-16-35-083.png
>
>





[jira] [Updated] (FLINK-19706) Introduce `Repeated Partition Commit Check` in `org.apache.flink.table.filesystem.PartitionCommitPolicy`

2020-10-20 Thread Lsw_aka_laplace (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lsw_aka_laplace updated FLINK-19706:

Description: 
Hi all,

Recently we have been working on using Hive Streaming Writing to accelerate 
the data sync of our Hive-based data warehouse, and we eventually made it 
work.

For production use, we added a lot of metrics, logs, and measures to help 
us analyze runtime information and fix unexpected problems. Among these, we 
found that checking for repeated partition commits is the most important one. 
So we would like to contribute it back to the community.

If this proposal makes sense, I am happy to present my design and 
implementation.

 

Looking forward to ANY opinion~

 

 

UPDATE 

 

One of our users (who builds his own Flink jobs on our platform) raised some 
requests. One of them is that once a partition is committed, the data in 
that partition is regarded as frozen or complete. Committing a partition 
works like a guarantee (although we all know it is hard to make it a 
promise) that the partition is complete, and we have taken a lot of measures 
to make "partition commit" mean "complete". So if a partition is committed 
twice or more, something must be wrong or our measures are insufficient. 
A repeated commit also tells us that we need to do something to make up for 
it and avoid data loss or incomplete data.

 

So first of all, it is important to know that a certain partition has been 
committed repeatedly, so that we can do the following things ASAP:

   1. analyze the reason or the cause 

   2. do some trade-off operations

   3. improve our code/measures

 

--- Design and Implementation ---

There are basically two approaches; both have been used in our production 
environment.

Approach 1

Add a check method to CommitPolicy that is called before the partition commit.

!image-2020-10-19-16-47-39-354.png|width=576,height=235!

//{color:#ffab00}Newly posted, see here{color}

!image-2020-10-19-18-16-35-083.png|width=725,height=313!

 1.1 As the picture shows, add `checkPartitionExists` and implement it in the 
subclass

  !image-2020-10-19-17-03-21-558.png|width=1203,height=88!

 1.2 Call `checkPartitionExists` before the partition commit

--- 

Approach 2

Build a bounded cache of committed partitions and check it every time before 
a partition commit.

(Actually, this cache is supposed to be operator state.)

!image-2020-10-19-16-57-02-661.png|width=1298,height=57!

  2.1 build a cache

!image-2020-10-19-17-00-27-255.png|width=1235,height=116!

  2.2 check before commit 

 

 

--- UPDATE ---

After discussing with [~lzljs3620320], `Repeated partition check` seems a 
little hack on

 

 


[jira] [Assigned] (FLINK-19706) Introduce `Repeated Partition Commit Check` in `org.apache.flink.table.filesystem.PartitionCommitPolicy`

2020-10-20 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-19706:


Assignee: Lsw_aka_laplace

> Introduce `Repeated Partition Commit Check` in 
> `org.apache.flink.table.filesystem.PartitionCommitPolicy` 
> -
>
> Key: FLINK-19706
> URL: https://issues.apache.org/jira/browse/FLINK-19706
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Connectors / Hive, Table SQL / 
> Runtime
>Reporter: Lsw_aka_laplace
>Assignee: Lsw_aka_laplace
>Priority: Minor
> Attachments: image-2020-10-19-16-47-39-354.png, 
> image-2020-10-19-16-57-02-661.png, image-2020-10-19-17-00-27-255.png, 
> image-2020-10-19-17-03-21-558.png, image-2020-10-19-18-16-35-083.png
>
>





[jira] [Created] (FLINK-19745) Supplement micro-benchmark for bounded blocking partition in remote channel case

2020-10-20 Thread Zhijiang (Jira)
Zhijiang created FLINK-19745:


 Summary: Supplement micro-benchmark for bounded blocking partition 
in remote channel case
 Key: FLINK-19745
 URL: https://issues.apache.org/jira/browse/FLINK-19745
 Project: Flink
  Issue Type: Task
  Components: Benchmarks, Runtime / Network
Reporter: Zhijiang
Assignee: Zhijiang


The current `BlockingPartitionBenchmark` for batch jobs only measures the 
scenario where the producer and consumer are deployed in the same processor, 
which corresponds to the local input channel on the consumer side. 

We want to add another common scenario that measures the effect of reading 
data via network shuffle, which corresponds to the remote input channel on 
the consumer side.





[GitHub] [flink] leonardBang commented on a change in pull request #13708: [FLINK-19734][table-planner-blink] Replace 'collection' connector by 'values' connector for temporal join plan tests

2020-10-20 Thread GitBox


leonardBang commented on a change in pull request #13708:
URL: https://github.com/apache/flink/pull/13708#discussion_r508971859



##
File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
##
@@ -43,13 +43,13 @@ Calc(select=[amount, currency, rowtime, 
PROCTIME_MATERIALIZE(proctime) AS procti
:- Exchange(distribution=[hash[currency]])
:  +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
: +- Calc(select=[amount, currency, rowtime, PROCTIME() AS proctime])
-   :+- LegacyTableSourceScan(table=[[default_catalog, 
default_database, Orders, source: [CollectionTableSource(amount, currency, 
rowtime)]]], fields=[amount, currency, rowtime])
+   :+- TableSourceScan(table=[[default_catalog, default_database, 
Orders]], fields=[amount, currency, rowtime])
+- Exchange(distribution=[hash[currency]])
   +- Calc(select=[currency, rate, rowtime], where=[<(rate, 100)])
  +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[currency], orderBy=[rowtime 
DESC], select=[currency, rate, rowtime])
 +- Exchange(distribution=[hash[currency]])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
-  +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, RatesHistory, source: [CollectionTableSource(currency, rate, 
rowtime)]]], fields=[currency, rate, rowtime])
+  +- TableSourceScan(table=[[default_catalog, 
default_database, RatesHistory]], fields=[currency, rate, rowtime])

Review comment:
   Could we add one `Collection` test to ensure the temporal join plan 
still works with the legacy connector?









[jira] [Commented] (FLINK-19740) Error in to_pandas for table containing event time: class java.time.LocalDateTime cannot be cast to class java.sql.Timestamp

2020-10-20 Thread Nicholas Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218090#comment-17218090
 ] 

Nicholas Jiang commented on FLINK-19740:


cc [~dian.fu]

> Error in to_pandas for table containing event time: class 
> java.time.LocalDateTime cannot be cast to class java.sql.Timestamp
> 
>
> Key: FLINK-19740
> URL: https://issues.apache.org/jira/browse/FLINK-19740
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Table SQL / API
>Affects Versions: 1.12.0, 1.11.2
> Environment: Ubuntu 18.04
> Python 3.8, jar built from master yesterday.
> Or Python 3.7, installed latest version from pip.
>Reporter: Alex Hall
>Priority: Major
>
> In a nutshell, if I create a table with an event time column:
> {{CREATE TABLE simple_table (}}
>  {{   ts TIMESTAMP(3),}}
>  {{   WATERMARK FOR ts AS ts - INTERVAL '5' SECOND}}
>  {{)}}
> then it fails to serialize with .to_pandas(). This only happens with the 
> watermark line and in streaming mode.
> Full code:
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> env_settings = (
>  
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> )
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> table_env.execute_sql(
>  """
>  CREATE TABLE simple_table (
>  ts TIMESTAMP(3),
>  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>  ) WITH (
>  'connector.type' = 'filesystem',
>  'format.type' = 'csv',
>  'connector.path' = 
> '/home/alex/.config/JetBrains/PyCharm2020.2/scratches/scratch_2.csv'
>  )
> """
> )
> print(table_env.from_path("simple_table").to_pandas())
> Output:
>   
>  WARNING: An illegal reflective access operation has occurred
>  WARNING: Illegal reflective access by 
> org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil 
> ([file:/home/alex/work/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/opt/flink-python_2.11-1.12-SNAPSHOT.jar|file:///home/alex/work/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/opt/flink-python_2.11-1.12-SNAPSHOT.jar])
>  to constructor java.nio.DirectByteBuffer(long,int)
>  WARNING: Please consider reporting this to the maintainers of 
> org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil
>  WARNING: Use --illegal-access=warn to enable warnings of further illegal 
> reflective access operations
>  WARNING: All illegal access operations will be denied in a future release
>  Traceback (most recent call last):
>  File "/home/alex/.config/JetBrains/PyCharm2020.2/scratches/scratch_903.py", 
> line 20, in <module>
>  print(table_env.from_path("simple_table").to_pandas())
>  File "/home/alex/work/flink/flink-python/pyflink/table/table.py", line 839, 
> in to_pandas
>  table = pa.Table.from_batches(serializer.load_from_iterator(batches))
>  File "pyarrow/table.pxi", line 1576, in pyarrow.lib.Table.from_batches
>  File "/home/alex/work/flink/flink-python/pyflink/table/serializers.py", line 
> 76, in load_from_iterator
>  reader = pa.ipc.open_stream(
>  File 
> "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/pyarrow/ipc.py",
>  line 146, in open_stream
>  return RecordBatchStreamReader(source)
>  File 
> "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/pyarrow/ipc.py",
>  line 62, in __init__
>  self._open(source)
>  File "pyarrow/ipc.pxi", line 360, in 
> pyarrow.lib._RecordBatchStreamReader._open
>  File "pyarrow/error.pxi", line 123, in 
> pyarrow.lib.pyarrow_internal_check_status
>  File "/home/alex/work/flink/flink-python/pyflink/table/serializers.py", line 
> 69, in readinto
>  input = self.leftover or (self.itor.next() if self.itor.hasNext() else None)
>  File 
> "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/java_gateway.py",
>  line 1285, in __call__
>  return_value = get_return_value(
>  File "/home/alex/work/flink/flink-python/pyflink/util/exceptions.py", line 
> 147, in deco
>  return f(*a, **kw)
>  File 
> "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/protocol.py",
>  line 326, in get_return_value
>  raise Py4JJavaError(
>  py4j.protocol.Py4JJavaError: An error occurred while calling o39.next.
>  : java.lang.RuntimeException: Failed to serialize the data of the table
>  at 
> org.apache.flink.table.runtime.arrow.ArrowUtils$2.next(ArrowUtils.java:683)
>  at 
> org.apache.flink.table.runtime.arrow.ArrowUtils$2.next(ArrowUtils.java:663)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>  at 
> 

[jira] [Created] (FLINK-19744) Support Nebula Graph Connector

2020-10-20 Thread darion yaphet (Jira)
darion yaphet created FLINK-19744:
-

 Summary: Support Nebula Graph Connector
 Key: FLINK-19744
 URL: https://issues.apache.org/jira/browse/FLINK-19744
 Project: Flink
  Issue Type: New Feature
Reporter: darion yaphet


A lot of Nebula Graph users are using Flink as their computation engine. The 
problem is, they need to develop a program on their own to write data from 
Flink to Nebula Graph, which is painful. So we want to provide a smoother 
experience connecting Flink and Nebula Graph.





[GitHub] [flink] flinkbot edited a comment on pull request #13713: [FLINK-19733][python] Make fast_operation and slow_operation produce functions consistent

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13713:
URL: https://github.com/apache/flink/pull/13713#issuecomment-713269298


   
   ## CI report:
   
   * 75ac685135eac8dde548c3d04d96502964d68cf2 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7977)
 
   
   
   







[GitHub] [flink] flinkbot edited a comment on pull request #13457: [FLINK-8357] Use rolling logs as default

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13457:
URL: https://github.com/apache/flink/pull/13457#issuecomment-697012704


   
   ## CI report:
   
   * 14db2d58abff9a9b1ab6c997af66180c6301373c Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7970)
 
   
   
   







[GitHub] [flink] danny0405 commented on a change in pull request #13331: [FLINK-19079][table-runtime] Import rowtime deduplicate operator

2020-10-20 Thread GitBox


danny0405 commented on a change in pull request #13331:
URL: https://github.com/apache/flink/pull/13331#discussion_r508472527



##
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
##
@@ -102,44 +110,109 @@ class StreamExecDeduplicate(
   .asInstanceOf[Transformation[RowData]]
 
 val rowTypeInfo = 
inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
+val inputFieldTypes = rowTypeInfo.toRowFieldTypes
+val keyFieldTypes = new Array[LogicalType](uniqueKeys.length)
+for (i <- 0 until uniqueKeys.length) {
+  keyFieldTypes(i) = inputFieldTypes(uniqueKeys(i))
+}
+

Review comment:
   You can use `uniqueKeys.map(idx => inputFieldTypes(idx))` directly.

##
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
##
@@ -102,44 +110,109 @@ class StreamExecDeduplicate(
   .asInstanceOf[Transformation[RowData]]
 
 val rowTypeInfo = 
inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
+val inputFieldTypes = rowTypeInfo.toRowFieldTypes
+val keyFieldTypes = new Array[LogicalType](uniqueKeys.length)
+for (i <- 0 until uniqueKeys.length) {
+  keyFieldTypes(i) = inputFieldTypes(uniqueKeys(i))
+}
+
 val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
 val tableConfig = planner.getTableConfig
 val generateInsert = tableConfig.getConfiguration
   .getBoolean(TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE)
 val isMiniBatchEnabled = tableConfig.getConfiguration.getBoolean(
   ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
 val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
-val operator = if (isMiniBatchEnabled) {
-  val exeConfig = planner.getExecEnv.getConfig
-  val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
+
+val rowtimeField = input.getRowType.getFieldList
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+val rowtimeIndex = if (isRowtime) {
+  Preconditions.checkArgument(rowtimeField.nonEmpty)
+  rowtimeField.get(0).getIndex
+} else {
+  -1
+}
+
+val miniBatchsize = if (isMiniBatchEnabled) {
+  val size = tableConfig.getConfiguration.getLong(
+ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE)
+  Preconditions.checkArgument(size > 0)
+  size
+} else {
+  -1L
+}
+val exeConfig = planner.getExecEnv.getConfig
+val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
+
+val operator = if (isRowtime) {
+  if(isMiniBatchEnabled) {
+val processFunction = if (keepLastRow) {
+  new RowTimeMiniBatchDeduplicateKeepLastRowFunction(
+rowTypeInfo,

Review comment:
   Can we have two subclasses here? One for `proctime` and another for 
`rowtime`; each class can have a method that returns the row function, e.g. 
`getRowFunction(isMiniBatchEnabled, keepLastRow)`.
   
   There are too many if/else branches here, which makes the code hard to maintain.
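
   The suggestion above could be sketched roughly as follows. This is only an 
illustration in plain Java under stated assumptions: the interface, the two 
factory classes, and the helper `label` are hypothetical and are not part of 
the PR or of Flink, and the sketch returns function names instead of real 
function instances. Only `RowTimeMiniBatchDeduplicateKeepLastRowFunction` 
appears in the diff; the other names it can produce are made up by analogy.

```java
// Hypothetical sketch: these types are illustrative and are not Flink classes.
interface DeduplicateFunctionFactory {

    // Returns the name of the function that would be instantiated.
    String getRowFunction(boolean isMiniBatchEnabled, boolean keepLastRow);

    // Picks the factory once, so callers no longer branch on the time domain.
    static DeduplicateFunctionFactory forTimeDomain(boolean isRowtime) {
        return isRowtime ? new RowtimeDeduplicateFactory() : new ProctimeDeduplicateFactory();
    }

    // Builds a function name from the two remaining flags.
    static String label(String timeDomain, boolean miniBatch, boolean keepLastRow) {
        return timeDomain
            + (miniBatch ? "MiniBatch" : "")
            + "Deduplicate"
            + (keepLastRow ? "KeepLastRow" : "KeepFirstRow")
            + "Function";
    }
}

// Handles every proctime combination in one place.
final class ProctimeDeduplicateFactory implements DeduplicateFunctionFactory {
    @Override
    public String getRowFunction(boolean isMiniBatchEnabled, boolean keepLastRow) {
        return DeduplicateFunctionFactory.label("ProcTime", isMiniBatchEnabled, keepLastRow);
    }
}

// Handles every rowtime combination in one place.
final class RowtimeDeduplicateFactory implements DeduplicateFunctionFactory {
    @Override
    public String getRowFunction(boolean isMiniBatchEnabled, boolean keepLastRow) {
        return DeduplicateFunctionFactory.label("RowTime", isMiniBatchEnabled, keepLastRow);
    }
}

final class DeduplicateFactoryDemo {
    public static void main(String[] args) {
        DeduplicateFunctionFactory factory = DeduplicateFunctionFactory.forTimeDomain(true);
        System.out.println(factory.getRowFunction(true, true));
    }
}
```

   With this shape the planner node would branch once on `isRowtime` and 
delegate the remaining two flags to the chosen class.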

##
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##
@@ -93,6 +95,239 @@ static void processFirstRow(
out.collect(currentRow);
}
 
+   /**
+* Processes element to deduplicate on keys with row time semantic, 
sends current element if it is last row,
+* retracts previous element if needed.
+*
+* @param state state of function
+* @param currentRow latest row received by deduplicate function
+* @param serializer serializer to serialize the data
+* @param out underlying collector
+* @param rowtimeIndex index of row time field
+* @param generateUpdateBefore flag to generate UPDATE_BEFORE message 
or not
+* @param generateInsert flag to gennerate INSERT message or not
+*/
+   static void processLastRowOnRowtime(
+   ValueState state,
+   RowData currentRow,
+   TypeSerializer serializer,
+   Collector out,
+   int rowtimeIndex,
+   boolean generateUpdateBefore,
+   boolean generateInsert) throws Exception {
+
+   checkInsertOnly(currentRow);
+   RowData prevRow = state.value();
+   if (!isLastRow(prevRow, currentRow, rowtimeIndex)) {
+   return;
+   }
+   state.update(currentRow);
+
+   // store all needed data to state
+   if (generateUpdateBefore || generateInsert) {
+   if (prevRow == null) {
+   // the first row, send INSERT message
+   

[jira] [Commented] (FLINK-19692) Can't restore feedback channel from savepoint

2020-10-20 Thread Antti Kaikkonen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218077#comment-17218077
 ] 

Antti Kaikkonen commented on FLINK-19692:
-

Thanks for investigating and identifying the issue so quickly. I will keep an 
eye on these issues and update as soon as there is a solution available. I'm 
also interested in any workarounds to get it working as soon as possible.

> Can't restore feedback channel from savepoint
> -
>
> Key: FLINK-19692
> URL: https://issues.apache.org/jira/browse/FLINK-19692
> Project: Flink
>  Issue Type: Bug
>  Components: Stateful Functions
>Affects Versions: statefun-2.0.0, statefun-2.1.0, statefun-2.2.0
>Reporter: Antti Kaikkonen
>Priority: Blocker
> Fix For: statefun-2.3.0, statefun-2.2.1
>
>
> When using the new statefun-flink-datastream integration the following error 
> is thrown by the *feedback -> union* task when trying to restore from a 
> savepoint:
> {code:java}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: java.io.IOException: position out of bounds
> at 
> org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
> ... 9 more
> Caused by: java.io.IOException: position out of bounds
> at 
> org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
> ... 10 more
> {code}
>  The error is only thrown when the feedback channel has been used. 
> I have tested with the [example 
> application|https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java]
>  and the error is thrown only if it is modified to actually use the feedback 
> channel. I simply modified the invoke method to sometimes forward the 
> greeting to a random name: 
> {code:java}
> @Override
> public void invoke(Context context, Object input) {
>   int seen = seenCount.updateAndGet(MyFunction::increment);
>   context.send(GREETINGS, String.format("Hello %s at the %d-th time", input, 
> seen));
>   String[] names = {"Stephan", "Igal", "Gordon", "Seth", "Marta"};
>   ThreadLocalRandom random = ThreadLocalRandom.current();
>   int index = random.nextInt(names.length);
>   final String name2 = names[index];
>   if (random.nextDouble() < 0.5) context.send(new Address(GREET, name2), 
> input);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18570) SQLClientHBaseITCase.testHBase fails on azure

2020-10-20 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218074#comment-17218074
 ] 

Leonard Xu commented on FLINK-18570:


This is the same cause I found in my previous investigation. Unfortunately, I 
could not reproduce it in my many Azure builds (I added some log output). 
[~rmetzger], could we add a little logging to `AbstractTableInputFormat` as a hotfix?

Once the test fails again, we can obtain more information to help fix it.
{code:java}
Caused by: java.io.IOException: Expecting at least one region.
Oct 20 13:34:40 at 
org.apache.flink.connector.hbase1.source.AbstractTableInputFormat.createInputSplits(AbstractTableInputFormat.java:225)
Oct 20 13:34:40 at 
org.apache.flink.connector.hbase1.source.AbstractTableInputFormat.createInputSplits(AbstractTableInputFormat.java:49)
Oct 20 13:34:40 at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:234)
Oct 20 13:34:40 ... 18 more
Oct 20 13:34
{code}
{code:java}
// The test failed here with no exception stack trace; the exception is thrown
// on the Flink side rather than the HBase side.
@Override
public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
   try {
      initTable();

      // Get the starting and ending row keys for every region in the currently open table
      final Pair<byte[][], byte[][]> keys = table.getRegionLocator().getStartEndKeys();
      if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
         throw new IOException("Expecting at least one region.");
      }
{code}
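
A minimal sketch of the kind of extra logging proposed above, written in plain 
Java so it stands alone. The class `RegionKeyCheck`, its methods, and the 
`tableName` parameter are hypothetical and are not taken from 
`AbstractTableInputFormat`; the idea is only to record some context before the 
"Expecting at least one region." exception is thrown.

```java
import java.io.IOException;
import java.util.logging.Logger;

// Hypothetical helper, not part of Flink: logs context before failing.
final class RegionKeyCheck {
    private static final Logger LOG = Logger.getLogger(RegionKeyCheck.class.getName());

    // Summarizes what was observed so the same text goes to the log and the exception.
    static String describe(String tableName, byte[][] startKeys) {
        String count = startKeys == null ? "null" : String.valueOf(startKeys.length);
        return "table=" + tableName + ", startKeys=" + count;
    }

    // Throws if no region start keys are available, after logging the context.
    static void requireAtLeastOneRegion(String tableName, byte[][] startKeys) throws IOException {
        if (startKeys == null || startKeys.length == 0) {
            String context = describe(tableName, startKeys);
            LOG.warning("No regions found before creating input splits: " + context);
            throw new IOException("Expecting at least one region (" + context + ").");
        }
    }
}
```

Putting the context into the exception message as well means it still shows up 
in the CI output if the log file is not collected.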

> SQLClientHBaseITCase.testHBase fails on azure
> -
>
> Key: FLINK-18570
> URL: https://issues.apache.org/jira/browse/FLINK-18570
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase, Table SQL / Ecosystem
>Affects Versions: 1.12.0
>Reporter: Dawid Wysakowicz
>Assignee: Leonard Xu
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=4403=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529
> {code}
> 2020-07-10T17:06:32.1514539Z [INFO] Running 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase
> 2020-07-10T17:08:09.9141283Z [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 97.757 s <<< FAILURE! - in 
> org.apache.flink.tests.util.hbase.SQLClientHBaseITCase
> 2020-07-10T17:08:09.9144691Z [ERROR] 
> testHBase(org.apache.flink.tests.util.hbase.SQLClientHBaseITCase)  Time 
> elapsed: 97.757 s  <<< ERROR!
> 2020-07-10T17:08:09.9145637Z java.io.IOException: 
> 2020-07-10T17:08:09.9146515Z Process execution failed due error. Error 
> output:Jul 10, 2020 5:07:32 PM org.jline.utils.Log logr
> 2020-07-10T17:08:09.9147152Z WARNING: Unable to create a system terminal, 
> creating a dumb terminal (enable debug logging for more information)
> 2020-07-10T17:08:09.9147776Z Exception in thread "main" 
> org.apache.flink.table.client.SqlClientException: Unexpected exception. This 
> is a bug. Please consider filing an issue.
> 2020-07-10T17:08:09.9148432Z  at 
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> 2020-07-10T17:08:09.9148828Z Caused by: java.lang.RuntimeException: Error 
> running SQL job.
> 2020-07-10T17:08:09.9149329Z  at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeUpdateInternal$14(LocalExecutor.java:598)
> 2020-07-10T17:08:09.9149932Z  at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255)
> 2020-07-10T17:08:09.9150501Z  at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:592)
> 2020-07-10T17:08:09.9151088Z  at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:515)
> 2020-07-10T17:08:09.9151577Z  at 
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:599)
> 2020-07-10T17:08:09.9152044Z  at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:315)
> 2020-07-10T17:08:09.9152456Z  at 
> java.util.Optional.ifPresent(Optional.java:159)
> 2020-07-10T17:08:09.9152874Z  at 
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
> 2020-07-10T17:08:09.9153312Z  at 
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
> 2020-07-10T17:08:09.9153729Z  at 
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
> 2020-07-10T17:08:09.9154151Z  at 
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> 2020-07-10T17:08:09.9154685Z Caused by: 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 

[GitHub] [flink] flinkbot commented on pull request #13713: [FLINK-19733][python] Make fast_operation and slow_operation produce functions consistent

2020-10-20 Thread GitBox


flinkbot commented on pull request #13713:
URL: https://github.com/apache/flink/pull/13713#issuecomment-713269298


   
   ## CI report:
   
   * 75ac685135eac8dde548c3d04d96502964d68cf2 UNKNOWN
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #13706: [FLINK-19677][runtime] Make JobManager lazily resolve hostname of TaskManager and provide an option to turn off reverse resolution en

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13706:
URL: https://github.com/apache/flink/pull/13706#issuecomment-712871607


   
   ## CI report:
   
   * 5a5166c17661208d58eb831c577984482ae4ba99 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7948)
   * d75d71b5b60bffd3690739c9f12ebbaf62547d09 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7976)
   
   




[jira] [Commented] (FLINK-19718) HiveTableSourceITCase.testStreamPartitionRead is not stable on Azure

2020-10-20 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218070#comment-17218070
 ] 

Jingsong Lee commented on FLINK-19718:
--

Thanks for reporting this issue, I think we can use {{TableResult.collect}}

> HiveTableSourceITCase.testStreamPartitionRead is not stable on Azure
> 
>
> Key: FLINK-19718
> URL: https://issues.apache.org/jira/browse/FLINK-19718
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Blocker
> Fix For: 1.12.0
>
>
> Here are some instances:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7845=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7875=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> {code}
> 2020-10-19T09:17:36.2004157Z [INFO] Results:
> 2020-10-19T09:17:36.2004505Z [INFO] 
> 2020-10-19T09:17:36.2007981Z [ERROR] Failures: 
> 2020-10-19T09:17:36.2010669Z [ERROR]   
> HiveTableSourceITCase.testStreamPartitionRead:537 
> expected:<[+I(0,0,2020-05-06 00:00:00), +I(1,1,2020-05-06 00:10:00), 
> +I(1,1_copy,2020-05-06 00:10:00), +I(2,2,2020-05-06 00:20:00), 
> +I(2,2_copy,2020-05-06 00:20:00), +I(3,3,2020-05-06 00:30:00), 
> +I(3,3_copy,2020-05-06 00:30:00), +I(4,4,2020-05-06 00:40:00), 
> +I(4,4_copy,2020-05-06 00:40:00), +I(5,5,2020-05-06 00:50:00), 
> +I(5,5_copy,2020-05-06 00:50:00)]> but was:<[]>
> 2020-10-19T09:17:36.2011985Z [INFO] 
> 2020-10-19T09:17:36.2012582Z [ERROR] Tests run: 80, Failures: 1, Errors: 0, 
> Skipped: 3
> 2020-10-19T09:17:36.2012976Z [INFO] 
> 2020-10-19T09:17:36.2137222Z [INFO] 
> 
> 2020-10-19T09:17:36.2140971Z [INFO] Reactor Summary:
> 2020-10-19T09:17:36.2141558Z [INFO] 
> 2020-10-19T09:17:36.2141987Z [INFO] Flink : Tools : Force Shading 
> .. SUCCESS [  1.346 s]
> 2020-10-19T09:17:36.2142534Z [INFO] Flink : Test utils : 
> ... SUCCESS [  1.845 s]
> 2020-10-19T09:17:36.2143098Z [INFO] Flink : Test utils : Junit 
> . SUCCESS [  3.265 s]
> 2020-10-19T09:17:36.2190677Z [INFO] Flink : Queryable state : 
> .. SUCCESS [  0.077 s]
> 2020-10-19T09:17:36.2191261Z [INFO] Flink : FileSystems : Azure FS Hadoop 
> .. SUCCESS [ 12.600 s]
> 2020-10-19T09:17:36.2191821Z [INFO] Flink : Examples : 
> . SUCCESS [  0.249 s]
> 2020-10-19T09:17:36.2192380Z [INFO] Flink : Examples : Batch 
> ... SUCCESS [  1.919 s]
> {code}





[jira] [Assigned] (FLINK-19718) HiveTableSourceITCase.testStreamPartitionRead is not stable on Azure

2020-10-20 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-19718:


Assignee: Jingsong Lee

> HiveTableSourceITCase.testStreamPartitionRead is not stable on Azure
> 
>
> Key: FLINK-19718
> URL: https://issues.apache.org/jira/browse/FLINK-19718
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: Jingsong Lee
>Priority: Blocker
> Fix For: 1.12.0
>
>





[GitHub] [flink] flinkbot commented on pull request #13713: [FLINK-19733][python] Make fast_operation and slow_operation produce functions consistent

2020-10-20 Thread GitBox


flinkbot commented on pull request #13713:
URL: https://github.com/apache/flink/pull/13713#issuecomment-713264885


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 75ac685135eac8dde548c3d04d96502964d68cf2 (Wed Oct 21 
03:05:44 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-19733).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   







[jira] [Updated] (FLINK-19733) Make fast_operation and slow_operation produce functions consistent

2020-10-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19733:
---
Labels: pull-request-available  (was: )

> Make fast_operation and slow_operation produce functions consistent
> ---
>
> Key: FLINK-19733
> URL: https://issues.apache.org/jira/browse/FLINK-19733
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
>
> The function generated by slow_operation uses the characteristics of python 
> syntax. In order to better reconstruct the python operation, we need to keep 
> the functions generated by fast_operation and slow_operation consistent.





[GitHub] [flink] HuangXingBo opened a new pull request #13713: [FLINK-19733][python] Make fast_operation and slow_operation produce functions consistent

2020-10-20 Thread GitBox


HuangXingBo opened a new pull request #13713:
URL: https://github.com/apache/flink/pull/13713


   ## What is the purpose of the change
   
   *This pull request will make fast_operation and slow_operation produce 
functions consistent*
   
   
   ## Brief change log
   
 - *make the generated functions of slow operation consistent with fast 
operations*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   







[GitHub] [flink] flinkbot edited a comment on pull request #13706: [FLINK-19677][runtime] Make JobManager lazily resolve hostname of TaskManager and provide an option to turn off reverse resolution en

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13706:
URL: https://github.com/apache/flink/pull/13706#issuecomment-712871607


   
   ## CI report:
   
   * 5a5166c17661208d58eb831c577984482ae4ba99 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7948)
   * d75d71b5b60bffd3690739c9f12ebbaf62547d09 UNKNOWN
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #13653: [FLINK-17528][table] Remove RowData#get() and ArrayData#get() and use FieldGetter and ElementGetter instead

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13653:
URL: https://github.com/apache/flink/pull/13653#issuecomment-709337488


   
   ## CI report:
   
   * cadf9a2a13d3113395f199431881fff216b9a50a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7974) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7958)
   
   




[GitHub] [flink] kylemeow commented on a change in pull request #13706: [FLINK-19677][runtime] Make JobManager lazily resolve hostname of TaskManager and provide an option to turn off reverse resoluti

2020-10-20 Thread GitBox


kylemeow commented on a change in pull request #13706:
URL: https://github.com/apache/flink/pull/13706#discussion_r508957417



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
##
@@ -150,28 +166,48 @@ public String addressString() {
 
/**
 * Returns the fully-qualified domain name the TaskManager. If the name 
could not be
-* determined, the return value will be a textual representation of the 
TaskManager's IP address.
+* determined or {@link 
JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME} is set to false,

Review comment:
   You are right that the implementation details of JobMaster should not be 
exposed to other classes, and I have updated the JavaDocs.









[GitHub] [flink] kylemeow commented on a change in pull request #13706: [FLINK-19677][runtime] Make JobManager lazily resolve hostname of TaskManager and provide an option to turn off reverse resoluti

2020-10-20 Thread GitBox


kylemeow commented on a change in pull request #13706:
URL: https://github.com/apache/flink/pull/13706#discussion_r508956726



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##
@@ -606,7 +608,12 @@ private void releaseEmptyTaskManager(ResourceID 
resourceId) {
 
final TaskManagerLocation taskManagerLocation;
try {
-   taskManagerLocation = 
TaskManagerLocation.fromUnresolvedLocation(unresolvedTaskManagerLocation);
+   final boolean retrieveTaskManagerHostName = 
this.jobMasterConfiguration.getConfiguration()
+   
.getBoolean(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);
+   if (!retrieveTaskManagerHostName) {
+   log.info("JobManager would not retrieve 
TaskManager's canonical hostname due to configuration.");

Review comment:
   Yes, this is a little verbose to log on every registration. Since we 
already print all the configuration parameters during startup, this 
logging is removed in the new commit : )









[GitHub] [flink] kylemeow commented on a change in pull request #13706: [FLINK-19677][runtime] Make JobManager lazily resolve hostname of TaskManager and provide an option to turn off reverse resoluti

2020-10-20 Thread GitBox


kylemeow commented on a change in pull request #13706:
URL: https://github.com/apache/flink/pull/13706#discussion_r508956321



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##
@@ -606,7 +608,12 @@ private void releaseEmptyTaskManager(ResourceID 
resourceId) {
 
final TaskManagerLocation taskManagerLocation;
try {
-   taskManagerLocation = 
TaskManagerLocation.fromUnresolvedLocation(unresolvedTaskManagerLocation);
+   final boolean retrieveTaskManagerHostName = 
this.jobMasterConfiguration.getConfiguration()
+   
.getBoolean(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);

Review comment:
   Thank you for the suggestion, I have moved this variable to a field in 
*JobMaster*.
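
   For readers following the thread, here is a rough sketch of the behaviour 
being discussed: a flag that is read once and kept in a field, plus a host name 
that is resolved lazily and only when the flag allows it. It is an assumption-laden 
illustration in plain Java. The class `LazyHostName` and its methods are 
hypothetical and are not the PR's `TaskManagerLocation` or `JobMaster` code; 
only the `java.net.InetAddress` calls are real JDK API.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical sketch, not Flink code: resolves the host name on first use only.
final class LazyHostName {
    private final InetAddress address;
    // Read once from configuration by the caller and kept as a field.
    private final boolean retrieveHostName;
    private String cached;

    LazyHostName(InetAddress address, boolean retrieveHostName) {
        this.address = address;
        this.retrieveHostName = retrieveHostName;
    }

    // Convenience factory for the sketch; building from raw bytes does no lookup.
    static LazyHostName ofIpv4(int a, int b, int c, int d, boolean retrieveHostName) {
        try {
            byte[] raw = {(byte) a, (byte) b, (byte) c, (byte) d};
            return new LazyHostName(InetAddress.getByAddress(raw), retrieveHostName);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Performs the (possibly slow) reverse lookup at most once, and only if enabled.
    synchronized String get() {
        if (cached == null) {
            cached = retrieveHostName
                ? address.getCanonicalHostName()
                : address.getHostAddress();
        }
        return cached;
    }
}
```

   With the flag off, `get()` returns the textual IP address without any 
reverse lookup; with it on, the lookup cost is paid on first access rather 
than at registration.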









[GitHub] [flink] kylemeow commented on a change in pull request #13706: [FLINK-19677][runtime] Make JobManager lazily resolve hostname of TaskManager and provide an option to turn off reverse resoluti

2020-10-20 Thread GitBox


kylemeow commented on a change in pull request #13706:
URL: https://github.com/apache/flink/pull/13706#discussion_r508955384



##
File path: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##
@@ -308,6 +308,19 @@
.defaultValue(Integer.MAX_VALUE)
.withDescription("The max number of completed jobs that 
can be kept in the job store.");
 
+   /**
+* Flag indicating whether JobManager would retrieve canonical host 
name of TaskManager during registration.
+*/
+   @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)

Review comment:
   Thanks for pointing this out; I have changed it as suggested : )









[GitHub] [flink] pyscala commented on pull request #13680: [FLINK-19553] [ Runtime / Web Frontend] The format of checkpoint Completion Time and Failure Time should be changed from HH:mm:ss to yyyy-MM-

2020-10-20 Thread GitBox


pyscala commented on pull request #13680:
URL: https://github.com/apache/flink/pull/13680#issuecomment-713257351


   > Thanks for your update, I think we should also include date information 
for trigger timestamp, please update related code and offer the updated UI with 
your new changes.
   
   @Myasuka Thanks for your reply. We share the same view, and I will finish it 
as soon as possible.
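
To illustrate why a time-only pattern is ambiguous for checkpoint timestamps, here is a small Python sketch. It is not the web frontend's code. The subject line of this pull request is cut off, so the date-and-time pattern below is an assumption, and `format_checkpoint_time` is a hypothetical helper.

```python
from datetime import datetime, timezone

TIME_ONLY = "%H:%M:%S"            # corresponds to HH:mm:ss
DATE_AND_TIME = "%Y-%m-%d %H:%M:%S"  # assumed target pattern, date included


def format_checkpoint_time(epoch_millis, pattern):
    """Format a millisecond epoch timestamp in UTC with the given pattern."""
    moment = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
    return moment.strftime(pattern)


# Two checkpoints exactly one day apart look identical without the date.
first = 1603155661000
second = first + 24 * 60 * 60 * 1000
same_time_only = (
    format_checkpoint_time(first, TIME_ONLY)
    == format_checkpoint_time(second, TIME_ONLY)
)
```

With the date included, the two timestamps format differently, which is the motivation for applying the same change to the trigger timestamp as well.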







[jira] [Closed] (FLINK-19605) Implement cumulative windowing for window aggregate operator

2020-10-20 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-19605.
---
Resolution: Fixed

Implemented in master: 9423cf1cf2e7a96d5017e641e52d7acaabf39333

> Implement cumulative windowing for window aggregate operator
> 
>
> Key: FLINK-19605
> URL: https://issues.apache.org/jira/browse/FLINK-19605
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Support cumulative windows for existing window aggregate operator, i.e. 
> {{WindowOperator}}.
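
The issue text does not spell out the window semantics, so the following Python sketch rests on an assumption: a cumulative window shares its start with its siblings and grows its end by a fixed step until a maximum size is reached. `assign_cumulative_windows`, `max_size`, and `step` are hypothetical names and do not come from the `WindowOperator` code.

```python
def assign_cumulative_windows(timestamp, max_size, step):
    """Return the (start, end) windows, end exclusive, that contain timestamp.

    Assumed semantics: all windows in one cycle start at the same aligned
    instant, and their ends advance by `step` up to `start + max_size`.
    """
    if step <= 0 or max_size <= 0 or max_size % step != 0:
        raise ValueError("max_size must be a positive multiple of step")
    # Align the shared start to a multiple of max_size.
    start = timestamp - (timestamp % max_size)
    # Smallest window end that is strictly greater than the timestamp.
    first_end = start + ((timestamp - start) // step + 1) * step
    return [(start, end) for end in range(first_end, start + max_size + 1, step)]
```

Under these assumptions, an element early in the cycle lands in every window of the cycle, while a late element lands only in the largest one.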



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wuchong merged pull request #13650: [FLINK-19605][table-runtime-blink] Implement cumulative windowing for window aggregate operator

2020-10-20 Thread GitBox


wuchong merged pull request #13650:
URL: https://github.com/apache/flink/pull/13650


   







[GitHub] [flink] wuchong commented on pull request #13650: [FLINK-19605][table-runtime-blink] Implement cumulative windowing for window aggregate operator

2020-10-20 Thread GitBox


wuchong commented on pull request #13650:
URL: https://github.com/apache/flink/pull/13650#issuecomment-713255813


   The failed Azure build is due to the unstable e2e tests. This change shouldn't affect the e2e 
part. 
   
   Merging...







[GitHub] [flink] dianfu commented on a change in pull request #12902: [FLINK-18415][python] Support TableResult#collect in the Python Table…

2020-10-20 Thread GitBox


dianfu commented on a change in pull request #12902:
URL: https://github.com/apache/flink/pull/12902#discussion_r508938695



##
File path: flink-python/pyflink/table/table_result.py
##
@@ -151,6 +156,45 @@ def get_result_kind(self):
 """
 return ResultKind._from_j_result_kind(self._j_table_result.getResultKind())
 
+def collect(self):
+"""
+Get the result contents as a closeable row iterator.
+
+Note:
+
+For SELECT operation, the job will not be finished unless all result data has been
+collected. So we should actively close the job to avoid resource leak through
+CloseableIterator#close method. Calling CloseableIterator#close method will cancel the job
+and release related resources.
+
+For DML operation, Flink dose not support getting the real affected row count now. So the

Review comment:
   ```suggestion
   For DML operation, Flink does not support getting the real affected row count now. So the
   ```
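
The docstring under review describes an iterator that must be closed to release the job. As a rough illustration of that contract only, the Python sketch below uses a hypothetical `CloseableRowIterator` class that is not PyFlink's implementation; it shows how a `with` block guarantees `close()` is called even when the caller stops reading early.

```python
class CloseableRowIterator:
    """Hypothetical stand-in for a closeable result iterator."""

    def __init__(self, rows):
        self._rows = iter(rows)
        self.closed = False

    def __iter__(self):
        return self

    def __next__(self):
        if self.closed:
            raise StopIteration
        return next(self._rows)

    def close(self):
        # In a real client this is where the job would be cancelled
        # and its resources released (assumption for illustration).
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # do not swallow exceptions


def take_first(iterator, limit):
    """Collect up to `limit` rows (limit >= 1), always closing the iterator."""
    collected = []
    with iterator as rows:
        for row in rows:
            collected.append(row)
            if len(collected) >= limit:
                break
    return collected
```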

##
File path: flink-python/pyflink/table/tests/test_table_environment_api.py
##
@@ -454,6 +458,145 @@ def test_to_retract_stream(self):
 "(True, )"]
 self.assertEqual(result, expected)
 
+def test_collect_for_all_data_types(self):
+
+def collect_from_source(t_env, expected_result):
+source = t_env.from_elements([(1, None, 1, True, 32767, -2147483648, 1.23, 1.98932,
+   bytearray(b'pyflink'), 'pyflink',
+   datetime.date(2014, 9, 13),
+   datetime.time(hour=12, minute=0, second=0,
+ microsecond=123000),
+   datetime.datetime(2018, 3, 11, 3, 0, 0, 123000),
+   [Row(['pyflink']), Row(['pyflink']), Row(['pyflink'])],
+   {1: Row(['flink']), 2: Row(['pyflink'])},
+   decimal.Decimal('100.05'),
+   decimal.Decimal('100.0599989'
+   '99'))],
+ DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT()),
+DataTypes.FIELD("b", DataTypes.BIGINT()),
+DataTypes.FIELD("c", DataTypes.TINYINT()),
+DataTypes.FIELD("d", DataTypes.BOOLEAN()),
+DataTypes.FIELD("e", DataTypes.SMALLINT()),
+DataTypes.FIELD("f", DataTypes.INT()),
+DataTypes.FIELD("g", DataTypes.FLOAT()),
+DataTypes.FIELD("h", DataTypes.DOUBLE()),
+DataTypes.FIELD("i", DataTypes.BYTES()),
+DataTypes.FIELD("j", DataTypes.STRING()),
+DataTypes.FIELD("k", DataTypes.DATE()),
+DataTypes.FIELD("l", DataTypes.TIME()),
+DataTypes.FIELD("m",
+DataTypes.TIMESTAMP(3)),
+DataTypes.FIELD("n", DataTypes.ARRAY(
+DataTypes.ROW([DataTypes.FIELD('ss2',
+   DataTypes.STRING())]))),
+DataTypes.FIELD("o", DataTypes.MAP(
+DataTypes.BIGINT(), DataTypes.ROW(
+[DataTypes.FIELD('ss',
+ DataTypes.STRING())]))),
+DataTypes.FIELD("p",
+DataTypes.DECIMAL(38, 18)),
+DataTypes.FIELD("q",
+DataTypes.DECIMAL(38, 18))]))
+table_result = source.execute()
+with table_result.collect() as result:
+collected_result = []
+for i in result:
+collected_result.append(i)
+

[GitHub] [flink] flinkbot edited a comment on pull request #13653: [FLINK-17528][table] Remove RowData#get() and ArrayData#get() and use FieldGetter and ElementGetter instead

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13653:
URL: https://github.com/apache/flink/pull/13653#issuecomment-709337488


   
   ## CI report:
   
   * cadf9a2a13d3113395f199431881fff216b9a50a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7958)
 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7974)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   







[jira] [Assigned] (FLINK-19741) InternalTimeServiceManager fails to restore due to corrupt reads if there are other users of raw keyed state streams

2020-10-20 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-19741:
---

Assignee: Tzu-Li (Gordon) Tai

> InternalTimeServiceManager fails to restore due to corrupt reads if there are 
> other users of raw keyed state streams
> 
>
> Key: FLINK-19741
> URL: https://issues.apache.org/jira/browse/FLINK-19741
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.3, 1.10.2, 1.11.2
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.12.0, 1.11.3
>
>
> h2. *Diagnosis*
> Currently, when restoring an {{InternalTimeServiceManager}}, we always attempt 
> to read from the provided raw keyed state streams (using 
> {{InternalTimerServiceSerializationProxy}}):
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L117
> This is incorrect, since we don't write with the 
> {{InternalTimerServiceSerializationProxy}} if the timers do not require 
> legacy synchronous snapshots:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L192
> (we currently only require that when users use RocksDB backend + heap timers).
> Therefore, the {{InternalTimeServiceManager}} can fail to be created on 
> restore due to corrupt reads in the case where:
> * a checkpoint was taken where {{useLegacySynchronousSnapshots}} is false 
> (hence nothing was written, and the time service manager does not use the raw 
> keyed stream)
> * the raw keyed stream is used elsewhere (e.g. in the Flink application's 
> user code)
> * on restore from the checkpoint, {{InternalTimeServiceManagerImpl.create()}} 
> attempts to read from the raw keyed stream with the 
> {{InternalTimerServiceSerializationProxy}}.
> Full error stack trace (with Flink 1.11.1):
> {code}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: java.io.IOException: position out of bounds
>   at 
> org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
>   ... 9 more
> Caused by: java.io.IOException: position out of bounds
>   at 
> org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
>   ... 10 more
> {code}
> h2. *Reproducing*
> - Have an application with any operator that uses and writes to raw keyed 
> state streams
> - Use heap backend + any timer factory or RocksDB backend + RocksDB timers
> - Take a savepoint or wait for a checkpoint, and trigger a restore
> h2. *Proposed Fix*
> The fix would be to also respect the {{useLegacySynchronousSnapshots}} flag 
> in:
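
The diagnosis above comes down to an asymmetry: the write path is guarded by a flag, while the restore path reads unconditionally. A minimal Python sketch of the symmetric version follows. It is not Flink code; `snapshot_timers`, `restore_timers`, and the byte layout are assumptions made for illustration.

```python
import io
import struct


def snapshot_timers(stream, timers, use_legacy_synchronous_snapshots):
    """Write timers to the raw stream only when the legacy flag is set."""
    if not use_legacy_synchronous_snapshots:
        return  # nothing is written, so the stream may hold other users' bytes
    stream.write(struct.pack(">i", len(timers)))
    for timer in timers:
        stream.write(struct.pack(">q", timer))


def restore_timers(stream, use_legacy_synchronous_snapshots):
    """Read timers back, guarded by the same flag used when writing."""
    if not use_legacy_synchronous_snapshots:
        # Without this guard we would misinterpret bytes written by someone
        # else, which is the kind of corrupt read the report describes.
        return []
    (count,) = struct.unpack(">i", stream.read(4))
    return [struct.unpack(">q", stream.read(8))[0] for _ in range(count)]
```

When the flag is off, the restore path leaves the stream untouched, so bytes written by other users of the raw keyed stream are never parsed as timers.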

[jira] [Updated] (FLINK-19692) Can't restore feedback channel from savepoint

2020-10-20 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-19692:

Affects Version/s: statefun-2.0.0
   statefun-2.1.0

> Can't restore feedback channel from savepoint
> -
>
> Key: FLINK-19692
> URL: https://issues.apache.org/jira/browse/FLINK-19692
> Project: Flink
>  Issue Type: Bug
>  Components: Stateful Functions
>Affects Versions: statefun-2.0.0, statefun-2.1.0, statefun-2.2.0
>Reporter: Antti Kaikkonen
>Priority: Blocker
>
> When using the new statefun-flink-datastream integration the following error 
> is thrown by the *feedback -> union* task when trying to restore from a 
> savepoint:
> {code:java}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: java.io.IOException: position out of bounds
> at 
> org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
> ... 9 more
> Caused by: java.io.IOException: position out of bounds
> at 
> org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
> ... 10 more
> {code}
>  The error is only thrown when the feedback channel has been used. 
> I have tested with the [example 
> application|https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java]
>  and the error is thrown only if it is modified to actually use the feedback 
> channel. I simply modified the invoke method to sometimes forward the 
> greeting to a random name: 
> {code:java}
> @Override
> public void invoke(Context context, Object input) {
>   int seen = seenCount.updateAndGet(MyFunction::increment);
>   context.send(GREETINGS, String.format("Hello %s at the %d-th time", input, 
> seen));
>   String[] names = {"Stephan", "Igal", "Gordon", "Seth", "Marta"};
>   ThreadLocalRandom random = ThreadLocalRandom.current();
>   int index = random.nextInt(names.length);
>   final String name2 = names[index];
>   if (random.nextDouble() < 0.5) context.send(new Address(GREET, name2), 
> input);
> }
> {code}





[jira] [Updated] (FLINK-19692) Can't restore feedback channel from savepoint

2020-10-20 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-19692:

Fix Version/s: statefun-2.2.1
   statefun-2.3.0

> Can't restore feedback channel from savepoint
> -
>
> Key: FLINK-19692
> URL: https://issues.apache.org/jira/browse/FLINK-19692
> Project: Flink
>  Issue Type: Bug
>  Components: Stateful Functions
>Affects Versions: statefun-2.0.0, statefun-2.1.0, statefun-2.2.0
>Reporter: Antti Kaikkonen
>Priority: Blocker
> Fix For: statefun-2.3.0, statefun-2.2.1
>
>





[jira] [Commented] (FLINK-19692) Can't restore feedback channel from savepoint

2020-10-20 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218055#comment-17218055
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-19692:
-

[~Antti-Kaikkonen] thanks a lot for reporting this.

We've currently identified FLINK-19741 as the cause for this error. This would 
require a new Flink minor / hotfix release, and updating the Flink dependency 
in StateFun with the new release.

In the meantime, [~igal] is also investigating if there is a possible solution 
for this in StateFun for 2.2.1, so that we don't have to wait for a new Flink 
release.

> Can't restore feedback channel from savepoint
> -
>
> Key: FLINK-19692
> URL: https://issues.apache.org/jira/browse/FLINK-19692
> Project: Flink
>  Issue Type: Bug
>  Components: Stateful Functions
>Affects Versions: statefun-2.2.0
>Reporter: Antti Kaikkonen
>Priority: Blocker
>





[GitHub] [flink] wuchong commented on pull request #13653: [FLINK-17528][table] Remove RowData#get() and ArrayData#get() and use FieldGetter and ElementGetter instead

2020-10-20 Thread GitBox


wuchong commented on pull request #13653:
URL: https://github.com/apache/flink/pull/13653#issuecomment-713251236


   @flinkbot run azure







[jira] [Issue Comment Deleted] (FLINK-19718) HiveTableSourceITCase.testStreamPartitionRead is not stable on Azure

2020-10-20 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-19718:

Comment: was deleted

(was: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7958=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf)

> HiveTableSourceITCase.testStreamPartitionRead is not stable on Azure
> 
>
> Key: FLINK-19718
> URL: https://issues.apache.org/jira/browse/FLINK-19718
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Blocker
> Fix For: 1.12.0
>
>
> Here are some instances:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7845=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7875=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> {code}
> 2020-10-19T09:17:36.2004157Z [INFO] Results:
> 2020-10-19T09:17:36.2004505Z [INFO] 
> 2020-10-19T09:17:36.2007981Z [ERROR] Failures: 
> 2020-10-19T09:17:36.2010669Z [ERROR]   
> HiveTableSourceITCase.testStreamPartitionRead:537 
> expected:<[+I(0,0,2020-05-06 00:00:00), +I(1,1,2020-05-06 00:10:00), 
> +I(1,1_copy,2020-05-06 00:10:00), +I(2,2,2020-05-06 00:20:00), 
> +I(2,2_copy,2020-05-06 00:20:00), +I(3,3,2020-05-06 00:30:00), 
> +I(3,3_copy,2020-05-06 00:30:00), +I(4,4,2020-05-06 00:40:00), 
> +I(4,4_copy,2020-05-06 00:40:00), +I(5,5,2020-05-06 00:50:00), 
> +I(5,5_copy,2020-05-06 00:50:00)]> but was:<[]>
> 2020-10-19T09:17:36.2011985Z [INFO] 
> 2020-10-19T09:17:36.2012582Z [ERROR] Tests run: 80, Failures: 1, Errors: 0, 
> Skipped: 3
> 2020-10-19T09:17:36.2012976Z [INFO] 
> 2020-10-19T09:17:36.2137222Z [INFO] 
> 
> 2020-10-19T09:17:36.2140971Z [INFO] Reactor Summary:
> 2020-10-19T09:17:36.2141558Z [INFO] 
> 2020-10-19T09:17:36.2141987Z [INFO] Flink : Tools : Force Shading 
> .. SUCCESS [  1.346 s]
> 2020-10-19T09:17:36.2142534Z [INFO] Flink : Test utils : 
> ... SUCCESS [  1.845 s]
> 2020-10-19T09:17:36.2143098Z [INFO] Flink : Test utils : Junit 
> . SUCCESS [  3.265 s]
> 2020-10-19T09:17:36.2190677Z [INFO] Flink : Queryable state : 
> .. SUCCESS [  0.077 s]
> 2020-10-19T09:17:36.2191261Z [INFO] Flink : FileSystems : Azure FS Hadoop 
> .. SUCCESS [ 12.600 s]
> 2020-10-19T09:17:36.2191821Z [INFO] Flink : Examples : 
> . SUCCESS [  0.249 s]
> 2020-10-19T09:17:36.2192380Z [INFO] Flink : Examples : Batch 
> ... SUCCESS [  1.919 s]
> {code}





[jira] [Commented] (FLINK-19718) HiveTableSourceITCase.testStreamPartitionRead is not stable on Azure

2020-10-20 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218050#comment-17218050
 ] 

Jark Wu commented on FLINK-19718:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7958=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf

> HiveTableSourceITCase.testStreamPartitionRead is not stable on Azure
> 
>
> Key: FLINK-19718
> URL: https://issues.apache.org/jira/browse/FLINK-19718
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Blocker
> Fix For: 1.12.0
>
>





[GitHub] [flink] flinkbot edited a comment on pull request #13690: [FLINK-16595][YARN]support more HDFS nameServices in yarn mode when security enabled. Is…

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13690:
URL: https://github.com/apache/flink/pull/13690#issuecomment-712304240


   
   ## CI report:
   
   * 67f01dfa8c82c0575564f7d20961f36a3d0c623b Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7951)
 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7971)
 
   
   
   







[GitHub] [flink] flinkbot edited a comment on pull request #13665: [FLINK-19232][python] Support MapState and MapView in PyFlink.

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13665:
URL: https://github.com/apache/flink/pull/13665#issuecomment-710028722


   
   ## CI report:
   
   * ab7e6fb1ee0ffa2f5cb68dfb03579b84b116afa3 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7946)
 
   * acc611e0a224ad1f9de40a81516a2510171518af Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7973)
 
   
   




[jira] [Commented] (FLINK-17159) ES6 ElasticsearchSinkITCase unstable

2020-10-20 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218043#comment-17218043
 ] 

Dian Fu commented on FLINK-17159:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7969=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=34f486e1-e1e4-5dd2-9c06-bfdd9b9c74a8

> ES6 ElasticsearchSinkITCase unstable
> 
>
> Key: FLINK-17159
> URL: https://issues.apache.org/jira/browse/FLINK-17159
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Tests
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Chesnay Schepler
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0, 1.11.3
>
>
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7482=logs=64110e28-73be-50d7-9369-8750330e0bf1=aa84fb9a-59ae-5696-70f7-011bc086e59b]
> {code:java}
> 2020-04-15T02:37:04.4289477Z [ERROR] 
> testElasticsearchSinkWithSmile(org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase)
>   Time elapsed: 0.145 s  <<< ERROR!
> 2020-04-15T02:37:04.4290310Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2020-04-15T02:37:04.4290790Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> 2020-04-15T02:37:04.4291404Z  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:659)
> 2020-04-15T02:37:04.4291956Z  at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77)
> 2020-04-15T02:37:04.4292548Z  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
> 2020-04-15T02:37:04.4293254Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticSearchSinkTest(ElasticsearchSinkTestBase.java:128)
> 2020-04-15T02:37:04.4293990Z  at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.runElasticsearchSinkSmileTest(ElasticsearchSinkTestBase.java:106)
> 2020-04-15T02:37:04.4295096Z  at 
> org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase.testElasticsearchSinkWithSmile(ElasticsearchSinkITCase.java:45)
> 2020-04-15T02:37:04.4295923Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-04-15T02:37:04.4296489Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-04-15T02:37:04.4297076Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-04-15T02:37:04.4297513Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-04-15T02:37:04.4297951Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-04-15T02:37:04.4298688Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-04-15T02:37:04.4299374Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-04-15T02:37:04.4300069Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-04-15T02:37:04.4300960Z  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2020-04-15T02:37:04.4301705Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-15T02:37:04.4302204Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-04-15T02:37:04.4302661Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-04-15T02:37:04.4303234Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-04-15T02:37:04.4303706Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-04-15T02:37:04.4304127Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-04-15T02:37:04.4304716Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-04-15T02:37:04.4305394Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-04-15T02:37:04.4305965Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-04-15T02:37:04.4306425Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2020-04-15T02:37:04.4306942Z  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2020-04-15T02:37:04.4307466Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4307920Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-15T02:37:04.4308375Z  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 

[GitHub] [flink] flinkbot edited a comment on pull request #13665: [FLINK-19232][python] Support MapState and MapView in PyFlink.

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13665:
URL: https://github.com/apache/flink/pull/13665#issuecomment-710028722


   
   ## CI report:
   
   * ab7e6fb1ee0ffa2f5cb68dfb03579b84b116afa3 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7946)
 
   * acc611e0a224ad1f9de40a81516a2510171518af UNKNOWN
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #13690: [FLINK-16595][YARN]support more HDFS nameServices in yarn mode when security enabled. Is…

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13690:
URL: https://github.com/apache/flink/pull/13690#issuecomment-712304240


   
   ## CI report:
   
   * 67f01dfa8c82c0575564f7d20961f36a3d0c623b Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7971)
 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7951)
 
   
   




[GitHub] [flink] QingdongZeng3 edited a comment on pull request #13690: [FLINK-16595][YARN]support more HDFS nameServices in yarn mode when security enabled. Is…

2020-10-20 Thread GitBox


QingdongZeng3 edited a comment on pull request #13690:
URL: https://github.com/apache/flink/pull/13690#issuecomment-713237396


   @flinkbot run travis  







[GitHub] [flink] QingdongZeng3 commented on pull request #13690: [FLINK-16595][YARN]support more HDFS nameServices in yarn mode when security enabled. Is…

2020-10-20 Thread GitBox


QingdongZeng3 commented on pull request #13690:
URL: https://github.com/apache/flink/pull/13690#issuecomment-713237396


   @flinkbot run travis  
   Hi all, I want to re-run the failed CI. What should I do? Is this command
useful?







[GitHub] [flink] flinkbot edited a comment on pull request #13702: [FLINK-19483][python][e2] Remove conda install zip

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13702:
URL: https://github.com/apache/flink/pull/13702#issuecomment-712809420


   
   ## CI report:
   
   * d2a85a4e1d5b90aca82b49573e0cf544df13b1e2 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7937)
 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7972)
 
   
   




[GitHub] [flink] dianfu commented on pull request #13702: [FLINK-19483][python][e2] Remove conda install zip

2020-10-20 Thread GitBox


dianfu commented on pull request #13702:
URL: https://github.com/apache/flink/pull/13702#issuecomment-713235868


   @flinkbot run azure







[jira] [Commented] (FLINK-17159) ES6 ElasticsearchSinkITCase unstable

2020-10-20 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218026#comment-17218026
 ] 

Dian Fu commented on FLINK-17159:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7966=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=03dca39c-73e8-5aaf-601d-328ae5c35f20


[jira] [Commented] (FLINK-19589) Expose S3 options for tagging and object lifecycle policy for FileSystem

2020-10-20 Thread Padarn Wilson (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218027#comment-17218027
 ] 

Padarn Wilson commented on FLINK-19589:
---

[~AHeise] agreed - when I first asked the question I hadn't looked into it in 
enough depth to see option 1), but it seems like the natural one. If we go this 
way I assume I should open the issue on their community JIRA. 

[~rmetzger] interesting idea. We could perhaps add a new abstraction, something 
like "WriteRequest", which in the case of S3 would be the equivalent of the 
"PutObjectRequest". I'm not sure whether this would have many uses outside of the 
current case.
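
For illustration only, a minimal sketch of what such a "WriteRequest" abstraction might look like. The class name, its methods, and the idea of carrying tags as a string map are all assumptions made here; nothing like this exists in Flink's FileSystem API today.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical value object: a target path plus per-write metadata (tags).
// An S3 filesystem could translate it into a PutObjectRequest; other
// filesystems would be free to ignore the tags.
final class WriteRequest {
    private final String path;
    private final Map<String, String> tags;

    private WriteRequest(String path, Map<String, String> tags) {
        this.path = path;
        // Defensive copy so that instances stay immutable.
        this.tags = Collections.unmodifiableMap(new LinkedHashMap<>(tags));
    }

    // Starts a request for the given path with no tags attached.
    static WriteRequest forPath(String path) {
        return new WriteRequest(path, Collections.<String, String>emptyMap());
    }

    // Returns a new request with one more tag; the receiver is unchanged.
    WriteRequest withTag(String key, String value) {
        Map<String, String> copy = new LinkedHashMap<>(tags);
        copy.put(key, value);
        return new WriteRequest(path, copy);
    }

    String getPath() {
        return path;
    }

    Map<String, String> getTags() {
        return tags;
    }
}
```

A caller would then write something like `WriteRequest.forPath("s3://bucket/path").withTag("owner", "OWNER")`, which keeps the tagging concern out of the path string itself.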

> Expose S3 options for tagging and object lifecycle policy for FileSystem
> 
>
> Key: FLINK-19589
> URL: https://issues.apache.org/jira/browse/FLINK-19589
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.12.0
>Reporter: Padarn Wilson
>Assignee: Padarn Wilson
>Priority: Minor
>
> This ticket proposes to expose the management of two properties related to S3 
> object management:
>  - [Lifecycle configuration 
> |https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html]
>  - [Object 
> tagging|https://docs.aws.amazon.com/AmazonS3/latest/dev/object-tagging.htm]
> Being able to control these is useful for people who want to manage jobs 
> using S3 for checkpointing or job output, but need to control per job level 
> configuration of the tagging/lifecycle for the purposes of auditing or cost 
> control (for example deleting old state from S3)
> Ideally, it would be possible to control this on each object being written by 
> Flink, or at least at a job level.
> _Note_*:* Some related existing properties can be set using the hadoop module 
> using system properties: see for example 
> {code:java}
> fs.s3a.acl.default{code}
> which sets the default ACL on written objects.
> *Solutions*:
> 1) Modify hadoop module:
> The above-linked module could be updated in order to have a new property (and 
> similar for lifecycle)
>  fs.s3a.tags.default
>  which could be a comma separated list of tags to set. For example
> {code:java}
> fs.s3a.tags.default = "jobname:JOBNAME,owner:OWNER"{code}
> This seems like a natural place to put this logic (and is outside of Flink if 
> we decide to go this way). However, it does not allow for a sink and checkpoint 
> to have different values for these.
> 2) Expose withTagging from module
> The hadoop module used by Flink's existing filesystem has already exposed put 
> request level tagging (see 
> [this|https://github.com/aws/aws-sdk-java/blob/c06822732612d7208927d2a678073098522085c3/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/PutObjectRequest.java#L292]).
>  This could be used in the Flink filesystem plugin to expose these options. A 
> possible approach could be to somehow incorporate it into the file path, e.g.,
> {code:java}
> path = "TAGS:s3://bucket/path"{code}
>  Or possibly as an option that can be applied to the checkpoint and sink 
> configurations, e.g.,
> {code:java}
> env.getCheckpointingConfig().setS3Tags(TAGS) {code}
> and similar for a file sink.
> _Note_: The lifecycle can also be managed using the module: see 
> [here|https://docs.aws.amazon.com/AmazonS3/latest/dev/manage-lifecycle-using-java.html].
>  
>  
>  
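
As a rough sketch of solution 1, the comma-separated tag list could be parsed along the following lines. The class name `S3TagParser` is made up for this example, and the `key:value,key:value` format is only the one suggested in the ticket text, not an existing Hadoop or Flink option.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns "jobname:JOBNAME,owner:OWNER" into an ordered map.
final class S3TagParser {
    private S3TagParser() {
    }

    static Map<String, String> parse(String raw) {
        Map<String, String> tags = new LinkedHashMap<>();
        if (raw == null || raw.trim().isEmpty()) {
            return tags; // no tags configured
        }
        for (String entry : raw.split(",")) {
            // Split on the first ':' only, so values may contain further colons.
            String[] kv = entry.trim().split(":", 2);
            if (kv.length != 2 || kv[0].isEmpty()) {
                throw new IllegalArgumentException(
                        "Expected key:value but got '" + entry + "'");
            }
            tags.put(kv[0], kv[1]);
        }
        return tags;
    }
}
```

The resulting map could then be handed to whatever tagging call the filesystem implementation exposes; how that wiring would look inside the hadoop module is left open here.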





[GitHub] [flink] QingdongZeng3 commented on pull request #13690: [FLINK-16595][YARN]support more HDFS nameServices in yarn mode when security enabled. Is…

2020-10-20 Thread GitBox


QingdongZeng3 commented on pull request #13690:
URL: https://github.com/apache/flink/pull/13690#issuecomment-713234240


   @flinkbot run azure







[jira] [Assigned] (FLINK-19743) Add metrics definitions.

2020-10-20 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-19743:


Assignee: Jiangjie Qin

> Add metrics definitions.
> 
>
> Key: FLINK-19743
> URL: https://issues.apache.org/jira/browse/FLINK-19743
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Affects Versions: 1.11.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
>
> Add the metrics defined in 
> [FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics]
>  to {{OperatorMetricsGroup}} and {{SourceReaderContext}}





[jira] [Created] (FLINK-19743) Add metrics definitions.

2020-10-20 Thread Jiangjie Qin (Jira)
Jiangjie Qin created FLINK-19743:


 Summary: Add metrics definitions.
 Key: FLINK-19743
 URL: https://issues.apache.org/jira/browse/FLINK-19743
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common
Affects Versions: 1.11.2
Reporter: Jiangjie Qin


Add the metrics defined in 
[FLIP-33|https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics]
 to {{OperatorMetricsGroup}} and {{SourceReaderContext}}





[GitHub] [flink-shaded] AlexDag commented on issue #30: Shaded netty does not work with epoll activated

2020-10-20 Thread GitBox


AlexDag commented on issue #30:
URL: https://github.com/apache/flink-shaded/issues/30#issuecomment-713218823


   Hi Roman, Till,
   sorry to bother you. I wanted to let you know that I finally got Apache
   Flink running successfully today.
   My solution was:
   1. Download Apache Flink *1.10.2*
   2. Change nothing on my VM nodes.
   3. Run ./bin/start-cluster.sh
   4. Run WordCount.jar
   That's all.
   It would be great to find a solution for Apache Flink 1.11.*.
   
   Best Regards
   Alex
   
   
   
   On Tue, Oct 20, 2020 at 9:58 AM Till Rohrmann 
   wrote:
   
   > Did the other loading mechanism load the library? Maybe you could share
   > the complete logs with us.
   







[GitHub] [flink] flinkbot edited a comment on pull request #13457: [FLINK-8357] Use rolling logs as default

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13457:
URL: https://github.com/apache/flink/pull/13457#issuecomment-697012704


   
   ## CI report:
   
   * 72242a97ba40dedecb60632bcd8bd7ac3d1459ff Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7710)
 
   * 14db2d58abff9a9b1ab6c997af66180c6301373c Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7970)
 
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #13709: [FLINK-19401][checkpointing] Download checkpoints only if needed

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13709:
URL: https://github.com/apache/flink/pull/13709#issuecomment-712988228


   
   ## CI report:
   
   * 14f7ba792a5ff15a1e6c2ca8cae93c7f3c63fe7c Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7965)
 
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #13457: [FLINK-8357] Use rolling logs as default

2020-10-20 Thread GitBox


flinkbot edited a comment on pull request #13457:
URL: https://github.com/apache/flink/pull/13457#issuecomment-697012704


   
   ## CI report:
   
   * 72242a97ba40dedecb60632bcd8bd7ac3d1459ff Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7710)
 
   * 14db2d58abff9a9b1ab6c997af66180c6301373c UNKNOWN
   
   




[GitHub] [flink] kottmann commented on pull request #13457: [FLINK-8357] Use rolling logs as default

2020-10-20 Thread GitBox


kottmann commented on pull request #13457:
URL: https://github.com/apache/flink/pull/13457#issuecomment-713200967


   I changed it as suggested in the log4j.properties and the 
log4j-console.properties.
   
   The MAX_LOG_FILE_NUMBER environment variable is not set for the started Java 
process, so I added an export to expose it; otherwise the default (10, as 
specified in the log4j properties) is always used.
   
   The two defaults could be confusing. What do you think about not specifying 
it in the log4j properties? If there are no bugs in the shell scripts, 
MAX_LOG_FILE_NUMBER should be set anyway, either to its default of 5 or to the 
user-configured value. If the variable fails to be set for some reason, log4j 
defaults to 7.
   
   I would still propose these changes before merge:
   - Remove the default from the log4j properties files
   - Increase the default from 5 to 10
   - Add the on-startup triggering policy to the log4j-console.properties file
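
To make the fallback order discussed above concrete, here is a small sketch of resolving the limit from the environment with a single default. The class `LogFileLimit` and its constant are hypothetical and only mirror the values mentioned in this comment; the real resolution happens in the shell scripts and the log4j properties, not in Java code like this.

```java
import java.util.Map;

// Hypothetical illustration: one place that decides the maximum number of
// rolled log files, so that only a single default is ever in play.
final class LogFileLimit {
    // Default currently used by the shell scripts according to the comment above.
    static final int SCRIPT_DEFAULT = 5;

    private LogFileLimit() {
    }

    static int resolve(Map<String, String> env) {
        String raw = env.get("MAX_LOG_FILE_NUMBER");
        if (raw == null || raw.trim().isEmpty()) {
            return SCRIPT_DEFAULT; // variable not exported to the process
        }
        try {
            int value = Integer.parseInt(raw.trim());
            // Non-positive limits make no sense here; fall back to the default.
            return value > 0 ? value : SCRIPT_DEFAULT;
        } catch (NumberFormatException e) {
            return SCRIPT_DEFAULT; // unparsable value
        }
    }
}
```

Calling `LogFileLimit.resolve(System.getenv())` inside the started process would show whether the export actually reached it.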






