Weird Flink SQL error

2022-11-22 Thread Dan Hill
Hi.  I'm hitting an opaque Flink SQL parser error.  Is there a way to get
better error messages from Flink SQL?  I hit the error when I wrap some of
the fields in an inner Row.


*Works*

CREATE TEMPORARY VIEW `test_content_metrics_view` AS
SELECT
DATE_FORMAT(TUMBLE_ROWTIME(rowtime, INTERVAL '1' DAY), 'yyyy-MM-dd'),
platform_id,
content_id
FROM content_event
GROUP BY
platform_id,
content_id,
TUMBLE(rowtime, INTERVAL '1' DAY)

CREATE TABLE test_content_metrics (
   dt STRING NOT NULL,
   `platform_id` BIGINT,
   `content_id` STRING
) PARTITIONED BY (dt) WITH (
   'connector' = 'filesystem',
   'path' = 'etl/test_content_metrics',
   'format' = 'json'
)

INSERT INTO `test_content_metrics`
SELECT * FROM `test_content_metrics_view`


*Fails*

Wrapping a couple of the fields in a ROW causes the following exception.

 Caused by: org.apache.flink.sql.parser.impl.ParseException:
Encountered "." at line 1, column 119.
Was expecting one of:
")" ...
"," ...

    org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:40981)
    org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:40792)
    org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:25220)
    org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:19925)
    org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:19581)
    [...]


CREATE TEMPORARY VIEW `test_content_metrics_view` AS
SELECT
DATE_FORMAT(TUMBLE_ROWTIME(rowtime, INTERVAL '1' DAY), 'yyyy-MM-dd'),
ROW(
platform_id,
content_id
)
FROM content_event
GROUP BY
platform_id,
content_id,
TUMBLE(rowtime, INTERVAL '1' DAY)

CREATE TABLE test_content_metrics (
   dt STRING NOT NULL,
   `body` ROW(
   `platform_id` BIGINT,
   `content_id` STRING
   )
) PARTITIONED BY (dt) WITH (
   'connector' = 'filesystem',
   'path' = 'etl/test_content_metrics',
   'format' = 'json'
)

INSERT INTO `test_content_metrics`
SELECT * FROM `test_content_metrics_view`
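
A possible workaround (an untested sketch; judging by the
ParenthesizedSimpleIdentifierList frame above, the parser seems to trip over
the qualified column names that view expansion puts inside the ROW
constructor): alias the row and give it an explicit type with CAST, so the
view schema also lines up with the sink table. This may or may not dodge the
parser problem on your version:

CREATE TEMPORARY VIEW `test_content_metrics_view` AS
SELECT
DATE_FORMAT(TUMBLE_ROWTIME(rowtime, INTERVAL '1' DAY), 'yyyy-MM-dd') AS dt,
CAST(ROW(platform_id, content_id)
  AS ROW<`platform_id` BIGINT, `content_id` STRING>) AS body
FROM content_event
GROUP BY
platform_id,
content_id,
TUMBLE(rowtime, INTERVAL '1' DAY)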


Re:Re: Flink Operator in an off-line k8s environment

2022-11-22 Thread
Hi Geng Biao,
It works for me, thank you.

At 2022-11-22 23:16:41, "Geng Biao"  wrote:

Hi Mark,

 

I guess you have to create your own local image registry service which your k8s 
cluster can connect to and upload the image of flink k8s operator to the 
service. After that, you can run something like `helm install 
flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --set 
image.repository= ` to tell the k8s to use 
your local image.

 

Best,
Biao Geng

 

From: Mark Lee 
Date: Tuesday, November 22, 2022 at 9:01 PM
To: user@flink.apache.org 
Subject: Flink Operator in an off-line k8s enviroment

Hi all,

I installed flink operator following 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/.

helm repo add flink-operator-repo 
https://downloads.apache.org/flink/flink-kubernetes-operator-1.2.0/

helm install flink-kubernetes-operator 
flink-operator-repo/flink-kubernetes-operator

I executed the above commands from a helm client (which can reach the
internet) against a k8s environment which can't connect to the internet.

  The flink operator chart installs correctly, but I got the errors below
because my k8s cluster can't reach the internet.

What steps can I do to run flink operator correctly in my off-line k8s cluster?

Should I run a local registry and replace the image
“ghcr.io/apache/flink-kubernetes-operator:95128bf” with a local image?

   Thank you.

[root@localhost ~]# kubectl get pods
NAME                                        READY   STATUS             RESTARTS   AGE
flink-kubernetes-operator-7797c7bd7-tpbqf   0/1     ImagePullBackOff   0          124m

[root@localhost ~]# kubectl describe pod flink-kubernetes-operator-7797c7bd7-tpbqf | grep Image -C 5
  Normal   AddedInterface  124m                    multus     Add eth0 [10.128.6.212/14] from kube-ovn
  Warning  Failed          119m (x4 over 123m)     kubelet    Error: ErrImagePull
  Warning  Failed          118m (x7 over 123m)     kubelet    Error: ImagePullBackOff
  Normal   Pulling         34m (x19 over 124m)     kubelet    Pulling image "ghcr.io/apache/flink-kubernetes-operator:95128bf"
  Warning  Failed          8m53s (x23 over 123m)   kubelet    Failed to pull image "ghcr.io/apache/flink-kubernetes-operator:95128bf": rpc error: code = Unknown desc = pinging container registry ghcr.io: Get "https://ghcr.io/v2/": dial tcp 20.205.243.164:443: i/o timeout
  Normal   BackOff         4m20s (x424 over 123m)  kubelet    Back-off pulling image "ghcr.io/apache/flink-kubernetes-operator:95128bf"


Re: Re: Flink Python job startup error: undefined symbol: _Py_LegacyLocaleDetected

2022-11-22 Thread Xingbo Huang
A venv built with conda bundles some of the machine's low-level C libraries, so it is fairly complete. A virtual environment built with the python venv module may run into problems when moved across machines.
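
An untested sketch of that route, following the PyFlink docs referenced in
[1] below (the Python and Flink versions shown are placeholders):

conda create -n jxy_venv -y python=3.8
conda activate jxy_venv
pip install apache-flink==1.15.1
# conda-pack (pip install conda-pack) bundles the interpreter and its C
# libraries into a single relocatable archive
conda pack -n jxy_venv -o jxy_venv.zip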

Best,
Xingbo

RS wrote on Wednesday, Nov 23, 2022 at 09:25:

> Hi,
> I built it with the plain python command, not conda; that shouldn't matter, right?
> python3 -m venv jxy_venv
>
>
> I started a single-node flink test on my local machine, which has a Python environment, and the test runs successfully.
>
>
>
> Thanks
>
>
>
>
>
> At 2022-11-22 15:39:48, "Xingbo Huang" wrote:
> >Hi RS,
> >
> >Did you build the venv with conda? You can refer to the PyFlink environment setup docs [1]
> >
> >Best,
> >Xingbo
> >
> >[1]
> >
> https://pyflink.readthedocs.io/en/latest/getting_started/installation/prepare.html#create-a-virtual-environment-using-conda
> >
> >RS wrote on Tuesday, Nov 22, 2022 at 15:14:
> >
> >> Hi,
> >> Flink version: 1.15.1
> >>
> >>
> >>
> >> Machine A: create a Python virtual environment (Python 3.6.8), install flink and the other Python packages, zip it as jxy_venv.zip, and upload it to HDFS.
> >> Machine B: the host has no Python environment; Flink runs inside docker on K8S, and the docker image has no Python either.
> >> Machine C: has the flink client and a Python environment, and launches the job.
> >>
> >>
> >> Launch command:
> >> ./bin/flink run -Dexecution.runtime-mode=BATCH -d -m 192.168.1.2:8081
> -n
> >> -py /xxx/main.py -pyfs hdfs://xxx/config.py -pyarch
> >> hdfs://xxx/jxy_venv.zip#venv -pyclientexec venv/jxy_venv/bin/python
> >>
> >>
> >> Error message:
> >> ...
> >> venv/jxy_venv/bin/python: symbol lookup error: venv/jxy_venv/bin/python:
> >> undefined symbol: _Py_LegacyLocaleDetected
> >> org.apache.flink.client.program.ProgramAbortException:
> >> java.lang.RuntimeException: Python process exits with code: 127
> >> at
> >> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> at
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:498)
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> >> at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
> >> at
> >> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
> >> at java.security.AccessController.doPrivileged(Native Method)
> >> at javax.security.auth.Subject.doAs(Subject.java:422)
> >> at
> >>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
> >> at
> >>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >> at
> >> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
> >> Caused by: java.lang.RuntimeException: Python process exits with code:
> 127
> >> at
> >> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
> >> ... 16 more
> >>
> >>
> >> How should this situation be handled?
> >> Does the flink environment have to have the python command installed?
> >>
> >>
> >> Thanks
> >>
> >>
>


Re:Re:Re: Flink Python job startup error: undefined symbol: _Py_LegacyLocaleDetected

2022-11-22 Thread RS
Hi,
I tested the Python virtual environment separately and found that the target
machine must already have a Python environment; otherwise the copied venv
cannot run.
I had assumed that copying the venv over would be enough to run it. Not quite
what I expected, haha~


Error with Python 3.6:
# ./python
./python: error while loading shared libraries: libpython3.6m.so.1.0: cannot 
open shared object file: No such file or directory


Error with Python 3.9:

# ./venv/bin/python
Could not find platform independent libraries
Could not find platform dependent libraries
Consider setting $PYTHONHOME to [:]
Python path configuration:
  PYTHONHOME = (not set)
  PYTHONPATH = (not set)
  program name = './venv/bin/python'
  isolated = 0
  environment = 1
  user site = 1
  import site = 1
  sys._base_executable = '/root/tmp/venv/bin/python'
  sys.base_prefix = '/usr/local'
  sys.base_exec_prefix = '/usr/local'
  sys.platlibdir = 'lib'
  sys.executable = '/root/tmp/venv/bin/python'
  sys.prefix = '/usr/local'
  sys.exec_prefix = '/usr/local'
  sys.path = [
    '/usr/local/lib/python39.zip',
    '/usr/local/lib/python3.9',
    '/usr/local/lib/lib-dynload',
  ]
Fatal Python error: init_fs_encoding: failed to get the Python codec of the filesystem encoding
Python runtime state: core initialized
ModuleNotFoundError: No module named 'encodings'

Current thread 0x7ff010275740 (most recent call first):

Thanks

At 2022-11-23 09:25:04, "RS" wrote:
>Hi,
>I built it with the plain python command, not conda; that shouldn't matter, right?
>python3 -m venv jxy_venv
>
>
>I started a single-node flink test on my local machine, which has a Python environment, and the test runs successfully.
>
>
>
>Thanks
>
>
>
>
>
>At 2022-11-22 15:39:48, "Xingbo Huang" wrote:
>>Hi RS,
>>
>>Did you build the venv with conda? You can refer to the PyFlink environment setup docs [1]
>>
>>Best,
>>Xingbo
>>
>>[1]
>>https://pyflink.readthedocs.io/en/latest/getting_started/installation/prepare.html#create-a-virtual-environment-using-conda
>>
>>RS wrote on Tuesday, Nov 22, 2022 at 15:14:
>>
>>> Hi,
>>> Flink version: 1.15.1
>>>
>>>
>>> Machine A: create a Python virtual environment (Python 3.6.8), install flink and the other Python packages, zip it as jxy_venv.zip, and upload it to HDFS.
>>> Machine B: the host has no Python environment; Flink runs inside docker on K8S, and the docker image has no Python either.
>>> Machine C: has the flink client and a Python environment, and launches the job.
>>>
>>>
>>> Launch command:
>>> ./bin/flink run -Dexecution.runtime-mode=BATCH -d -m 192.168.1.2:8081 -n
>>> -py /xxx/main.py -pyfs hdfs://xxx/config.py -pyarch
>>> hdfs://xxx/jxy_venv.zip#venv -pyclientexec venv/jxy_venv/bin/python
>>>
>>>
>>> Error message:
>>> ...
>>> venv/jxy_venv/bin/python: symbol lookup error: venv/jxy_venv/bin/python:
>>> undefined symbol: _Py_LegacyLocaleDetected
>>> org.apache.flink.client.program.ProgramAbortException:
>>> java.lang.RuntimeException: Python process exits with code: 127
>>> at
>>> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>>> at
>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
>>> at
>>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
>>> Caused by: java.lang.RuntimeException: Python process exits with code: 127
>>> at
>>> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
>>> ... 16 more
>>>
>>>
>>> How should this situation be handled?
>>> Does the flink environment have to have the python command installed?
>>>
>>>
>>> Thanks
>>>
>>>


Re: How can flink sql be extended to support CTAS/CDAS statements?

2022-11-22 Thread Shengkai Fang
Could you describe the functionality you want to implement? Is it like the one on Alibaba Cloud?

Best,
Shengkai

casel.chen wrote on Wednesday, Nov 23, 2022 at 08:29:

> The community edition of flink sql doesn't seem to support CTAS/CDAS
> statements yet. How can flink sql be extended to support them? Could you
> share an approach or a code example? Thanks!


Re: How to monitor job status changes after a flink job is submitted?

2022-11-22 Thread RS
Hi,


Take a look at Flink's metrics; the job status should be in there:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#reporter


Metric reporters can be configured in different ways; some use a pull mechanism, others push:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/
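
For example, a reporter can be wired up in flink-conf.yaml (a sketch for
Flink 1.15, where the reporter name "prom" and the port are placeholders;
earlier versions use metrics.reporter.<name>.class instead of factory.class):

metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249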



Thanks




At 2022-11-23 08:32:11, "casel.chen" wrote:
>How can we monitor job status changes after a flink job is submitted, so the console can display status changes in real time? We currently poll, but it's inefficient. Is there a listener we can register?


Re:Re: Flink Python job startup error: undefined symbol: _Py_LegacyLocaleDetected

2022-11-22 Thread RS
Hi,
I built it with the plain python command, not conda; that shouldn't matter, right?
python3 -m venv jxy_venv


I started a single-node flink test on my local machine, which has a Python environment, and the test runs successfully.



Thanks





At 2022-11-22 15:39:48, "Xingbo Huang" wrote:
>Hi RS,
>
>Did you build the venv with conda? You can refer to the PyFlink environment setup docs [1]
>
>Best,
>Xingbo
>
>[1]
>https://pyflink.readthedocs.io/en/latest/getting_started/installation/prepare.html#create-a-virtual-environment-using-conda
>
>RS wrote on Tuesday, Nov 22, 2022 at 15:14:
>
>> Hi,
>> Flink version: 1.15.1
>>
>>
>> Machine A: create a Python virtual environment (Python 3.6.8), install flink and the other Python packages, zip it as jxy_venv.zip, and upload it to HDFS.
>> Machine B: the host has no Python environment; Flink runs inside docker on K8S, and the docker image has no Python either.
>> Machine C: has the flink client and a Python environment, and launches the job.
>>
>>
>> Launch command:
>> ./bin/flink run -Dexecution.runtime-mode=BATCH -d -m 192.168.1.2:8081 -n
>> -py /xxx/main.py -pyfs hdfs://xxx/config.py -pyarch
>> hdfs://xxx/jxy_venv.zip#venv -pyclientexec venv/jxy_venv/bin/python
>>
>>
>> Error message:
>> ...
>> venv/jxy_venv/bin/python: symbol lookup error: venv/jxy_venv/bin/python:
>> undefined symbol: _Py_LegacyLocaleDetected
>> org.apache.flink.client.program.ProgramAbortException:
>> java.lang.RuntimeException: Python process exits with code: 127
>> at
>> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
>> at
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
>> Caused by: java.lang.RuntimeException: Python process exits with code: 127
>> at
>> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
>> ... 16 more
>>
>>
>> How should this situation be handled?
>> Does the flink environment have to have the python command installed?
>>
>>
>> Thanks
>>
>>


How to monitor job status changes after a flink job is submitted?

2022-11-22 Thread casel.chen
How can we monitor job status changes after a flink job is submitted, so the console can display status changes in real time? We currently poll, but it's inefficient. Is there a listener we can register?

How can flink sql be extended to support CTAS/CDAS statements?

2022-11-22 Thread casel.chen
The community edition of flink sql doesn't seem to support CTAS/CDAS statements yet. How can flink sql be extended to support them? Could you share an approach or a code example? Thanks!


How to properly extend the jdbc connector to support more database dialects?

2022-11-22 Thread casel.chen
How should the jdbc connector be properly extended to support more database dialects? Our current approach is to pull down the flink source and patch it directly to add dialect support. Is there a more elegant way?
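
A rough, unverified sketch of the SPI route in newer flink-connector-jdbc
versions (1.15+), where dialects are discovered via ServiceLoader instead of
source patches. Interface and method names may differ by version, and
MyDialect and "jdbc:mydb:" are hypothetical stand-ins for your own dialect:

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;

// Registered by listing this class in
// META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
public class MyDialectFactory implements JdbcDialectFactory {
    @Override
    public boolean acceptsURL(String url) {
        return url.startsWith("jdbc:mydb:"); // hypothetical JDBC URL scheme
    }

    @Override
    public JdbcDialect create() {
        return new MyDialect(); // your JdbcDialect implementation
    }
}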

Fwd: Stand alone K8s HA mode with Static Tokens Used by Service Accounts

2022-11-22 Thread Berkay Polat via user
Hi team,

Bumping this up again. From the AWS docs, the suggested approach is to
simply upgrade the K8s Java SDK client (
https://github.com/kubernetes-client/java/) being used. However, in Flink's
case with the io.fabric8 K8s client, I am not sure how to handle it. Any
help and guidance would be much appreciated.

Thanks,

-- Forwarded message -
From: Berkay Polat 
Date: Thu, Nov 17, 2022 at 12:36 PM
Subject: Stand alone K8s HA mode with Static Tokens Used by Service Accounts
To: 


Hi,

Our team has been using flink 1.15 and we have a standalone K8s flink
setup that uses K8s HA services for its HA mode. Recently, our organization
is in the works of updating their EKS clusters' Kubernetes versions to 1.21
or later. We received a request from our support team that the service
accounts associated with our stand alone flink cluster have been using
static tokens, which is not permitted for newer K8s versions. Instead, they
requested us to switch to a refresh token approach (
https://docs.aws.amazon.com/eks/latest/userguide/service-accounts.html#identify-pods-using-stale-tokens
).

From what I understand, in flink 1.15, HA mode uses version 5.5.0 of
io.fabric8's kubernetes client and it seems that it is compatible with K8s
1.21.1 and later (
https://github.com/fabric8io/kubernetes-client#compatibility-matrix) so I
am not sure what the underlying limitation/issue is here.

The AWS doc link I referred to earlier recommends upgrading versions for
Kubernetes Client SDKs but it refers to io.kubernetes's client SDKs, not
io.fabric8.

Could someone shed some light on it? Would it be worth it to request a
change to upgrade the io.fabric8 kubernetes client version to a newer
version?

Thanks,
-- 
*BERKAY POLAT*
Software Engineer SMTS | MuleSoft at Salesforce
Mobile: 443-710-7021



RE: "Authentication failed" in "ConnectionState" when enabling internal SSL on Yarn with self signed certificate

2022-11-22 Thread LINZ, Arnaud
Last update :
My flink version is in fact 1.14.3. The application works when internal SSL
is enabled in “local” intra-JVM cluster mode, so the certificate seems
correct.
I see no log on the Yarn server side, only that the application gets killed.
I will try to take stack traces…

From: LINZ, Arnaud
Sent: Tuesday, November 22, 2022 17:41
To: user 
Subject: RE: "Authentication failed" in "ConnectionState" when enabling internal
SSL on Yarn with self signed certificate

Update:
In fact this “Authentication failed” message also appears when SSL is turned
off (and when the yarn application succeeds), so it's more of a warning and
has no link with the “freeze” when SSL is turned on.

Thus, when internal SSL is enabled, I have no error in the yarn log, and the
only error I get is a “timed out” error like the one you get when you don't
have enough resources:
(NoResourceAvailableException: Slot request bulk is not fulfillable! Could not
allocate the required slot within slot request timeout)
But I do have enough resources.

From: LINZ, Arnaud
Sent: Tuesday, November 22, 2022 17:18
To: user <user@flink.apache.org>
Subject: "Authentication failed" in "ConnectionState" when enabling internal
SSL on Yarn with self signed certificate

Hello,
I use Flink 1.14.3 in Yarn cluster mode.
I’ve followed the instructions listed here 
(https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-ssl/
 

   ) to turn on internal SSL:


$ keytool -genkeypair \
  -alias flink.internal \
  -keystore internal.keystore \
  -dname "CN=flink.internal" \
  -storepass internal_store_password \
  -keyalg RSA \
  -keysize 4096 \
  -storetype PKCS12

security.ssl.internal.enabled: true
security.ssl.internal.keystore: /path/to/flink/conf/internal.keystore
security.ssl.internal.truststore: /path/to/flink/conf/internal.keystore
security.ssl.internal.keystore-password: internal_store_password
security.ssl.internal.truststore-password: internal_store_password
security.ssl.internal.key-password: internal_store_password


I’ve shipped the keystore to every node and get no error from reading the
keystore. However, the application fails to start (it is stuck in the
initializing step), and the only error log in the Yarn containers is:
15:49:46.397 [main-EventThread] ERROR
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState -
Authentication failed


Could you please explain to me what this “zookeeper” curator connection does
and why it no longer works when internal SSL is enabled?



Best regards,

Arnaud



RE: "Authentication failed" in "ConnectionState" when enabling internal SSL on Yarn with self signed certificate

2022-11-22 Thread LINZ, Arnaud
Update:
In fact this “Authentication failed” message also appears when SSL is turned
off (and when the yarn application succeeds), so it's more of a warning and
has no link with the “freeze” when SSL is turned on.

Thus, when internal SSL is enabled, I have no error in the yarn log, and the
only error I get is a “timed out” error like the one you get when you don't
have enough resources:
(NoResourceAvailableException: Slot request bulk is not fulfillable! Could not
allocate the required slot within slot request timeout)
But I do have enough resources.

From: LINZ, Arnaud
Sent: Tuesday, November 22, 2022 17:18
To: user 
Subject: "Authentication failed" in "ConnectionState" when enabling internal
SSL on Yarn with self signed certificate

Hello,
I use Flink 1.11.2 in Yarn cluster mode.
I’ve followed the instructions listed here
(https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-ssl/)
to turn on internal SSL:


$ keytool -genkeypair \
  -alias flink.internal \
  -keystore internal.keystore \
  -dname "CN=flink.internal" \
  -storepass internal_store_password \
  -keyalg RSA \
  -keysize 4096 \
  -storetype PKCS12

security.ssl.internal.enabled: true
security.ssl.internal.keystore: /path/to/flink/conf/internal.keystore
security.ssl.internal.truststore: /path/to/flink/conf/internal.keystore
security.ssl.internal.keystore-password: internal_store_password
security.ssl.internal.truststore-password: internal_store_password
security.ssl.internal.key-password: internal_store_password


I’ve shipped the keystore to every node and get no error from reading the
keystore. However, the application fails to start (it is stuck in the
initializing step), and the only error log in the Yarn containers is:
15:49:46.397 [main-EventThread] ERROR
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState -
Authentication failed


Could you please explain to me what this “zookeeper” curator connection does
and why it no longer works when internal SSL is enabled?



Best regards,

Arnaud



"Authentication failed" in "ConnectionState" when enabling internal SSL on Yarn with self signed certificate

2022-11-22 Thread LINZ, Arnaud
Hello,
I use Flink 1.11.2 in Yarn cluster mode.
I’ve followed the instructions listed here
(https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-ssl/)
to turn on internal SSL:


$ keytool -genkeypair \
  -alias flink.internal \
  -keystore internal.keystore \
  -dname "CN=flink.internal" \
  -storepass internal_store_password \
  -keyalg RSA \
  -keysize 4096 \
  -storetype PKCS12

security.ssl.internal.enabled: true
security.ssl.internal.keystore: /path/to/flink/conf/internal.keystore
security.ssl.internal.truststore: /path/to/flink/conf/internal.keystore
security.ssl.internal.keystore-password: internal_store_password
security.ssl.internal.truststore-password: internal_store_password
security.ssl.internal.key-password: internal_store_password


I’ve shipped the keystore to every node and get no error from reading the
keystore. However, the application fails to start (it is stuck in the
initializing step), and the only error log in the Yarn containers is:
15:49:46.397 [main-EventThread] ERROR
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState -
Authentication failed


Could you please explain to me what this “zookeeper” curator connection does
and why it no longer works when internal SSL is enabled?
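
(For reference: that Curator log line comes from Flink's ZooKeeper-based HA
services, which are configured separately from internal SSL. A sketch of the
relevant flink-conf.yaml keys, with placeholder hosts and paths:)

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181
high-availability.storageDir: hdfs:///flink/ha
# ZooKeeper client security is handled via the zookeeper.sasl.* options, not
# via the security.ssl.internal.* settings above.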



Best regards,

Arnaud



Re: Flink Operator in an off-line k8s environment

2022-11-22 Thread Geng Biao
Hi Mark,

I guess you have to create your own local image registry service which your k8s 
cluster can connect to and upload the image of flink k8s operator to the 
service. After that, you can run something like `helm install 
flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --set 
image.repository= ` to tell the k8s to use 
your local image.
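
A minimal sketch of that flow (the registry host is a placeholder, and it
assumes the chart exposes the usual image.repository/image.tag values):

# On a machine with internet access: pull the operator image and push it to
# a registry the offline cluster can reach.
docker pull ghcr.io/apache/flink-kubernetes-operator:95128bf
docker tag ghcr.io/apache/flink-kubernetes-operator:95128bf my-registry.local:5000/flink-kubernetes-operator:95128bf
docker push my-registry.local:5000/flink-kubernetes-operator:95128bf

# On the offline cluster: point the chart at the local image.
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
  --set image.repository=my-registry.local:5000/flink-kubernetes-operator \
  --set image.tag=95128bf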

Best,
Biao Geng

From: Mark Lee 
Date: Tuesday, November 22, 2022 at 9:01 PM
To: user@flink.apache.org 
Subject: Flink Operator in an off-line k8s environment
Hi all,
I installed flink operator following 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/.
helm repo add flink-operator-repo 
https://downloads.apache.org/flink/flink-kubernetes-operator-1.2.0/
helm install flink-kubernetes-operator 
flink-operator-repo/flink-kubernetes-operator

I executed the above commands from a helm client (which can reach the
internet) against a k8s environment which can't connect to the internet.

  The flink operator chart installs correctly, but I got the errors below
because my k8s cluster can't reach the internet.
What steps can I do to run flink operator correctly in my off-line k8s cluster?
Should I run a local registry and replace the image
“ghcr.io/apache/flink-kubernetes-operator:95128bf” with a local image?

   Thank you.

[root@localhost ~]# kubectl get pods
NAME                                        READY   STATUS             RESTARTS   AGE
flink-kubernetes-operator-7797c7bd7-tpbqf   0/1     ImagePullBackOff   0          124m

[root@localhost ~]# kubectl describe pod flink-kubernetes-operator-7797c7bd7-tpbqf | grep Image -C 5
  Normal   AddedInterface  124m                    multus     Add eth0 [10.128.6.212/14] from kube-ovn
  Warning  Failed          119m (x4 over 123m)     kubelet    Error: ErrImagePull
  Warning  Failed          118m (x7 over 123m)     kubelet    Error: ImagePullBackOff
  Normal   Pulling         34m (x19 over 124m)     kubelet    Pulling image "ghcr.io/apache/flink-kubernetes-operator:95128bf"
  Warning  Failed          8m53s (x23 over 123m)   kubelet    Failed to pull image "ghcr.io/apache/flink-kubernetes-operator:95128bf": rpc error: code = Unknown desc = pinging container registry ghcr.io: Get "https://ghcr.io/v2/": dial tcp 20.205.243.164:443: i/o timeout
  Normal   BackOff         4m20s (x424 over 123m)  kubelet    Back-off pulling image "ghcr.io/apache/flink-kubernetes-operator:95128bf"



Re: Any way to improve list state get performance

2022-11-22 Thread Xingcan Cui
Hi Tao,

I think you just need an extra `isEmpty` variable and maintain it properly
(e.g., when restoring the job, check if the list state is empty or not).

Also, I remembered that the list state for rocksdb is not as performant as
the map state when the state is large. Sometimes you could use a map state
with some extra value states to simulate it.
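
A minimal sketch of the `isEmpty` flag idea (the String stream/output types
and the join logic are stand-ins; it assumes a keyed two-input function):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingJoin extends KeyedCoProcessFunction<String, String, String, String> {
    private transient ListState<String> bufferedB;      // B elements waiting for A
    private transient ValueState<Boolean> hasBufferedB; // cheap emptiness flag

    @Override
    public void open(Configuration parameters) {
        bufferedB = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-b", Types.STRING));
        hasBufferedB = getRuntimeContext().getState(
                new ValueStateDescriptor<>("has-buffered-b", Types.BOOLEAN));
    }

    @Override
    public void processElement1(String a, Context ctx, Collector<String> out) throws Exception {
        // Consult the single-value flag first; only touch the RocksDB-backed
        // list state when something was actually buffered.
        if (Boolean.TRUE.equals(hasBufferedB.value())) {
            for (String b : bufferedB.get()) {
                out.collect(a + "|" + b); // placeholder join logic
            }
            bufferedB.clear();
            hasBufferedB.clear();
        }
    }

    @Override
    public void processElement2(String b, Context ctx, Collector<String> out) throws Exception {
        bufferedB.add(b);
        hasBufferedB.update(true);
    }
}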

Best,
Xingcan

On Mon, Nov 21, 2022 at 9:20 PM tao xiao  wrote:

> any suggestion is highly appreciated
>
> On Tue, Nov 15, 2022 at 8:50 PM tao xiao  wrote:
>
>> Hi team,
>>
>> I have a Flink job that joins two streams, let's say A and B streams,
>> followed by a key process function. In the key process function the job
>> inserts elements from B stream to a list state if element from A stream
>> hasn't arrived yet. I am wondering if any way to skip the liststat.get() to
>> check if there are elements in the list state when A stream arrives to
>> reduce the call to underlying state (RocksDB)
>>
>> Here is the code snippet
>>
>> keyfunction {
>>
>> process(in, ctx, collector) {
>> if (in is A stream)
>> // anyway to check if list state is empty so that we dont need to call
>> get()?
>> for (b : liststate.get()) {
>> .
>> }
>>
>> if (in is B stream)
>> liststate.add(in)
>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Regards,
> Tao
>


Flink Operator in an off-line k8s environment

2022-11-22 Thread Mark Lee
Hi all,

I installed flink operator following
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/.

helm repo add flink-operator-repo
https://downloads.apache.org/flink/flink-kubernetes-operator-1.2.0/

helm install flink-kubernetes-operator
flink-operator-repo/flink-kubernetes-operator

 

I executed the above commands from a helm client (which can reach the
internet) against a k8s environment which can't connect to the internet.

 

  The flink operator chart installs correctly, but I got the errors below
because my k8s cluster can't reach the internet.

What steps can I do to run flink operator correctly in my off-line k8s
cluster?

Should I run a local registry and replace the image
"ghcr.io/apache/flink-kubernetes-operator:95128bf" with a local image?

  

   Thank you.

 

[root@localhost ~]# kubectl get pods
NAME                                        READY   STATUS             RESTARTS   AGE
flink-kubernetes-operator-7797c7bd7-tpbqf   0/1     ImagePullBackOff   0          124m

 

[root@localhost ~]# kubectl describe pod flink-kubernetes-operator-7797c7bd7-tpbqf | grep Image -C 5
  Normal   AddedInterface  124m                    multus     Add eth0 [10.128.6.212/14] from kube-ovn
  Warning  Failed          119m (x4 over 123m)     kubelet    Error: ErrImagePull
  Warning  Failed          118m (x7 over 123m)     kubelet    Error: ImagePullBackOff
  Normal   Pulling         34m (x19 over 124m)     kubelet    Pulling image "ghcr.io/apache/flink-kubernetes-operator:95128bf"
  Warning  Failed          8m53s (x23 over 123m)   kubelet    Failed to pull image "ghcr.io/apache/flink-kubernetes-operator:95128bf": rpc error: code = Unknown desc = pinging container registry ghcr.io: Get "https://ghcr.io/v2/": dial tcp 20.205.243.164:443: i/o timeout
  Normal   BackOff         4m20s (x424 over 123m)  kubelet    Back-off pulling image "ghcr.io/apache/flink-kubernetes-operator:95128bf"

 



Re: Windowing in batch mode

2022-11-22 Thread Martijn Visser
Hi,

This question has been posted on Stackoverflow, Slack and now the mailing
list. Please don't spam the different channels for getting support on your
question. This is a voluntary service, run by community members.

Best regards,

Martijn Visser

On Tue, Nov 22, 2022 at 7:27 AM Suparn Lele (sulele) 
wrote:

> Hi,
> We have one table A in database. We are loading that table into flink
> using Flink SQL JdbcCatalog.
>
> Here is how we are loading the data
> val catalog = new JdbcCatalog("my_catalog", "database_name", username,
> password, url)
> streamTableEnvironment.registerCatalog("my_catalog", catalog)
> streamTableEnvironment.useCatalog("my_catalog")
>
> val query = "select timestamp, count from A"
> val sourceTable = streamTableEnvironment.sqlQuery(query)
> streamTableEnvironment.createTemporaryView("innerTable", sourceTable)
>
> val aggregationQuery =
> select window_end, sum(count)
> from TABLE(TUMBLE(TABLE innerTable, DESCRIPTOR(timestamp), INTERVAL '10'
> minutes))
> group by window_end
>
> It throws following error
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. The window function TUMBLE(TABLE table_name,
> DESCRIPTOR(timecol), datetime interval[, datetime interval]) requires the
> timecol is a time attribute type, but is TIMESTAMP(6).
>
> In short we want to apply windowing aggregation on an already existing
> column. How can we do that
> *Note - This is a batch processing*
>
> Thanks,
> Suparn
>
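
(For reference, the ValidationException means the `timestamp` column is a
plain TIMESTAMP(6) rather than a time attribute, and the window TVF also
rejects precision 6. A sketch of one possible fix: recent Flink versions are
documented to accept a plain TIMESTAMP(3)/TIMESTAMP_LTZ(3) column in batch
mode, so casting the precision down may be enough. Table names follow the
question and `ts` is a made-up alias:)

CREATE TEMPORARY VIEW innerTable3 AS
SELECT CAST(`timestamp` AS TIMESTAMP(3)) AS ts, `count` FROM A;

SELECT window_end, SUM(`count`)
FROM TABLE(TUMBLE(TABLE innerTable3, DESCRIPTOR(ts), INTERVAL '10' MINUTES))
GROUP BY window_end;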


Re: [Flink K8s operator] HA metadata not available to restore from last state

2022-11-22 Thread Dongwon Kim
Hi Gyula :-)

Okay, we're gonna upgrade to 1.15 and see what happens.

Thanks a lot for the quick feedback and the detailed explanation!

Best,

Dongwon


On Tue, Nov 22, 2022 at 5:57 PM Gyula Fóra  wrote:

> Hi Dongwon!
>
> This error mostly occurs when using Flink 1.14 and the Flink cluster goes
> into a terminal state. If a Flink job is FAILED/FINISHED (such as it
> exhausted the retry strategy), in Flink 1.14 the cluster shuts itself down
> and removes the HA metadata.
>
> In these cases the operator will only see that the cluster completely
> disappeared and there is no HA metadata and it will throw the error you
> mentioned. It does not know what happened and doesn't have any way to
> recover checkpoint information.
>
> This is fixed in Flink 1.15 where even after terminal FAILED/FINISHED
> states, the jobmanager would not shut down. This allows the operator to
> observe this terminal state and actually recover the job even if the HA
> metadata was removed.
>
> To summarize, this is mostly caused by Flink 1.14 behaviour that the
> operator cannot control. Upgrading to 1.15 allows much more robustness and
> should eliminate most of these cases.
>
> Cheers,
> Gyula
>
> On Tue, Nov 22, 2022 at 9:43 AM Dongwon Kim  wrote:
>
>> Hi,
>>
>> While using a last-state upgrade mode on flink-k8s-operator-1.2.0 and
>> flink-1.14.3, we're occasionally facing the following error:
>>
>> Status:
>>>   Cluster Info:
>>> Flink - Revision: 98997ea @ 2022-01-08T23:23:54+01:00
>>> Flink - Version:  1.14.3
>>>   Error:  HA metadata not available to restore
>>> from last state. It is possible that the job has finished or terminally
>>> failed, or the configmaps have been deleted. Manual restore required.
>>>   Job Manager Deployment Status:  ERROR
>>>   Job Status:
>>> Job Id:e8dd04ea4b03f1817a4a4b9e5282f433
>>> Job Name:  flinktest
>>> Savepoint Info:
>>>   Last Periodic Savepoint Timestamp:  0
>>>   Savepoint History:
>>>   Trigger Id:
>>>   Trigger Timestamp:  0
>>>   Trigger Type:   UNKNOWN
>>> Start Time:   1668660381400
>>> State:RECONCILING
>>> Update Time:  1668994910151
>>>   Reconciliation Status:
>>> Last Reconciled Spec:  ...
>>> Reconciliation Timestamp:  1668660371853
>>> State: DEPLOYED
>>>   Task Manager:
>>> Label Selector:  component=taskmanager,app=flinktest
>>> Replicas:1
>>> Events:
>>   Type     Reason            Age                 From                  Message
>>   ----     ------            ---                 ----                  -------
>>>   Normal   JobStatusChanged  30m Job
>>> Job status changed from RUNNING to RESTARTING
>>>   Normal   JobStatusChanged  29m Job
>>> Job status changed from RESTARTING to CREATED
>>>   Normal   JobStatusChanged  28m Job
>>> Job status changed from CREATED to RESTARTING
>>>   Warning  Missing   26m JobManagerDeployment
>>> Missing JobManager deployment
>>>   Warning  RestoreFailed 9s (x106 over 26m)  JobManagerDeployment
>>> HA metadata not available to restore from last state. It is possible that
>>> the job has finished or terminally failed, or the configmaps have been
>>> deleted. Manual restore required.
>>>   Normal   Submit9s (x106 over 26m)  JobManagerDeployment
>>> Starting deployment
>>
>>
>> We're happy with the last state mode most of the time, but we face it
>> occasionally.
>>
>> We found that it's not easy to reproduce the problem; we tried to kill
>> JMs and TMs and even shutdown the nodes on which JMs and TMs are running.
>>
>> We also checked that the file size is not zero.
>>
>> Thanks,
>>
>> Dongwon
>>
>>
>>


Re: [Flink K8s operator] HA metadata not available to restore from last state

2022-11-22 Thread Gyula Fóra
Hi Dongwon!

This error mostly occurs when using Flink 1.14 and the Flink cluster goes
into a terminal state. If a Flink job is FAILED/FINISHED (such as it
exhausted the retry strategy), in Flink 1.14 the cluster shuts itself down
and removes the HA metadata.

In these cases the operator will only see that the cluster completely
disappeared and there is no HA metadata and it will throw the error you
mentioned. It does not know what happened and doesn't have any way to
recover checkpoint information.

This is fixed in Flink 1.15 where even after terminal FAILED/FINISHED
states, the jobmanager would not shut down. This allows the operator to
observe this terminal state and actually recover the job even if the HA
metadata was removed.

To summarize, this is mostly caused by Flink 1.14 behaviour that the
operator cannot control. Upgrading to 1.15 allows much more robustness and
should eliminate most of these cases.

Cheers,
Gyula

On Tue, Nov 22, 2022 at 9:43 AM Dongwon Kim  wrote:

> Hi,
>
> While using a last-state upgrade mode on flink-k8s-operator-1.2.0 and
> flink-1.14.3, we're occasionally facing the following error:
>
> Status:
>>   Cluster Info:
>> Flink - Revision: 98997ea @ 2022-01-08T23:23:54+01:00
>> Flink - Version:  1.14.3
>>   Error:  HA metadata not available to restore
>> from last state. It is possible that the job has finished or terminally
>> failed, or the configmaps have been deleted. Manual restore required.
>>   Job Manager Deployment Status:  ERROR
>>   Job Status:
>> Job Id:e8dd04ea4b03f1817a4a4b9e5282f433
>> Job Name:  flinktest
>> Savepoint Info:
>>   Last Periodic Savepoint Timestamp:  0
>>   Savepoint History:
>>   Trigger Id:
>>   Trigger Timestamp:  0
>>   Trigger Type:   UNKNOWN
>> Start Time:   1668660381400
>> State:RECONCILING
>> Update Time:  1668994910151
>>   Reconciliation Status:
>> Last Reconciled Spec:  ...
>> Reconciliation Timestamp:  1668660371853
>> State: DEPLOYED
>>   Task Manager:
>> Label Selector:  component=taskmanager,app=flinktest
>> Replicas:1
>> Events:
>>   Type     Reason            Age                 From                  Message
>>   ----     ------            ---                 ----                  -------
>>   Normal   JobStatusChanged  30m Job
>> Job status changed from RUNNING to RESTARTING
>>   Normal   JobStatusChanged  29m Job
>> Job status changed from RESTARTING to CREATED
>>   Normal   JobStatusChanged  28m Job
>> Job status changed from CREATED to RESTARTING
>>   Warning  Missing   26m JobManagerDeployment
>> Missing JobManager deployment
>>   Warning  RestoreFailed 9s (x106 over 26m)  JobManagerDeployment
>> HA metadata not available to restore from last state. It is possible that
>> the job has finished or terminally failed, or the configmaps have been
>> deleted. Manual restore required.
>>   Normal   Submit9s (x106 over 26m)  JobManagerDeployment
>> Starting deployment
>
>
> We're happy with the last state mode most of the time, but we face it
> occasionally.
>
> We found that it's not easy to reproduce the problem; we tried to kill JMs
> and TMs and even shutdown the nodes on which JMs and TMs are running.
>
> We also checked that the file size is not zero.
>
> Thanks,
>
> Dongwon
>
>
>
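
(A sketch for checking whether the HA metadata still exists before retrying;
the label selector follows the Kubernetes HA services' conventions and may
differ by version:)

# List the HA configmaps Flink created for this cluster
kubectl get configmap -l app=flinktest,configmap-type=high-availability

# Inspect the leader/checkpoint pointers they hold
kubectl describe configmap <configmap-name>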


[Flink K8s operator] HA metadata not available to restore from last state

2022-11-22 Thread Dongwon Kim
Hi,

While using a last-state upgrade mode on flink-k8s-operator-1.2.0 and
flink-1.14.3, we're occasionally facing the following error:

Status:
>   Cluster Info:
> Flink - Revision: 98997ea @ 2022-01-08T23:23:54+01:00
> Flink - Version:  1.14.3
>   Error:  HA metadata not available to restore
> from last state. It is possible that the job has finished or terminally
> failed, or the configmaps have been deleted. Manual restore required.
>   Job Manager Deployment Status:  ERROR
>   Job Status:
> Job Id:e8dd04ea4b03f1817a4a4b9e5282f433
> Job Name:  flinktest
> Savepoint Info:
>   Last Periodic Savepoint Timestamp:  0
>   Savepoint History:
>   Trigger Id:
>   Trigger Timestamp:  0
>   Trigger Type:   UNKNOWN
> Start Time:   1668660381400
> State:RECONCILING
> Update Time:  1668994910151
>   Reconciliation Status:
> Last Reconciled Spec:  ...
> Reconciliation Timestamp:  1668660371853
> State: DEPLOYED
>   Task Manager:
> Label Selector:  component=taskmanager,app=flinktest
> Replicas:1
> Events:
>   Type     Reason            Age                 From                  Message
>   ----     ------            ---                 ----                  -------
>   Normal   JobStatusChanged  30m Job
> Job status changed from RUNNING to RESTARTING
>   Normal   JobStatusChanged  29m Job
> Job status changed from RESTARTING to CREATED
>   Normal   JobStatusChanged  28m Job
> Job status changed from CREATED to RESTARTING
>   Warning  Missing   26m JobManagerDeployment
> Missing JobManager deployment
>   Warning  RestoreFailed 9s (x106 over 26m)  JobManagerDeployment  HA
> metadata not available to restore from last state. It is possible that the
> job has finished or terminally failed, or the configmaps have been
> deleted. Manual restore required.
>   Normal   Submit9s (x106 over 26m)  JobManagerDeployment
> Starting deployment


We're happy with the last state mode most of the time, but we face it
occasionally.

We found that it's not easy to reproduce the problem; we tried killing JMs
and TMs and even shutting down the nodes on which JMs and TMs are running.

We also checked that the file size is not zero.

Thanks,

Dongwon


Timezone issue with timestamp fields in debezium-json data

2022-11-22 Thread Kyle Zhang
Hi all,
We have a pipeline that pulls oracle data into kafka via the
debezium-oracle-cdc plugin and then analyzes it with flink sql. We've hit a
timezone problem: a timestamp column whose database value is '2022-11-17
16:16:44' is stored by debezium as an int64 with no timezone information,
becoming 1668701804000. After FROM_UNIXTIME in flink sql it comes out as
'2022-11-18 00:16:44', 8 hours off, so we have to manually subtract 8h. Is
there a uniform way to handle this?

Best
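
(A sketch of one common fix, assuming Flink 1.13+; `ts_millis` and
`kafka_source` are placeholder names. Debezium encodes the wall-clock time as
epoch milliseconds "as if it were UTC", so interpret the value with
TO_TIMESTAMP_LTZ and render it in UTC instead of the session time zone:)

SET 'table.local-time-zone' = 'UTC';
SELECT TO_TIMESTAMP_LTZ(ts_millis, 3) AS ts FROM kafka_source;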