Re:Re: Application upgrade rollbacks failed in Flink Kubernetes Operator

2023-02-19 Thread hjw
Hi
I declared an erroneous pod template without a container named flink-main-container
to test the rollback feature.
Please pay attention to the podTemplate in the old and new specs.


Last stable spec:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.operator.deployment.rollback.enabled: true
    state.savepoints.dir: s3://flink-data/savepoints
    state.checkpoints.dir: s3://flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://flink-data/ha
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless


New spec:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.operator.deployment.rollback.enabled: true
    state.savepoints.dir: s3://flink-data/savepoints
    state.checkpoints.dir: s3://flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://flink-data/ha
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - env:
            - name: TZ
              value: Asia/Shanghai
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

--

Best,
Hjw




At 2023-02-20 08:48:46, "Shammon FY"  wrote:

Hi


I cannot see the difference between the two configurations, but the error info 
`Failure executing: POST at: 
https://*/k8s/clusters/c-fwkxh/apis/apps/v1/namespaces/test-flink/deployments. 
Message: Deployment.apps "basic-example" is invalid` is strange. Maybe you can 
check whether the configuration of k8s has changed?



Best,
Shammon




On Mon, Feb 20, 2023 at 12:56 AM hjw  wrote:

I ran a test of the application upgrade rollback feature, but it fails. The Flink
application mode job cannot roll back to the last stable spec.
As shown in the following example, I declared an erroneous pod template without a
container named flink-main-container to test the rollback feature.
However, deploying the Flink application job only fails with an error, without any
rollback.


Error:
org.apache.flink.client.deployment.ClusterDeploymentException: Could not create 
Kubernetes cluster "basic-example".
 at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployClusterInternal(KubernetesClusterDescriptor.java:292)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure 
executing: POST at: 
https://*/k8s/clusters/c-fwkxh/apis/apps/v1/namespaces/test-flink/deployments. 
Message: Deployment.apps "basic-example" is invalid: 
[spec.template.spec.containers[0].name: Required value, 
spec.template.spec.containers[0].image: Required value]. Received status: 
Status(apiVersion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].name,
 message=Required value, reason=FieldValueRequired, additionalProperties={}), 
StatusCause(field=spec.template.spec.containers[0].image, message=Required 
value, reason=FieldValueRequired, additionalProperties={})], group=apps, 
kind=Deployment, name=flink-bdra-sql-application-job-s3p, 
retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, 
message=Deployment.apps "flink-bdra-sql-application-job-s3p" is invalid: 
[spec.template.spec.containers[0].name: Required value, 
spec.template.spec.containers[0].image: Required value], 
metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, 
status=Failure, additionalProperties={}).
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:673)
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:612)
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:560)


Env:
Flink version:Flink 1.16
Flink Kubernetes Operator:1.3.1


Last stable  spec:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"

Re: Re: flink canal json format: ignore unrecognized type

2023-02-19 Thread Weihua Hu
Hi,
You can try json.ignore-parse-errors [1] to ignore parse errors. Note that this option ignores all parse errors.
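
As a rough sketch of where the option goes (everything below is illustrative: table, topic, and
broker names are made up, and since this thread is about the canal-json format the option appears
there with the format prefix, i.e. 'canal-json.ignore-parse-errors'):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IgnoreParseErrorsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register a Kafka source table whose format silently skips rows it cannot parse.
        tEnv.executeSql(
                "CREATE TABLE orders_src (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'orders',\n"
                        + "  'properties.bootstrap.servers' = 'broker:9092',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'canal-json',\n"
                        + "  'canal-json.ignore-parse-errors' = 'true'\n"
                        + ")");
    }
}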

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/#json-ignore-parse-errors

Best,
Weihua


On Mon, Feb 20, 2023 at 10:14 AM casel.chen  wrote:

> The log reports exactly that "type":"INIT_DDL" cannot be recognized, and then the job exits.
>
> On 2023-02-20 09:58:56, "Shengkai Fang" wrote:
> >Hi. Could you also share the SQL that reproduces this case and the related error stack?
> >
> >Best,
> >Shengkai
> >
> >casel.chen wrote on Thu, Feb 9, 2023 at 12:03:
> >
> >> Data synchronization tools from different cloud vendors behave inconsistently when doing
> >> full + incremental synchronization of MySQL data to Kafka in canal json format.
> >> Some vendors sync DDL statements into the topic, which downstream flink sql jobs cannot
> >> recognize, so they throw errors and fail. I suggest that the flink canal json format simply
> >> ignore unrecognized types when parsing, for example:
> >> Example 1:
> >>
> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE
> >> TABLE `oms_parcels` (  `id` varchar(255) NOT NULL,  `createdby`
> >> varchar(255) DEFAULT NULL,  `createdat` timestamp NOT NULL DEFAULT
> >> CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,  `updatedat` timestamp
> NOT
> >> NULL DEFAULT '-00-00 00:00:00',  `updatedby` varchar(255) DEFAULT
> >> NULL,  `account` varchar(255) DEFAULT NULL,  `batch` varchar(255)
> DEFAULT
> >> NULL,  `client` varchar(255) DEFAULT NULL,  `command` varchar(255)
> DEFAULT
> >> NULL,  `container` varchar(255) DEFAULT NULL,  `items` mediumtext,
> >> `trackingnumber` varchar(255) NOT NULL,  `transporter` varchar(255)
> DEFAULT
> >> NULL,  `weight` decimal(19,2) NOT NULL,  `zipcode` varchar(255) DEFAULT
> >> NULL,  `ld3` varchar(255) DEFAULT NULL,  `destination_code` varchar(255)
> >> DEFAULT NULL,  PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB
> DEFAULT
> >> CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
> >>
> >>
> >> Example 2:
> >> {
> >> "action":"ALTER",
> >> "before":[],
> >> "bid":0,
> >> "data":[],
> >> "db":"db_test",
> >> "dbValType":{
> >> "col1":"varchar(22)",
> >> "col2":"varchar(22)",
> >> "col_pk":"varchar(22)"
> >> },
> >> "ddl":true,
> >> "entryType":"ROWDATA",
> >> "execTs":1669789188000,
> >> "jdbcType":{
> >> "col1":12,
> >> "col2":12,
> >> "col_pk":12
> >> },
> >> "pks":[],
> >> "schema":"db_test",
> >> "sendTs":1669789189533,
> >> "sql":"alter table table_test add col2 varchar(22) null",
> >> "table":"table_test",
> >> "tableChanges":{
> >> "table":{
> >> "columns":[
> >> {
> >> "jdbcType":12, // jdbc 类型。
> >> "name":"col1",// 字段名称。
> >> "position":0,  // 字段的顺序。
> >> "typeExpression":"varchar(22)", // 类型描述。
> >> "typeName":"varchar" // 类型名称。
> >> },
> >> {
> >> "jdbcType":12,
> >> "name":"col2",
> >> "position":1,
> >> "typeExpression":"varchar(22)",
> >> "typeName":"varchar"
> >> },
> >> {
> >> "jdbcType":12,
> >> "name":"col_pk",
> >> "position":2,
> >> "typeExpression":"varchar(22)",
> >> "typeName":"varchar"
> >> }
> >> ],
> >> "primaryKeyColumnNames":["col_pk"] // 主键名列表。
> >> },
> >> "type":"ALTER"
> >> }
> >> }
>


Re: Re: Unable to dump Flink program memory

2023-02-19 Thread Weihua Hu
Hi,

You can also increase the heartbeat timeout (heartbeat.timeout, default 50000 ms) [1] and then try the memory dump again.


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-fault-tolerance-options

Best,
Weihua


On Mon, Feb 20, 2023 at 1:58 PM lxk  wrote:

> I tried adjusting the parameter; the value is as follows:
>
>
> akka.ask.timeout: 900s
>
>
>
> but it still reports the same error
>
> On 2023-02-17 17:32:51, "Guo Thompson" wrote:
> >Maybe the heartbeat interval between the JM and TM is too short; the dump stops the world, so the TM stops responding to the JM's heartbeat;
> >
> >lxk wrote on Tue, Feb 14, 2023 at 14:32:
> >
> >> Flink version:1.16
> >> java version: jdk1.8.0_251
> >> Problem: a recently launched Flink job does frequent young GC, once every few seconds.
> >> After adjusting the young generation size it is still not resolved; the whole JVM heap is
> >> currently 3.57 GB. So we want to analyze the program's memory to find the problem. Using the
> >> applicationId on YARN, we run ps -ef | grep 1666758697316_2639108 to find the corresponding
> >> pid and finally run jmap -dump:format=b,file=user.dump 26326 to generate the dump file.
> >> But for every job we tested, as soon as the dump starts it affects the online job: the job's
> >> container dies for no obvious reason and the job restarts. The error after running the
> >> command is:
> >> sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410
> >> https://pic.imgdb.cn/item/63eb2a46f144a010071899ba.png
> >> Has anyone run into this problem? Are we using it the wrong way, or is there an issue with the version we are on? We would appreciate any suggestions.
>


Re:Re: Unable to dump Flink program memory

2023-02-19 Thread lxk
I tried adjusting the parameter; the value is as follows:


akka.ask.timeout: 900s



but it still reports the same error

On 2023-02-17 17:32:51, "Guo Thompson" wrote:
>Maybe the heartbeat interval between the JM and TM is too short; the dump stops the world, so the TM stops responding to the JM's heartbeat;
>
>lxk wrote on Tue, Feb 14, 2023 at 14:32:
>
>> Flink version:1.16
>> java version: jdk1.8.0_251
>> Problem: a recently launched Flink job does frequent young GC, once every few seconds. After
>> adjusting the young generation size it is still not resolved; the whole JVM heap is currently
>> 3.57 GB. So we want to analyze the program's memory to find the problem. Using the
>> applicationId on YARN, we run ps -ef | grep 1666758697316_2639108 to find the corresponding pid
>> and finally run jmap -dump:format=b,file=user.dump 26326 to generate the dump file.
>> But for every job we tested, as soon as the dump starts it affects the online job: the job's
>> container dies for no obvious reason and the job restarts. The error after running the command is:
>> sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410
>> https://pic.imgdb.cn/item/63eb2a46f144a010071899ba.png
>> Has anyone run into this problem? Are we using it the wrong way, or is there an issue with the version we are on? We would appreciate any suggestions.


Re:Re: Flink 1.16 writing to Kafka fails: Cluster authorization failed.

2023-02-19 Thread lxk
Changing the permissions did indeed solve the problem. But what I want to understand now is why
Flink 1.16 needs permission to create producer IDs, and whether this permission is required for
both the new and the old Kafka API. For the new Kafka API, managing producer IDs for exactly-once
is visible in the source code, but I did not see anything similar in the old API. Does a single
producer ID also have to be managed internally by Flink?
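
One possible explanation, offered here only as an assumption (it is not confirmed anywhere in this
thread): the Kafka client bundled with newer connector versions enables enable.idempotence by
default, and an idempotent producer may need the cluster-level IdempotentWrite permission
(depending on the broker version), regardless of whether the old or the new API is used. A minimal
sketch of turning that off via the producer properties passed to KafkaSink (topic and broker names
are placeholders; the same property could also be put into the Properties handed to
FlinkKafkaProducer):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class NonIdempotentKafkaSinkSketch {

    public static KafkaSink<String> build() {
        Properties producerProps = new Properties();
        // Kafka clients 3.x enable idempotence by default; turning it off avoids the
        // cluster-level IdempotentWrite ACL (only do this if idempotence is not needed).
        producerProps.setProperty("enable.idempotence", "false");

        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")            // placeholder
                .setKafkaProducerConfig(producerProps)
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")      // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();
    }
}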


On 2023-02-20 08:45:18, "Shammon FY" wrote:
>Hi
>
>Judging from the error `Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException:
>Cluster authorization failed.`, this looks like a permission problem. Could you check whether the permissions are set up correctly?
>
>Best,
>Shammon
>
>On Fri, Feb 17, 2023 at 6:29 PM lxk  wrote:
>
>> Flink version: 1.16
>> Our company is upgrading Flink from 1.14 to 1.16. The code was not changed at all, but writing to Kafka now fails with:
>> 2023-02-17 15:03:19
>> org.apache.kafka.common.KafkaException: Cannot execute transactional
>> method because we are in an error state
>> at
>> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
>> at
>> org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442)
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998)
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912)
>> at
>> org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:197)
>> at
>> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>> at org.apache.flink.streaming.runtime.io
>> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>> at org.apache.flink.streaming.runtime.io
>> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>> at org.apache.flink.streaming.runtime.io
>> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>> at
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>> at
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException:
>> Cluster authorization failed.
>>
>>
>> After reading the related source code, I learned that the new Kafka API, KafkaSink, implements
>> exactly-once in two phases, a writer and a committer, and the writer maintains a producer pool,
>> so it needs permission to create producers; that part I understand.
>> But when using the old Kafka API, FlinkKafkaProducer, only a single producer is maintained. I
>> don't understand why the same error is still reported with the old API.
>>
>>
>> Or maybe what I described is not the root cause of this error; I hope someone can help explain it.


Re: Metrics or runtimeContext in global commit

2023-02-19 Thread yuxia
It seems there is no other way to get the runtimeContext in a global commit. For me, I think it is
reasonable to propose the feature.
I have added the flink dev mailing list for more attention/discussion among the Flink devs.

Best regards,
Yuxia

----- Original Message -----
From: "Tobias Fröhlich"
To: "User"
Sent: Tuesday, February 14, 2023 9:26:34 PM
Subject: Metrics or runtimeContext in global commit

Dear flink team,

I would like to use metrics (which are then written to an InfluxDB) in the method
org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>>
committables) that I use for the global commit. I use the helper method
StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit topology.

The problem is: When I implement the interface Committer, I cannot get the 
runtimeContext that I need for the metrics, because it is not an Operator.

The only solution I found was by cloning the flink source code and amending it 
in the following way:

 1. declaring an abstract class "CommitterWithRuntimeContext" that
implements Committer and has (see the sketch after step 2 below):
- an additional field for the runtimeContext
- setter and getter for this field
- an abstract method "void init()"

 2. in the setup() method of GlobalCommitterOperator (which is an operator and 
thus has a runtimeContext) adding the following lines at the end:

if (committer instanceof CommitterWithRuntimeContext) {
    ((CommitterWithRuntimeContext) committer).setRuntimeContext(getRuntimeContext());
    ((CommitterWithRuntimeContext) committer).init();
}
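
For reference, a minimal sketch of the abstract class described in step 1 could look like the
following (only the class name comes from the description above; everything else is illustrative
and not part of any Flink API):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.connector.sink2.Committer;

// Sketch only: a Committer that can receive the RuntimeContext from the enclosing operator.
public abstract class CommitterWithRuntimeContext<CommT> implements Committer<CommT> {

    private RuntimeContext runtimeContext;

    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    public RuntimeContext getRuntimeContext() {
        return runtimeContext;
    }

    // Called from the amended GlobalCommitterOperator#setup() after the context has been
    // injected, e.g. to register metrics via getRuntimeContext().getMetricGroup().
    public abstract void init();
}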

I can then implement the method CommitterWithRuntimeContext::init() in our code 
and call the method CommitterWithRuntimeContext::getRuntimeContext() when I 
need the runtimeContext.

Is there another way to get the runtimeContext in a global commit? If not, is 
it justified to propose a feature request for a future release, where the 
global commit method can be implemented in a way that the user has access to 
the runtimeContext?

Best regards and thanks in advance
Tobias Fröhlich


Re:Re: [urgent] When flink sql writes to a table with a primary key, can it ensure that records with the same primary key are processed by the same taskmanager?

2023-02-19 Thread casel.chen
The Flink SQL job looks roughly like this:


create table user_source_table (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name STRING,
  dept_id BIGINT NOT NULL,
  proctime AS PROCTIME()
) with (
 'connector' = 'kafka', 
 'format' = 'canal-json',
 ...
);


create table department_dim_table (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name STRING
) with (
 'connector' = 'jdbc',
 ...
);


create table user_rich_sink_table (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name STRING,
  dept_name STRING
) with (
 'connector' = 'jdbc'
 ...
);


insert into user_rich_sink_table 
select id, name, d.name as dept_name 
from user_source_table u
  left join department_dim_table for system_time as of u.proctime as d 
  on u.dept_id = d.id;


The user id is the primary key. According to what you said, do I need to explicitly add a group by
on the user id before the final insert into statement and then insert?
What we see now is that when the job parallelism is greater than 1, records with the same user id
land on different TaskManagers, which leads to inconsistent update state.





On 2023-02-20 08:41:20, "Shammon FY" wrote:
>Hi
>
>If the join key of the join does not contain the primary key, then when the join result is written directly to the sink table, as RS mentioned above, you need to add an extra shuffle yourself.
>
>Best,
>Shammon
>
>
>On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
>
>> Hi,
>> The primary key configured in the connector controls how data is written to storage; some storage systems can automatically update/deduplicate by primary key when writing.
>> So I think what you want here is a shuffle at computation time (before writing); you probably need to first do a group by on the primary key and then execute the insert into.
>>
>>
>> Thanks
>>
>>
>>
>> On 2023-02-17 15:56:51, "casel.chen" wrote:
>> >The job scenario: a kafka cdc source is joined with several redis dimension tables and then
>> >with other streams via a regular inner join; finally the widened table is written to mongodb.
>> >We use flink 1.13.2 SQL mode with debug logging enabled.
>> >In testing we found that records with the same primary key are logged on different
>> >taskmanagers (I log inside the Sink Function's invoke method), and this behavior makes the
>> >data in the final result table incorrect.
>> >
>> >
>> >Questions:
>> >When flink sql writes to a table with a primary key, can it ensure that records with the same primary key are processed by the same taskmanager?
>> >Is it not supported because our flink version is old? Starting from which flink version is it supported?
>> >My understanding is that the purpose of defining a primary key on a flink sql result table is to shuffle by that key, ensuring that data with the same primary key is processed by the same taskmanager so that the change order is correct. I am not sure whether my understanding is right.
>> >
>>


Re:Re: flink canal json format: ignore unrecognized type

2023-02-19 Thread casel.chen
The log reports exactly that "type":"INIT_DDL" cannot be recognized, and then the job exits.


On 2023-02-20 09:58:56, "Shengkai Fang" wrote:
>Hi. Could you also share the SQL that reproduces this case and the related error stack?
>
>Best,
>Shengkai
>
>casel.chen wrote on Thu, Feb 9, 2023 at 12:03:
>
>> Data synchronization tools from different cloud vendors behave inconsistently when doing
>> full + incremental synchronization of MySQL data to Kafka in canal json format.
>> Some vendors sync DDL statements into the topic, which downstream flink sql jobs cannot
>> recognize, so they throw errors and fail. I suggest that the flink canal json format simply
>> ignore unrecognized types when parsing, for example:
>> Example 1:
>> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE
>> TABLE `oms_parcels` (  `id` varchar(255) NOT NULL,  `createdby`
>> varchar(255) DEFAULT NULL,  `createdat` timestamp NOT NULL DEFAULT
>> CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,  `updatedat` timestamp NOT
>> NULL DEFAULT '-00-00 00:00:00',  `updatedby` varchar(255) DEFAULT
>> NULL,  `account` varchar(255) DEFAULT NULL,  `batch` varchar(255) DEFAULT
>> NULL,  `client` varchar(255) DEFAULT NULL,  `command` varchar(255) DEFAULT
>> NULL,  `container` varchar(255) DEFAULT NULL,  `items` mediumtext,
>> `trackingnumber` varchar(255) NOT NULL,  `transporter` varchar(255) DEFAULT
>> NULL,  `weight` decimal(19,2) NOT NULL,  `zipcode` varchar(255) DEFAULT
>> NULL,  `ld3` varchar(255) DEFAULT NULL,  `destination_code` varchar(255)
>> DEFAULT NULL,  PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT
>> CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
>>
>>
>> Example 2:
>> {
>> "action":"ALTER",
>> "before":[],
>> "bid":0,
>> "data":[],
>> "db":"db_test",
>> "dbValType":{
>> "col1":"varchar(22)",
>> "col2":"varchar(22)",
>> "col_pk":"varchar(22)"
>> },
>> "ddl":true,
>> "entryType":"ROWDATA",
>> "execTs":1669789188000,
>> "jdbcType":{
>> "col1":12,
>> "col2":12,
>> "col_pk":12
>> },
>> "pks":[],
>> "schema":"db_test",
>> "sendTs":1669789189533,
>> "sql":"alter table table_test add col2 varchar(22) null",
>> "table":"table_test",
>> "tableChanges":{
>> "table":{
>> "columns":[
>> {
>> "jdbcType":12, // jdbc 类型。
>> "name":"col1",// 字段名称。
>> "position":0,  // 字段的顺序。
>> "typeExpression":"varchar(22)", // 类型描述。
>> "typeName":"varchar" // 类型名称。
>> },
>> {
>> "jdbcType":12,
>> "name":"col2",
>> "position":1,
>> "typeExpression":"varchar(22)",
>> "typeName":"varchar"
>> },
>> {
>> "jdbcType":12,
>> "name":"col_pk",
>> "position":2,
>> "typeExpression":"varchar(22)",
>> "typeName":"varchar"
>> }
>> ],
>> "primaryKeyColumnNames":["col_pk"] // 主键名列表。
>> },
>> "type":"ALTER"
>> }
>> }


Re:Re: [urgent] When flink sql writes to a table with a primary key, can it ensure that records with the same primary key are processed by the same taskmanager?

2023-02-19 Thread casel.chen

The shuffle before writing that you mention (first doing a group by on the primary key) is, in my opinion, something the Flink framework should handle; it should not have to be added explicitly at the job level.
When executing the sink, the Flink framework should check whether the target table has a primary key, and if so insert a group by operator so that records with the same primary key are sent to the same TaskManager.
I heard that a newer Flink version, 1.15 or 1.16 (I don't remember which), has already improved this. Does anyone know? Is there a related issue or PR link?

On 2023-02-19 13:43:29, "RS" wrote:
>Hi,
>The primary key configured in the connector controls how data is written to storage; some storage systems can automatically update/deduplicate by primary key when writing.
>So I think what you want here is a shuffle at computation time (before writing); you probably need to first do a group by on the primary key and then execute the insert into.
>
>
>Thanks
>
>
>
>On 2023-02-17 15:56:51, "casel.chen" wrote:
>>The job scenario: a kafka cdc source is joined with several redis dimension tables and then with
>>other streams via a regular inner join; finally the widened table is written to mongodb. We use
>>flink 1.13.2 SQL mode with debug logging enabled.
>>In testing we found that records with the same primary key are logged on different taskmanagers
>>(I log inside the Sink Function's invoke method), and this behavior makes the data in the final
>>result table incorrect.
>>
>>
>>Questions:
>>When flink sql writes to a table with a primary key, can it ensure that records with the same primary key are processed by the same taskmanager?
>>Is it not supported because our flink version is old? Starting from which flink version is it supported?
>>My understanding is that the purpose of defining a primary key on a flink sql result table is to shuffle by that key, ensuring that data with the same primary key is processed by the same taskmanager so that the change order is correct. I am not sure whether my understanding is right.
>>


Re: Flink SQL support array transform function

2023-02-19 Thread yuxia
Hi, Xuekui.
As the exception stack says, maybe you can try to provide hints for the
function's parameters.


class ArrayTransformFunction extends ScalarFunction {

  def eval(@DataTypeHint("ARRAY") a: Array[Long],
           @DataTypeHint("RAW") function: Long => Long): Array[Long] = {
    a.map(e => function(e))
  }
}
Hope it can help.
For more detail, please refer to the Flink docs [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference


Best regards, 
Yuxia 


From: "Xuekui"
To: "fskmine", "Caizhi Weng"
Cc: "User"
Sent: Thursday, February 16, 2023 10:54:05 AM
Subject: Re: Flink SQL support array transform function

Hi Caizhi, 

I've tried to write a UDF to support this function, but I found I can't pass the
function parameter to the UDF because the function data type is not supported.
An exception is thrown during SQL validation.

My UDF code: 
class ArrayTransformFunction extends ScalarFunction {

  def eval(a: Array[Long], function: Long => Long): Array[Long] = {
    a.map(e => function(e))
  }
}

Exception: 
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. An error occurred in the type inference logic of function 
'transform'. 
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
 
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
 
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
 
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77) 
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
 
at SQLTest$.main(SQLTest.scala:44) 
at SQLTest.main(SQLTest.scala) 
Caused by: org.apache.flink.table.api.ValidationException: An error occurred in 
the type inference logic of function 'transform'. 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
 
at java.util.Optional.flatMap(Optional.java:241) 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
 
at 
org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
 
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
 
... 6 more 
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a 
valid type inference for function class 'udf.ArrayTransformFunction'. Please 
check for implementation mistakes and/or provide a corresponding hint. 
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
 
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
 
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
 
at 
org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
 
... 17 more 
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting 
a signature to output mapping. 
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
 
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
 
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
 
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
 
... 20 more 
Caused by: 

Re: flink canal json format: ignore unrecognized type

2023-02-19 Thread Shengkai Fang
Hi. Could you also share the SQL that reproduces this case and the related error stack?

Best,
Shengkai

casel.chen wrote on Thu, Feb 9, 2023 at 12:03:

> Data synchronization tools from different cloud vendors behave inconsistently when doing
> full + incremental synchronization of MySQL data to Kafka in canal json format.
> Some vendors sync DDL statements into the topic, which downstream flink sql jobs cannot
> recognize, so they throw errors and fail. I suggest that the flink canal json format simply
> ignore unrecognized types when parsing, for example:
> Example 1:
> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE
> TABLE `oms_parcels` (  `id` varchar(255) NOT NULL,  `createdby`
> varchar(255) DEFAULT NULL,  `createdat` timestamp NOT NULL DEFAULT
> CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,  `updatedat` timestamp NOT
> NULL DEFAULT '-00-00 00:00:00',  `updatedby` varchar(255) DEFAULT
> NULL,  `account` varchar(255) DEFAULT NULL,  `batch` varchar(255) DEFAULT
> NULL,  `client` varchar(255) DEFAULT NULL,  `command` varchar(255) DEFAULT
> NULL,  `container` varchar(255) DEFAULT NULL,  `items` mediumtext,
> `trackingnumber` varchar(255) NOT NULL,  `transporter` varchar(255) DEFAULT
> NULL,  `weight` decimal(19,2) NOT NULL,  `zipcode` varchar(255) DEFAULT
> NULL,  `ld3` varchar(255) DEFAULT NULL,  `destination_code` varchar(255)
> DEFAULT NULL,  PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT
> CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
>
>
> Example 2:
> {
> "action":"ALTER",
> "before":[],
> "bid":0,
> "data":[],
> "db":"db_test",
> "dbValType":{
> "col1":"varchar(22)",
> "col2":"varchar(22)",
> "col_pk":"varchar(22)"
> },
> "ddl":true,
> "entryType":"ROWDATA",
> "execTs":1669789188000,
> "jdbcType":{
> "col1":12,
> "col2":12,
> "col_pk":12
> },
> "pks":[],
> "schema":"db_test",
> "sendTs":1669789189533,
> "sql":"alter table table_test add col2 varchar(22) null",
> "table":"table_test",
> "tableChanges":{
> "table":{
> "columns":[
> {
> "jdbcType":12, // jdbc 类型。
> "name":"col1",// 字段名称。
> "position":0,  // 字段的顺序。
> "typeExpression":"varchar(22)", // 类型描述。
> "typeName":"varchar" // 类型名称。
> },
> {
> "jdbcType":12,
> "name":"col2",
> "position":1,
> "typeExpression":"varchar(22)",
> "typeName":"varchar"
> },
> {
> "jdbcType":12,
> "name":"col_pk",
> "position":2,
> "typeExpression":"varchar(22)",
> "typeName":"varchar"
> }
> ],
> "primaryKeyColumnNames":["col_pk"] // 主键名列表。
> },
> "type":"ALTER"
> }
> }


Re: [urgent] When flink sql writes to a table with a primary key, can it ensure that records with the same primary key are processed by the same taskmanager?

2023-02-19 Thread Shengkai Fang
My understanding is that if the sink table has a pk, a keyBy is applied automatically [1].

Best,
Shengkai

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188

Shammon FY wrote on Mon, Feb 20, 2023 at 08:41:

> Hi
>
> If the join key of the join does not contain the primary key, then when the join result is written directly to the sink table, as RS mentioned above, you need to add an extra shuffle yourself.
>
> Best,
> Shammon
>
>
> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
>
> > Hi,
> > The primary key configured in the connector controls how data is written to storage; some storage systems can automatically update/deduplicate by primary key when writing.
> > So I think what you want here is a shuffle at computation time (before writing); you probably need to first do a group by on the primary key and then execute the insert into.
> >
> >
> > Thanks
> >
> >
> >
> > On 2023-02-17 15:56:51, "casel.chen" wrote:
> > >The job scenario: a kafka cdc source is joined with several redis dimension tables and then
> > >with other streams via a regular inner join; finally the widened table is written to mongodb.
> > >We use flink 1.13.2 SQL mode with debug logging enabled.
> > >In testing we found that records with the same primary key are logged on different
> > >taskmanagers (I log inside the Sink Function's invoke method), and this behavior makes the
> > >data in the final result table incorrect.
> > >
> > >
> > >Questions:
> > >When flink sql writes to a table with a primary key, can it ensure that records with the same primary key are processed by the same taskmanager?
> > >Is it not supported because our flink version is old? Starting from which flink version is it supported?
> > >My understanding is that the purpose of defining a primary key on a flink sql result table is to shuffle by that key, ensuring that data with the same primary key is processed by the same taskmanager so that the change order is correct. I am not sure whether my understanding is right.
> > >
> >
>


Job Cancellation Failing

2023-02-19 Thread Puneet Duggal
Flink Cluster Context:

Flink Version - 1.15
Deployment Mode - Session
Number of Job Managers - 3 (HA)
Number of Task Managers - 1

Cancellation of the job fails due to the following:

org.apache.flink.runtime.rest.NotFoundException: Job 
1cb2185d4d72c8c6f0a3a549d7de4ef0 not found
at 
org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:99)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at 
org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.lambda$getExecutionGraphInternal$0(DefaultExecutionGraphCache.java:109)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
at akka.dispatch.OnComplete.internal(Future.scala:299)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:118)
at 
akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:1144)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:540)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 
Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could 
not find Flink job (1cb2185d4d72c8c6f0a3a549d7de4ef0)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.requestExecutionGraphInfo(Dispatcher.java:812)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
at 

Re: Application upgrade rollbacks failed in Flink Kubernetes Operator

2023-02-19 Thread Shammon FY
Hi

I cannot see the difference between the two configurations, but the error
info `Failure executing: POST at: https://*/k8s/clusters/c-
fwkxh/apis/apps/v1/namespaces/test-flink/deployments. Message:
Deployment.apps "basic-example" is invalid` is strange. Maybe you can check
whether the configuration of k8s has changed?

Best,
Shammon


On Mon, Feb 20, 2023 at 12:56 AM hjw  wrote:

> I ran a test of the application upgrade rollback feature, but it fails. The
> Flink application mode job cannot roll back to the last stable spec.
> As shown in the following example, I declared an erroneous pod template without
> a container named flink-main-container to test the rollback feature.
> However, deploying the Flink application job only fails with an error, without
> any rollback.
>
> Error:
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not
> create Kubernetes cluster "basic-example".
>  at
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployClusterInternal(KubernetesClusterDescriptor.java:292)
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure
> executing: POST at: 
> https://*/k8s/clusters/c-fwkxh/apis/apps/v1/namespaces/test-flink/deployments.
> Message: Deployment.apps "basic-example" is invalid:
> [spec.template.spec.containers[0].name: Required value,
> spec.template.spec.containers[0].image: Required value]. Received status:
> Status(apiVersion=v1, code=422,
> details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].name,
> message=Required value, reason=FieldValueRequired,
> additionalProperties={}),
> StatusCause(field=spec.template.spec.containers[0].image, message=Required
> value, reason=FieldValueRequired, additionalProperties={})], group=apps,
> kind=Deployment, name=flink-bdra-sql-application-job-s3p,
> retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status,
> message=Deployment.apps "flink-bdra-sql-application-job-s3p" is invalid:
> [spec.template.spec.containers[0].name: Required value,
> spec.template.spec.containers[0].image: Required value],
> metadata=ListMeta(_continue=null, remainingItemCount=null,
> resourceVersion=null, selfLink=null, additionalProperties={}),
> reason=Invalid, status=Failure, additionalProperties={}).
>  at
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:673)
>  at
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:612)
>  at
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:560)
>
> Env:
> Flink version:Flink 1.16
> Flink Kubernetes Operator:1.3.1
>
> *Last* *stable  spec:*
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: basic-example
> spec:
>   image: flink:1.16
>   flinkVersion: v1_16
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "2"
> kubernetes.operator.deployment.rollback.enabled: true
> state.savepoints.dir: s3:///flink-data/savepoints
> state.checkpoints.dir: s3:///flink-data/checkpoints
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: s3:///flink-data/ha
>   serviceAccount: flink
>   podTemplate:
> spec:
>   containers:
> - name: flink-main-container
>   env:
>   - name: TZ
> value: Asia/Shanghai
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   job:
> jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
> parallelism: 2
> upgradeMode: stateless
>
> *new Spec:*
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: basic-example
> spec:
>   image: flink:1.16
>   flinkVersion: v1_16
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "2"
> kubernetes.operator.deployment.rollback.enabled: true
> state.savepoints.dir: s3:///flink-data/savepoints
> state.checkpoints.dir: s3:///flink-data/checkpoints
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: s3:///flink-data/ha
>   serviceAccount: flink
>   podTemplate:
> spec:
>   containers:
> -   env:
>   - name: TZ
> value: Asia/Shanghai
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   job:
> jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
> parallelism: 2
> upgradeMode: stateless
>
> --
> Best,
> Hjw
>


Re: Flink 1.16 writing to Kafka fails: Cluster authorization failed.

2023-02-19 Thread Shammon FY
Hi

Judging from the error `Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException:
Cluster authorization failed.`, this looks like a permission problem. Could you check whether the permissions are set up correctly?

Best,
Shammon

On Fri, Feb 17, 2023 at 6:29 PM lxk  wrote:

> Flink version: 1.16
> Our company is upgrading Flink from 1.14 to 1.16. The code was not changed at all, but writing to Kafka now fails with:
> 2023-02-17 15:03:19
> org.apache.kafka.common.KafkaException: Cannot execute transactional
> method because we are in an error state
> at
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
> at
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442)
> at
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998)
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912)
> at
> org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:197)
> at
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException:
> Cluster authorization failed.
>
>
> After reading the related source code, I learned that the new Kafka API, KafkaSink, implements
> exactly-once in two phases, a writer and a committer, and the writer maintains a producer pool,
> so it needs permission to create producers; that part I understand.
> But when using the old Kafka API, FlinkKafkaProducer, only a single producer is maintained. I
> don't understand why the same error is still reported with the old API.
>
>
> Or maybe what I described is not the root cause of this error; I hope someone can help explain it.


Re: [urgent] When flink sql writes to a table with a primary key, can it ensure that records with the same primary key are processed by the same taskmanager?

2023-02-19 Thread Shammon FY
Hi

If the join key of the join does not contain the primary key, then when the join result is written directly to the sink table, as RS mentioned above, you need to add an extra shuffle yourself.

Best,
Shammon


On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:

> Hi,
> The primary key configured in the connector controls how data is written to storage; some storage systems can automatically update/deduplicate by primary key when writing.
> So I think what you want here is a shuffle at computation time (before writing); you probably need to first do a group by on the primary key and then execute the insert into.
>
>
> Thanks
>
>
>
> On 2023-02-17 15:56:51, "casel.chen" wrote:
> >The job scenario: a kafka cdc source is joined with several redis dimension tables and then
> >with other streams via a regular inner join; finally the widened table is written to mongodb.
> >We use flink 1.13.2 SQL mode with debug logging enabled.
> >In testing we found that records with the same primary key are logged on different taskmanagers
> >(I log inside the Sink Function's invoke method), and this behavior makes the data in the final
> >result table incorrect.
> >
> >
> >Questions:
> >When flink sql writes to a table with a primary key, can it ensure that records with the same primary key are processed by the same taskmanager?
> >Is it not supported because our flink version is old? Starting from which flink version is it supported?
> >My understanding is that the purpose of defining a primary key on a flink sql result table is to shuffle by that key, ensuring that data with the same primary key is processed by the same taskmanager so that the change order is correct. I am not sure whether my understanding is right.
> >
>


Dead Letter Queue for Kafka connector

2023-02-19 Thread Yang Liu
Hi folks,

Is there a way we can configure a dead letter queue (DLQ) for the Kafka
source connector with the Table API? Is the DataStream API the only option for
now?

Thanks,

Eric


Application upgrade rollbacks failed in Flink Kubernetes Operator

2023-02-19 Thread hjw
I ran a test of the application upgrade rollback feature, but it fails. The Flink
application mode job cannot roll back to the last stable spec.
As shown in the following example, I declared an erroneous pod template without a
container named flink-main-container to test the rollback feature.
However, deploying the Flink application job only fails with an error, without any
rollback.


Error:
org.apache.flink.client.deployment.ClusterDeploymentException: Could not create 
Kubernetes cluster "basic-example".
 at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployClusterInternal(KubernetesClusterDescriptor.java:292)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure 
executing: POST at: 
https://*/k8s/clusters/c-fwkxh/apis/apps/v1/namespaces/test-flink/deployments. 
Message: Deployment.apps "basic-example" is invalid: 
[spec.template.spec.containers[0].name: Required value, 
spec.template.spec.containers[0].image: Required value]. Received status: 
Status(apiVersion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].name,
 message=Required value, reason=FieldValueRequired, additionalProperties={}), 
StatusCause(field=spec.template.spec.containers[0].image, message=Required 
value, reason=FieldValueRequired, additionalProperties={})], group=apps, 
kind=Deployment, name=flink-bdra-sql-application-job-s3p, 
retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, 
message=Deployment.apps "flink-bdra-sql-application-job-s3p" is invalid: 
[spec.template.spec.containers[0].name: Required value, 
spec.template.spec.containers[0].image: Required value], 
metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, 
status=Failure, additionalProperties={}).
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:673)
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:612)
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:560)


Env:
Flink version:Flink 1.16
Flink Kubernetes Operator:1.3.1


Last stable spec:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.operator.deployment.rollback.enabled: true
    state.savepoints.dir: s3:///flink-data/savepoints
    state.checkpoints.dir: s3:///flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3:///flink-data/ha
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless


New spec:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.operator.deployment.rollback.enabled: true
    state.savepoints.dir: s3:///flink-data/savepoints
    state.checkpoints.dir: s3:///flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3:///flink-data/ha
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - env:
            - name: TZ
              value: Asia/Shanghai
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

--

Best,
Hjw