Re: Re: Application upgrade rollbacks failed in Flink Kubernetes Operator
Hi, I declared an erroneous podTemplate without a container named flink-main-container to test the rollback feature. Please pay attention to the podTemplate in the old and new specs: the new spec omits the name: flink-main-container entry.

Last stable spec:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.operator.deployment.rollback.enabled: true
    state.savepoints.dir: s3://flink-data/savepoints
    state.checkpoints.dir: s3://flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://flink-data/ha
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

New spec:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.operator.deployment.rollback.enabled: true
    state.savepoints.dir: s3://flink-data/savepoints
    state.checkpoints.dir: s3://flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://flink-data/ha
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - env:
            - name: TZ
              value: Asia/Shanghai
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

--
Best,
Hjw

At 2023-02-20 08:48:46, "Shammon FY" wrote:

Hi, I cannot see the difference between the two configurations, but the error info `Failure executing: POST at: https://*/k8s/clusters/c-fwkxh/apis/apps/v1/namespaces/test-flink/deployments. Message: Deployment.apps "basic-example" is invalid` is strange. Maybe you can check whether the configuration of k8s has changed?

Best,
Shammon

On Mon, Feb 20, 2023 at 12:56 AM hjw wrote:

I ran a test of the application upgrade rollback feature, but it fails. The Flink application-mode job cannot roll back to the last stable spec.
As shown in the following example, I declared an erroneous podTemplate without a container named flink-main-container to test the rollback feature.
However, deploying the Flink application job simply failed with the error below, and no rollback occurred.

Error:
org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster "basic-example".
    at org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployClusterInternal(KubernetesClusterDescriptor.java:292)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://*/k8s/clusters/c-fwkxh/apis/apps/v1/namespaces/test-flink/deployments. Message: Deployment.apps "basic-example" is invalid: [spec.template.spec.containers[0].name: Required value, spec.template.spec.containers[0].image: Required value]. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].name, message=Required value, reason=FieldValueRequired, additionalProperties={}), StatusCause(field=spec.template.spec.containers[0].image, message=Required value, reason=FieldValueRequired, additionalProperties={})], group=apps, kind=Deployment, name=flink-bdra-sql-application-job-s3p, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Deployment.apps "flink-bdra-sql-application-job-s3p" is invalid: [spec.template.spec.containers[0].name: Required value, spec.template.spec.containers[0].image: Required value], metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:673)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:612)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:560)

Env:
Flink version: Flink 1.16
Flink Kubernetes Operator: 1.3.1

Last stable spec:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
Re: Re: Make the Flink canal-json format ignore unrecognized types
Hi, you can try json.ignore-parse-errors [1] to ignore the parse errors. Note that this option ignores all parse errors.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/#json-ignore-parse-errors

Best,
Weihua

On Mon, Feb 20, 2023 at 10:14 AM casel.chen wrote:
> The log reports exactly that "type":"INIT_DDL" cannot be recognized, and then the job exits.
>
> At 2023-02-20 09:58:56, "Shengkai Fang" wrote:
> >Hi. Could you also share the SQL that reproduces this case and the related error stack?
> >
> >Best,
> >Shengkai
> >
> >On Thu, Feb 9, 2023 at 12:03, casel.chen wrote:
> >
> >> Different cloud vendors' data synchronization tools behave inconsistently when doing full + incremental synchronization of MySQL data to Kafka in canal-json format.
> >> Some vendors synchronize DDL statements into the topic, which the downstream Flink SQL job cannot recognize, so it throws an error. I suggest the Flink canal-json format simply ignore unrecognized types when parsing, for example:
> >> Example 1:
> >> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE TABLE `oms_parcels` ( `id` varchar(255) NOT NULL, `createdby` varchar(255) DEFAULT NULL, `createdat` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `updatedat` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `updatedby` varchar(255) DEFAULT NULL, `account` varchar(255) DEFAULT NULL, `batch` varchar(255) DEFAULT NULL, `client` varchar(255) DEFAULT NULL, `command` varchar(255) DEFAULT NULL, `container` varchar(255) DEFAULT NULL, `items` mediumtext, `trackingnumber` varchar(255) NOT NULL, `transporter` varchar(255) DEFAULT NULL, `weight` decimal(19,2) NOT NULL, `zipcode` varchar(255) DEFAULT NULL, `ld3` varchar(255) DEFAULT NULL, `destination_code` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
> >>
> >> Example 2:
> >> {
> >>   "action":"ALTER",
> >>   "before":[],
> >>   "bid":0,
> >>   "data":[],
> >>   "db":"db_test",
> >>   "dbValType":{
> >>     "col1":"varchar(22)",
> >>     "col2":"varchar(22)",
> >>     "col_pk":"varchar(22)"
> >>   },
> >>   "ddl":true,
> >>   "entryType":"ROWDATA",
> >>   "execTs":1669789188000,
> >>   "jdbcType":{
> >>     "col1":12,
> >>     "col2":12,
> >>     "col_pk":12
> >>   },
> >>   "pks":[],
> >>   "schema":"db_test",
> >>   "sendTs":1669789189533,
> >>   "sql":"alter table table_test add col2 varchar(22) null",
> >>   "table":"table_test",
> >>   "tableChanges":{
> >>     "table":{
> >>       "columns":[
> >>         {
> >>           "jdbcType":12, // JDBC type
> >>           "name":"col1", // column name
> >>           "position":0, // column position
> >>           "typeExpression":"varchar(22)", // type description
> >>           "typeName":"varchar" // type name
> >>         },
> >>         {
> >>           "jdbcType":12,
> >>           "name":"col2",
> >>           "position":1,
> >>           "typeExpression":"varchar(22)",
> >>           "typeName":"varchar"
> >>         },
> >>         {
> >>           "jdbcType":12,
> >>           "name":"col_pk",
> >>           "position":2,
> >>           "typeExpression":"varchar(22)",
> >>           "typeName":"varchar"
> >>         }
> >>       ],
> >>       "primaryKeyColumnNames":["col_pk"] // list of primary key column names
> >>     },
> >>     "type":"ALTER"
> >>   }
> >> }
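For anyone picking this thread up later: a minimal sketch of where such an option goes, assuming a Kafka source table read with the canal-json format (the table name, topic, and broker address below are made up). For canal-json the option is prefixed with the format name, i.e. canal-json.ignore-parse-errors rather than json.ignore-parse-errors; whether it also swallows the INIT_DDL records above is worth verifying, since it silently drops every record that fails to parse.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IgnoreParseErrorsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Records that cannot be parsed (including event types the format
        // rejects) are skipped instead of failing the job.
        tEnv.executeSql(
                "CREATE TABLE oms_parcels_src (\n"
                        + "  id STRING,\n"
                        + "  trackingnumber STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'oms_parcels',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'canal-json',\n"
                        + "  'canal-json.ignore-parse-errors' = 'true'\n"
                        + ")");
    }
}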
Re: Re: Unable to take a memory dump of a Flink program
Hi, you can also increase the heartbeat timeout (heartbeat.timeout) [1] and then try the memory dump again.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-fault-tolerance-options

Best,
Weihua

On Mon, Feb 20, 2023 at 1:58 PM lxk wrote:
> I tried adjusting the parameter; the specific value is as follows:
>
> akka.ask.timeout: 900s
>
> But it still reports the same error.
>
> At 2023-02-17 17:32:51, "Guo Thompson" wrote:
> >Perhaps the heartbeat interval between the JM and TM is too short; the dump stops the world, so the TM stops responding to the JM's heartbeat.
> >
> >On Tue, Feb 14, 2023 at 14:32, lxk wrote:
> >
> >> Flink version: 1.16
> >> Java version: jdk1.8.0_251
> >> Problem: a recently launched Flink program does frequent young GCs, one every few seconds. Adjusting the young-generation size did not solve it; the whole JVM heap is currently 3.57 GB. We therefore wanted to analyze the program's memory to locate the problem. Using the applicationId on YARN we ran ps -ef | grep 1666758697316_2639108 to find the corresponding pid, and then ran jmap -dump:format=b,file=user.dump 26326 to generate the dump file. But for every program we tested, as soon as the dump starts it affects the online program: its container dies for no obvious reason and the program restarts. The specific error after running the command:
> >> sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410
> >> https://pic.imgdb.cn/item/63eb2a46f144a010071899ba.png
> >> Has anyone run into this problem? Are we using the tool the wrong way, or is there an issue with the version we are on? Any suggestions or opinions would be appreciated.
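A sketch of bumping the relevant timeouts, assuming a local reproduction where the configuration can be passed to the environment (on YARN or Kubernetes, heartbeat.timeout belongs in flink-conf.yaml, since it is read by the JobManager and TaskManager processes at startup); the values below are illustrative only.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HeartbeatTimeoutSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // heartbeat.timeout is in milliseconds; 15 minutes here so that a
        // stop-the-world heap dump does not get the TM declared dead.
        conf.setString("heartbeat.timeout", "900000");
        // RPC round-trip timeout, the knob already tried in this thread.
        conf.setString("akka.ask.timeout", "900s");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements(1, 2, 3).print();
        env.execute("heartbeat-timeout-sketch");
    }
}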
Re: Re: Unable to take a memory dump of a Flink program
I tried adjusting the parameter; the specific value is as follows:

akka.ask.timeout: 900s

But it still reports the same error.

At 2023-02-17 17:32:51, "Guo Thompson" wrote:
>Perhaps the heartbeat interval between the JM and TM is too short; the dump stops the world, so the TM stops responding to the JM's heartbeat.
>
>On Tue, Feb 14, 2023 at 14:32, lxk wrote:
>
>> Flink version: 1.16
>> Java version: jdk1.8.0_251
>> Problem: a recently launched Flink program does frequent young GCs, one every few seconds. Adjusting the young-generation size did not solve it; the whole JVM heap is currently 3.57 GB. We therefore wanted to analyze the program's memory to locate the problem. Using the applicationId on YARN we ran ps -ef | grep 1666758697316_2639108 to find the corresponding pid, and then ran jmap -dump:format=b,file=user.dump 26326 to generate the dump file. But for every program we tested, as soon as the dump starts it affects the online program: its container dies for no obvious reason and the program restarts. The specific error after running the command:
>> sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410
>> https://pic.imgdb.cn/item/63eb2a46f144a010071899ba.png
>> Has anyone run into this problem? Are we using the tool the wrong way, or is there an issue with the version we are on? Any suggestions or opinions would be appreciated.
Re: Re: Flink 1.16 writing to Kafka fails with: Cluster authorization failed.
Changing the permissions did indeed solve the problem. But what I want to understand now is why Flink 1.16 needs the permission to create producer IDs, and whether this permission is required for both the new and the old Kafka API. For the new Kafka API, managing producer IDs for exactly-once is visible in the source code, but I saw nothing comparable for the old API. When only a single producer ID is used, does it also have to be managed internally by Flink?

At 2023-02-20 08:45:18, "Shammon FY" wrote:
>Hi
>
>Judging from `Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.`, this looks like a permission error; you could check whether there is a permission problem.
>
>Best,
>Shammon
>
>On Fri, Feb 17, 2023 at 6:29 PM lxk wrote:
>
>> Flink version: 1.16
>> Our company is currently upgrading Flink from 1.14 to 1.16. The code was not changed at all, but writing to Kafka now fails:
>> 2023-02-17 15:03:19
>> org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
>> at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
>> at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442)
>> at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998)
>> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912)
>> at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:197)
>> at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
>> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
>>
>> After reading the relevant source code, I learned that KafkaSink, the new Kafka API, implements exactly-once in two phases, a writer and a committer, and the writer maintains a producer pool, so it needs permission to create producers; that part I can understand.
>> But when using the old Kafka API, i.e. FlinkKafkaProducer, only a single producer is maintained. I don't understand why the same error is still reported with the old API.
>>
>> Or maybe what I described is not the root cause of this error; I hope someone can help explain.
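A possible explanation for the old-API behavior, offered as an assumption to verify rather than a confirmed root cause: Flink 1.16 bundles kafka-clients 3.x, where enable.idempotence defaults to true (KIP-679), so even a plain non-transactional producer issues an InitProducerId request, which brokers authorize at the cluster level; the 2.x client bundled with Flink 1.14 defaulted it to false. If granting the ACL is undesirable, idempotence can be disabled explicitly. A sketch against the new KafkaSink (broker address and topic are placeholders; method names follow the 1.16 connector docs):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class KafkaSinkAuthSketch {
    public static KafkaSink<String> build() {
        Properties props = new Properties();
        // kafka-clients 3.x turns idempotent producers on by default, which
        // requires the InitProducerId API on the broker. Switching it off
        // avoids the cluster-level authorization requirement.
        props.setProperty("enable.idempotence", "false");
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092") // placeholder address
                .setKafkaProducerConfig(props)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic") // placeholder topic
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // AT_LEAST_ONCE avoids transactions entirely; EXACTLY_ONCE
                // would additionally need transactional-id ACLs.
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}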
Re: Metrics or runtimeContext in global commit
It seems there is no other way to get the runtimeContext in a global commit. For my part, I think it's reasonable to propose the feature. I've added the flink-dev list for more attention/discussion among the Flink devs.

Best regards,
Yuxia

----- Original Message -----
From: "Tobias Fröhlich"
To: "User"
Sent: Tuesday, February 14, 2023, 9:26:34 PM
Subject: Metrics or runtimeContext in global commit

Dear flink team,

I would like to use metrics (which are then written to an influxdb) in the method org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>> committables) that I use for global commit. I use the helper method StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit topology.

The problem is: when I implement the interface Committer, I cannot get the runtimeContext that I need for the metrics, because a Committer is not an operator. The only solution I found was to clone the flink source code and amend it in the following way:

1. declaring an abstract class "CommitterWithRuntimeContext" that implements Committer and has:
 - an additional field for the runtimeContext
 - a setter and getter for this field
 - an abstract method "void init()"

2. in the setup() method of GlobalCommitterOperator (which is an operator and thus has a runtimeContext) adding the following lines at the end:

if (committer instanceof CommitterWithRuntimeContext) {
    ((CommitterWithRuntimeContext) committer).setRuntimeContext(getRuntimeContext());
    ((CommitterWithRuntimeContext) committer).init();
}

I can then implement the method CommitterWithRuntimeContext::init() in our code and call the method CommitterWithRuntimeContext::getRuntimeContext() when I need the runtimeContext.

Is there another way to get the runtimeContext in a global commit? If not, is it justified to propose a feature request for a future release, where the global commit method can be implemented in a way that the user has access to the runtimeContext?

Best regards and thanks in advance
Tobias Fröhlich
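For readers who want to try the workaround above without digging through the patch, here is a compilable sketch of its user-facing half; the operator-side wiring into GlobalCommitterOperator#setup() is the part that requires modifying Flink and is not shown, and the generics and visibility choices beyond what the mail states are assumptions.

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.connector.sink2.Committer;

/**
 * Committer that can receive the enclosing operator's RuntimeContext after
 * construction, e.g. to register metrics that end up in InfluxDB.
 */
public abstract class CommitterWithRuntimeContext<CommT> implements Committer<CommT> {

    private RuntimeContext runtimeContext;

    /** Called by the (patched) GlobalCommitterOperator in its setup(). */
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    protected RuntimeContext getRuntimeContext() {
        return runtimeContext;
    }

    /** Invoked once the RuntimeContext is set; register metric groups here. */
    public abstract void init();
}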
Re: Re: [Urgent] When Flink SQL writes to a table with a primary key, are records with the same primary key guaranteed to be processed by the same TaskManager?
The Flink SQL job is sketched below:

create table user_source_table (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name STRING,
  dept_id BIGINT NOT NULL,
  proctime AS PROCTIME()
) with (
  'connector' = 'kafka',
  'format' = 'canal-json',
  ...
);

create table department_dim_table (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name STRING
) with (
  'connector' = 'jdbc',
  ...
);

create table user_rich_sink_table (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name STRING,
  dept_name STRING
) with (
  'connector' = 'jdbc'
  ...
);

insert into user_rich_sink_table
select id, name, d.name as dept_name
from user_source_table u
left join department_dim_table for system_time as of u.proctime as d
  on u.dept_id = d.id;

The user id is the primary key. Per your suggestion, do I need to explicitly add a group by on the user id before the final insert into statement? What we observe now is that when the job parallelism is greater than 1, records with the same user id land on different TaskManagers, which leaves the updated data in an inconsistent state.

At 2023-02-20 08:41:20, "Shammon FY" wrote:
>Hi
>
>If the join keys do not include the primary key, then when the join result is written directly to the sink table you need to add an extra shuffle yourself, as RS mentioned above.
>
>Best,
>Shammon
>
>On Sun, Feb 19, 2023 at 1:43 PM RS wrote:
>
>> Hi,
>> The primary key configured in the connector controls writing to storage; some storage systems can automatically upsert and deduplicate by primary key when writing data.
>> So I think what you actually want here is a shuffle at computation time (before the write); you probably need to first run a group by on the primary key and then run the insert into.
>>
>> Thanks
>>
>> At 2023-02-17 15:56:51, "casel.chen" wrote:
>> >The job scenario: a Kafka CDC source is joined with several Redis dimension tables, then regular inner joined with other streaming tables, and the widened table is finally written to MongoDB. We use Flink 1.13.2 in SQL mode, with debug logging enabled.
>> >Testing shows that records with the same primary key are logged on different TaskManagers (I log inside the Sink Function's invoke method), and this behavior makes the data in the final result table incorrect.
>> >
>> >Questions:
>> >When Flink SQL writes to a table with a primary key, is it guaranteed that records with the same primary key are all processed by the same TaskManager?
>> >Is it unsupported because our Flink version is old? Starting from which Flink version is it supported?
>> >My understanding is that the point of defining a primary key on a Flink SQL result table is to shuffle by that key, ensuring records with the same primary key are handled by the same TaskManager so the change order is correct. I'm not sure whether that understanding is right.
Re: Re: Make the Flink canal-json format ignore unrecognized types
The log reports exactly that "type":"INIT_DDL" cannot be recognized, and then the job exits.

At 2023-02-20 09:58:56, "Shengkai Fang" wrote:
>Hi. Could you also share the SQL that reproduces this case and the related error stack?
>
>Best,
>Shengkai
>
>On Thu, Feb 9, 2023 at 12:03, casel.chen wrote:
>
>> Different cloud vendors' data synchronization tools behave inconsistently when doing full + incremental synchronization of MySQL data to Kafka in canal-json format.
>> Some vendors synchronize DDL statements into the topic, which the downstream Flink SQL job cannot recognize, so it throws an error. I suggest the Flink canal-json format simply ignore unrecognized types when parsing, for example:
>> Example 1:
>> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE TABLE `oms_parcels` ( `id` varchar(255) NOT NULL, `createdby` varchar(255) DEFAULT NULL, `createdat` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `updatedat` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `updatedby` varchar(255) DEFAULT NULL, `account` varchar(255) DEFAULT NULL, `batch` varchar(255) DEFAULT NULL, `client` varchar(255) DEFAULT NULL, `command` varchar(255) DEFAULT NULL, `container` varchar(255) DEFAULT NULL, `items` mediumtext, `trackingnumber` varchar(255) NOT NULL, `transporter` varchar(255) DEFAULT NULL, `weight` decimal(19,2) NOT NULL, `zipcode` varchar(255) DEFAULT NULL, `ld3` varchar(255) DEFAULT NULL, `destination_code` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
>>
>> Example 2:
>> {
>>   "action":"ALTER",
>>   "before":[],
>>   "bid":0,
>>   "data":[],
>>   "db":"db_test",
>>   "dbValType":{
>>     "col1":"varchar(22)",
>>     "col2":"varchar(22)",
>>     "col_pk":"varchar(22)"
>>   },
>>   "ddl":true,
>>   "entryType":"ROWDATA",
>>   "execTs":1669789188000,
>>   "jdbcType":{
>>     "col1":12,
>>     "col2":12,
>>     "col_pk":12
>>   },
>>   "pks":[],
>>   "schema":"db_test",
>>   "sendTs":1669789189533,
>>   "sql":"alter table table_test add col2 varchar(22) null",
>>   "table":"table_test",
>>   "tableChanges":{
>>     "table":{
>>       "columns":[
>>         {
>>           "jdbcType":12, // JDBC type
>>           "name":"col1", // column name
>>           "position":0, // column position
>>           "typeExpression":"varchar(22)", // type description
>>           "typeName":"varchar" // type name
>>         },
>>         {
>>           "jdbcType":12,
>>           "name":"col2",
>>           "position":1,
>>           "typeExpression":"varchar(22)",
>>           "typeName":"varchar"
>>         },
>>         {
>>           "jdbcType":12,
>>           "name":"col_pk",
>>           "position":2,
>>           "typeExpression":"varchar(22)",
>>           "typeName":"varchar"
>>         }
>>       ],
>>       "primaryKeyColumnNames":["col_pk"] // list of primary key column names
>>     },
>>     "type":"ALTER"
>>   }
>> }
Re: Re: [Urgent] When Flink SQL writes to a table with a primary key, are records with the same primary key guaranteed to be processed by the same TaskManager?
In my view, the shuffle before the write that you describe (first running a group by on the primary key) is something the Flink framework should handle; it should not have to be added explicitly at the job level. When executing the sink, the Flink framework should check whether the target table has a primary key and, if so, insert a group-by operator so that records with the same primary key are sent to the same TaskManager. I heard that a newer Flink version, 1.15 or 1.16 (I don't remember which), has already improved this. Does anyone know? Is there a link to the relevant issue or PR?

At 2023-02-19 13:43:29, "RS" wrote:
>Hi,
>The primary key configured in the connector controls writing to storage; some storage systems can automatically upsert and deduplicate by primary key when writing data.
>So I think what you actually want here is a shuffle at computation time (before the write); you probably need to first run a group by on the primary key and then run the insert into.
>
>Thanks
>
>At 2023-02-17 15:56:51, "casel.chen" wrote:
>>The job scenario: a Kafka CDC source is joined with several Redis dimension tables, then regular inner joined with other streaming tables, and the widened table is finally written to MongoDB. We use Flink 1.13.2 in SQL mode, with debug logging enabled.
>>Testing shows that records with the same primary key are logged on different TaskManagers (I log inside the Sink Function's invoke method), and this behavior makes the data in the final result table incorrect.
>>
>>Questions:
>>When Flink SQL writes to a table with a primary key, is it guaranteed that records with the same primary key are all processed by the same TaskManager?
>>Is it unsupported because our Flink version is old? Starting from which Flink version is it supported?
>>My understanding is that the point of defining a primary key on a Flink SQL result table is to shuffle by that key, ensuring records with the same primary key are handled by the same TaskManager so the change order is correct. I'm not sure whether that understanding is right.
Re: Flink SQL support array transform function
Hi, Xuekui.

As the exception stack says, maybe you can try to provide hints for the function's parameters:

class ArrayTransformFunction extends ScalarFunction {
  def eval(
      @DataTypeHint("ARRAY<BIGINT>") a: Array[Long],
      @DataTypeHint("RAW") function: Long => Long): Array[Long] = {
    a.map(e => function(e))
  }
}

Hope it helps. For more detail, please refer to the Flink docs [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference

Best regards,
Yuxia

From: "Xuekui"
To: "fskmine", "Caizhi Weng"
Cc: "User"
Sent: Thursday, February 16, 2023, 10:54:05 AM
Subject: Re: Flink SQL support array transform function

Hi Caizhi,

I've tried to write a UDF to support this function, but I found I can't pass the function parameter to the UDF because the function data type is not supported. An exception is thrown during SQL validation.

My UDF code:

class ArrayTransformFunction extends ScalarFunction {
  def eval(a: Array[Long], function: Long => Long): Array[Long] = {
    a.map(e => function(e))
  }
}

Exception:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'transform'.
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
    at SQLTest$.main(SQLTest.scala:44)
    at SQLTest.main(SQLTest.scala)
Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'transform'.
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
    at java.util.Optional.flatMap(Optional.java:241)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
    at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
    ... 6 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'udf.ArrayTransformFunction'. Please check for implementation mistakes and/or provide a corresponding hint.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
    at org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
    ... 17 more
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
    ... 20 more
Caused by:
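A side note on the hint mechanics: Flink SQL has no function-valued data type, so the Long => Long parameter is the part the type extractor cannot map, and a RAW hint may still not get it through the planner. Below is a sketch (in Java) of the annotation placement for the array argument and the return type, with the transformation baked into the UDF instead of passed in; the +1 transform is just a stand-in.

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Hinted array UDF sketch. The @DataTypeHint annotations resolve the
 * type-inference question for the array argument and the return type;
 * the actual element transformation is fixed here because it cannot be
 * passed in from SQL.
 */
public class ArrayIncrementFunction extends ScalarFunction {
    public @DataTypeHint("ARRAY<BIGINT>") Long[] eval(
            @DataTypeHint("ARRAY<BIGINT>") Long[] a) {
        if (a == null) {
            return null;
        }
        Long[] out = new Long[a.length];
        for (int i = 0; i < a.length; i++) {
            // Example transform; replace with the desired per-element logic.
            out[i] = a[i] == null ? null : a[i] + 1L;
        }
        return out;
    }
}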
Re: Make the Flink canal-json format ignore unrecognized types
Hi. Could you also share the SQL that reproduces this case and the related error stack?

Best,
Shengkai

On Thu, Feb 9, 2023 at 12:03, casel.chen wrote:

> Different cloud vendors' data synchronization tools behave inconsistently when doing full + incremental synchronization of MySQL data to Kafka in canal-json format.
> Some vendors synchronize DDL statements into the topic, which the downstream Flink SQL job cannot recognize, so it throws an error. I suggest the Flink canal-json format simply ignore unrecognized types when parsing, for example:
> Example 1:
> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE TABLE `oms_parcels` ( `id` varchar(255) NOT NULL, `createdby` varchar(255) DEFAULT NULL, `createdat` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `updatedat` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `updatedby` varchar(255) DEFAULT NULL, `account` varchar(255) DEFAULT NULL, `batch` varchar(255) DEFAULT NULL, `client` varchar(255) DEFAULT NULL, `command` varchar(255) DEFAULT NULL, `container` varchar(255) DEFAULT NULL, `items` mediumtext, `trackingnumber` varchar(255) NOT NULL, `transporter` varchar(255) DEFAULT NULL, `weight` decimal(19,2) NOT NULL, `zipcode` varchar(255) DEFAULT NULL, `ld3` varchar(255) DEFAULT NULL, `destination_code` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
>
> Example 2:
> {
>   "action":"ALTER",
>   "before":[],
>   "bid":0,
>   "data":[],
>   "db":"db_test",
>   "dbValType":{
>     "col1":"varchar(22)",
>     "col2":"varchar(22)",
>     "col_pk":"varchar(22)"
>   },
>   "ddl":true,
>   "entryType":"ROWDATA",
>   "execTs":1669789188000,
>   "jdbcType":{
>     "col1":12,
>     "col2":12,
>     "col_pk":12
>   },
>   "pks":[],
>   "schema":"db_test",
>   "sendTs":1669789189533,
>   "sql":"alter table table_test add col2 varchar(22) null",
>   "table":"table_test",
>   "tableChanges":{
>     "table":{
>       "columns":[
>         {
>           "jdbcType":12, // JDBC type
>           "name":"col1", // column name
>           "position":0, // column position
>           "typeExpression":"varchar(22)", // type description
>           "typeName":"varchar" // type name
>         },
>         {
>           "jdbcType":12,
>           "name":"col2",
>           "position":1,
>           "typeExpression":"varchar(22)",
>           "typeName":"varchar"
>         },
>         {
>           "jdbcType":12,
>           "name":"col_pk",
>           "position":2,
>           "typeExpression":"varchar(22)",
>           "typeName":"varchar"
>         }
>       ],
>       "primaryKeyColumnNames":["col_pk"] // list of primary key column names
>     },
>     "type":"ALTER"
>   }
> }
Re: [Urgent] When Flink SQL writes to a table with a primary key, are records with the same primary key guaranteed to be processed by the same TaskManager?
My understanding is that if the sink table has a primary key, a keyBy on it is applied automatically [1].

Best,
Shengkai

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188

On Mon, Feb 20, 2023 at 08:41, Shammon FY wrote:

> Hi
>
> If the join keys do not include the primary key, then when the join result is written directly to the sink table you need to add an extra shuffle yourself, as RS mentioned above.
>
> Best,
> Shammon
>
> On Sun, Feb 19, 2023 at 1:43 PM RS wrote:
>
> > Hi,
> > The primary key configured in the connector controls writing to storage; some storage systems can automatically upsert and deduplicate by primary key when writing data.
> > So I think what you actually want here is a shuffle at computation time (before the write); you probably need to first run a group by on the primary key and then run the insert into.
> >
> > Thanks
> >
> > At 2023-02-17 15:56:51, "casel.chen" wrote:
> > >The job scenario: a Kafka CDC source is joined with several Redis dimension tables, then regular inner joined with other streaming tables, and the widened table is finally written to MongoDB. We use Flink 1.13.2 in SQL mode, with debug logging enabled.
> > >Testing shows that records with the same primary key are logged on different TaskManagers (I log inside the Sink Function's invoke method), and this behavior makes the data in the final result table incorrect.
> > >
> > >Questions:
> > >When Flink SQL writes to a table with a primary key, is it guaranteed that records with the same primary key are all processed by the same TaskManager?
> > >Is it unsupported because our Flink version is old? Starting from which Flink version is it supported?
> > >My understanding is that the point of defining a primary key on a Flink SQL result table is to shuffle by that key, ensuring records with the same primary key are handled by the same TaskManager so the change order is correct. I'm not sure whether that understanding is right.
Job Cancellation Failing
Flink Cluster Context:
Flink Version - 1.15
Deployment Mode - Session
Number of Job Managers - 3 (HA)
Number of Task Managers - 1

Cancellation of the job fails with the following:

org.apache.flink.runtime.rest.NotFoundException: Job 1cb2185d4d72c8c6f0a3a549d7de4ef0 not found
    at org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:99)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.lambda$getExecutionGraphInternal$0(DefaultExecutionGraphCache.java:109)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
    at akka.dispatch.OnComplete.internal(Future.scala:299)
    at akka.dispatch.OnComplete.internal(Future.scala:297)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:118)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:1144)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:540)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (1cb2185d4d72c8c6f0a3a549d7de4ef0)
    at org.apache.flink.runtime.dispatcher.Dispatcher.requestExecutionGraphInfo(Dispatcher.java:812)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
    at
Re: Application upgrade rollbacks failed in Flink Kubernetes Operator
Hi, I cannot see the difference between the two configurations, but the error info `Failure executing: POST at: https://*/k8s/clusters/c-fwkxh/apis/apps/v1/namespaces/test-flink/deployments. Message: Deployment.apps "basic-example" is invalid` is strange. Maybe you can check whether the configuration of k8s has changed?

Best,
Shammon

On Mon, Feb 20, 2023 at 12:56 AM hjw wrote:

> I ran a test of the application upgrade rollback feature, but it fails. The Flink application-mode job cannot roll back to the last stable spec.
> As shown in the following example, I declared an erroneous podTemplate without a container named flink-main-container to test the rollback feature.
> However, deploying the Flink application job simply failed with the error below, and no rollback occurred.
>
> Error:
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster "basic-example".
> at org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployClusterInternal(KubernetesClusterDescriptor.java:292)
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://*/k8s/clusters/c-fwkxh/apis/apps/v1/namespaces/test-flink/deployments. Message: Deployment.apps "basic-example" is invalid: [spec.template.spec.containers[0].name: Required value, spec.template.spec.containers[0].image: Required value]. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].name, message=Required value, reason=FieldValueRequired, additionalProperties={}), StatusCause(field=spec.template.spec.containers[0].image, message=Required value, reason=FieldValueRequired, additionalProperties={})], group=apps, kind=Deployment, name=flink-bdra-sql-application-job-s3p, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Deployment.apps "flink-bdra-sql-application-job-s3p" is invalid: [spec.template.spec.containers[0].name: Required value, spec.template.spec.containers[0].image: Required value], metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
> at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:673)
> at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:612)
> at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:560)
>
> Env:
> Flink version: Flink 1.16
> Flink Kubernetes Operator: 1.3.1
>
> Last stable spec:
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: basic-example
> spec:
>   image: flink:1.16
>   flinkVersion: v1_16
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "2"
>     kubernetes.operator.deployment.rollback.enabled: true
>     state.savepoints.dir: s3:///flink-data/savepoints
>     state.checkpoints.dir: s3:///flink-data/checkpoints
>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>     high-availability.storageDir: s3:///flink-data/ha
>   serviceAccount: flink
>   podTemplate:
>     spec:
>       containers:
>         - name: flink-main-container
>           env:
>             - name: TZ
>               value: Asia/Shanghai
>   jobManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   taskManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   job:
>     jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
>     parallelism: 2
>     upgradeMode: stateless
>
> New spec:
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: basic-example
> spec:
>   image: flink:1.16
>   flinkVersion: v1_16
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "2"
>     kubernetes.operator.deployment.rollback.enabled: true
>     state.savepoints.dir: s3:///flink-data/savepoints
>     state.checkpoints.dir: s3:///flink-data/checkpoints
>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>     high-availability.storageDir: s3:///flink-data/ha
>   serviceAccount: flink
>   podTemplate:
>     spec:
>       containers:
>         - env:
>             - name: TZ
>               value: Asia/Shanghai
>   jobManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   taskManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   job:
>     jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
>     parallelism: 2
>     upgradeMode: stateless
>
> --
> Best,
> Hjw
Re: Flink 1.16 writing to Kafka fails with: Cluster authorization failed.
Hi

Judging from `Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.`, this looks like a permission error; you could check whether there is a permission problem.

Best,
Shammon

On Fri, Feb 17, 2023 at 6:29 PM lxk wrote:

> Flink version: 1.16
> Our company is currently upgrading Flink from 1.14 to 1.16. The code was not changed at all, but writing to Kafka now fails:
> 2023-02-17 15:03:19
> org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
> at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
> at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442)
> at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998)
> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912)
> at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:197)
> at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
>
> After reading the relevant source code, I learned that KafkaSink, the new Kafka API, implements exactly-once in two phases, a writer and a committer, and the writer maintains a producer pool, so it needs permission to create producers; that part I can understand.
> But when using the old Kafka API, i.e. FlinkKafkaProducer, only a single producer is maintained. I don't understand why the same error is still reported with the old API.
>
> Or maybe what I described is not the root cause of this error; I hope someone can help explain.
Re: [Urgent] When Flink SQL writes to a table with a primary key, are records with the same primary key guaranteed to be processed by the same TaskManager?
Hi

If the join keys do not include the primary key, then when the join result is written directly to the sink table you need to add an extra shuffle yourself, as RS mentioned above.

Best,
Shammon

On Sun, Feb 19, 2023 at 1:43 PM RS wrote:

> Hi,
> The primary key configured in the connector controls writing to storage; some storage systems can automatically upsert and deduplicate by primary key when writing data.
> So I think what you actually want here is a shuffle at computation time (before the write); you probably need to first run a group by on the primary key and then run the insert into.
>
> Thanks
>
> At 2023-02-17 15:56:51, "casel.chen" wrote:
> >The job scenario: a Kafka CDC source is joined with several Redis dimension tables, then regular inner joined with other streaming tables, and the widened table is finally written to MongoDB. We use Flink 1.13.2 in SQL mode, with debug logging enabled.
> >Testing shows that records with the same primary key are logged on different TaskManagers (I log inside the Sink Function's invoke method), and this behavior makes the data in the final result table incorrect.
> >
> >Questions:
> >When Flink SQL writes to a table with a primary key, is it guaranteed that records with the same primary key are all processed by the same TaskManager?
> >Is it unsupported because our Flink version is old? Starting from which Flink version is it supported?
> >My understanding is that the point of defining a primary key on a Flink SQL result table is to shuffle by that key, ensuring records with the same primary key are handled by the same TaskManager so the change order is correct. I'm not sure whether that understanding is right.
Dead Letter Queue for Kafka connector
Hi folks,

Is there a way we can configure a *dead letter queue* (DLQ) for the Kafka source connector with the *Table API*? Is the DataStream API the only option for now?

Thanks,
Eric
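As far as I know there is no built-in DLQ option in the Kafka table connector; the closest it offers is dropping unparseable records via the format's ignore-parse-errors options. On the DataStream side, one common hand-rolled pattern is a deserializer that never throws and instead tags failures so a downstream operator can route them to a dead-letter topic. A sketch, where the Tuple2 tagging and the trivial placeholder "parser" are assumptions, not an established API:

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Emits (true, payload) for records that parse and (false, rawPayload) for
 * records that do not, instead of failing the job. A downstream filter or
 * ProcessFunction can then send the 'false' records to a DLQ sink.
 */
public class DlqTaggingDeserializer
        implements KafkaRecordDeserializationSchema<Tuple2<Boolean, String>> {

    @Override
    public void deserialize(
            ConsumerRecord<byte[], byte[]> record,
            Collector<Tuple2<Boolean, String>> out) {
        byte[] value = record.value();
        String raw = value == null ? "" : new String(value, StandardCharsets.UTF_8);
        try {
            // Placeholder parse step; substitute real JSON/Avro decoding here.
            if (!raw.startsWith("{")) {
                throw new IllegalArgumentException("not a JSON object");
            }
            out.collect(Tuple2.of(true, raw));
        } catch (Exception e) {
            out.collect(Tuple2.of(false, raw));
        }
    }

    @Override
    public TypeInformation<Tuple2<Boolean, String>> getProducedType() {
        return Types.TUPLE(Types.BOOLEAN, Types.STRING);
    }
}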
Application upgrade rollbacks failed in Flink Kubernetes Operator
I ran a test of the application upgrade rollback feature, but it fails. The Flink application-mode job cannot roll back to the last stable spec.
As shown in the following example, I declared an erroneous podTemplate without a container named flink-main-container to test the rollback feature.
However, deploying the Flink application job simply failed with the error below, and no rollback occurred.

Error:
org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster "basic-example".
    at org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployClusterInternal(KubernetesClusterDescriptor.java:292)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://*/k8s/clusters/c-fwkxh/apis/apps/v1/namespaces/test-flink/deployments. Message: Deployment.apps "basic-example" is invalid: [spec.template.spec.containers[0].name: Required value, spec.template.spec.containers[0].image: Required value]. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].name, message=Required value, reason=FieldValueRequired, additionalProperties={}), StatusCause(field=spec.template.spec.containers[0].image, message=Required value, reason=FieldValueRequired, additionalProperties={})], group=apps, kind=Deployment, name=flink-bdra-sql-application-job-s3p, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Deployment.apps "flink-bdra-sql-application-job-s3p" is invalid: [spec.template.spec.containers[0].name: Required value, spec.template.spec.containers[0].image: Required value], metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:673)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:612)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:560)

Env:
Flink version: Flink 1.16
Flink Kubernetes Operator: 1.3.1

Last stable spec:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.operator.deployment.rollback.enabled: true
    state.savepoints.dir: s3:///flink-data/savepoints
    state.checkpoints.dir: s3:///flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3:///flink-data/ha
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

New spec:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.operator.deployment.rollback.enabled: true
    state.savepoints.dir: s3:///flink-data/savepoints
    state.checkpoints.dir: s3:///flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3:///flink-data/ha
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - env:
            - name: TZ
              value: Asia/Shanghai
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

--
Best,
Hjw