flinkcdc: slave with the same server_uuid/server_id as this slave has connected to the master

2022-02-14 Post by maker_d...@foxmail.com
flink version: flink-1.13.5
cdc version: 2.1.1

While using flink-cdc to sync multiple tables, I hit the following error:
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=1)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.GeneratedMethodAccessor135.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:223)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 3 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
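The quoted trace is cut off before the root cause, so this is only a guess, but the "same server_uuid/server_id" error in the subject line typically means two binlog clients connected to MySQL with the same server-id. With flink-cdc 2.1, each source (and each parallel reader within it) needs its own id. A minimal sketch of assigning a distinct, non-overlapping range per source; hostnames, credentials, and table names here are placeholders:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ServerIdPerSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        MySqlSource<String> orders = MySqlSource.<String>builder()
                .hostname("mysql-host").port(3306)
                .username("user").password("pass")
                .databaseList("db").tableList("db.orders")
                .serverId("5401-5404") // range size >= source parallelism, unique per source
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        MySqlSource<String> items = MySqlSource.<String>builder()
                .hostname("mysql-host").port(3306)
                .username("user").password("pass")
                .databaseList("db").tableList("db.items")
                .serverId("5405-5408") // must not overlap with the other source's range
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        env.fromSource(orders, WatermarkStrategy.noWatermarks(), "orders-cdc").print();
        env.fromSource(items, WatermarkStrategy.noWatermarks(), "items-cdc").print();
        env.execute("cdc-multi-table");
    }
}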

flink sql jdbc sink transaction commit question

2022-02-14 Post by casel.chen
I've recently been extending the flink sql jdbc connector to support the Phoenix
database. While debugging I found that data is written through
PhoenixStatement.executeBatch(), but since no transaction is committed, other
sessions cannot see it.
In the source, PhoenixPreparedStatement.execute() calls executeMutation(statement),
which checks connection.getAutoCommit() to decide whether to call
connection.commit(). Control then returns to PhoenixStatement.executeBatch(),
which calls flushIfNecessary(), which in turn checks connection.getAutoFlush()
to decide whether to call connection.flush().
At first I did not append the ;autocommit=true parameter to the Phoenix JDBC
URL, and the changed data was never committed to the database. Only after adding
;autocommit=true did connection.commit() run and the data get committed.


A few questions:
1. Sinking into MySQL instead shows no such problem. Does the jdbc sink really
behave differently across databases?
2. Where is the connection autoflush parameter set, and how does it differ from
autocommit?
3. A flush is triggered when the buffer fills, the flush interval elapses, or a
checkpoint occurs; it runs JdbcBatchingOutputFormat.flush(), where I also cannot
find any connection.commit() call. How does the data get committed to the
database? Without an explicit transaction, is the batch committed as soon as
statement.executeBatch() returns?
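Coming back to the autocommit behavior described above, here is a minimal
sketch of it in plain JDBC (assumptions: the Phoenix driver is on the classpath,
zk-host:2181 is a hypothetical ZooKeeper quorum, and t is a hypothetical table;
the ;autocommit=true URL suffix is the one from the post):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class AutoCommitProbe {
    public static void main(String[] args) throws Exception {
        // Without ";autocommit=true" a Phoenix connection defaults to
        // autoCommit=false, so batched mutations are buffered but never committed.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:phoenix:zk-host:2181;autocommit=true")) {
            System.out.println("autoCommit = " + conn.getAutoCommit());
            try (PreparedStatement ps =
                    conn.prepareStatement("UPSERT INTO t (id, v) VALUES (?, ?)")) {
                ps.setInt(1, 1);
                ps.setString(2, "a");
                ps.addBatch();
                ps.executeBatch(); // visible to other sessions only once committed
            }
            // With autoCommit=false the explicit equivalent would be:
            //   conn.setAutoCommit(false); ps.executeBatch(); conn.commit();
        }
    }
}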

Re: jdbc connector: after a bad record is written and no new data arrives, the exception keeps nesting and is never thrown

2022-02-14 Post by Caizhi Weng
Hi!

The images don't come through; please upload them to an external image host
such as imgur and paste the links in the email.

Even without the images, from your description this looks like a known issue
[1]; it just hasn't been picked up and fixed yet.

[1] https://issues.apache.org/jira/browse/FLINK-24677

jianjianjianjianjianjianjianjian <724125...@qq.com.invalid> wrote on Mon, 14 Feb 2022 at 15:40:

> Hello,
>   When writing with the jdbc connector, after *one bad record is written*
> (a field value exceeding the column length) and *no further data is written*,
> the exception is logged but never thrown, and the exception message keeps
> nesting deeper. This can delay noticing the problem.
>   We are currently on 1.13, using the
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat class.
>   The org.apache.flink.connector.jdbc.internal.JdbcOutputFormat class on the
> master branch has a similar problem; code screenshots below:
>
> *1.13:*
>
> master branch:
>


Re: After setting table.exec.state.ttl, the result data stops updating

2022-02-14 Post by Caizhi Weng
Hi!

The images don't come through; please upload them to an external image host
such as imgur and paste the links in the email.

Did the sink data keep updating before you set the state TTL? Are you sure the
stall isn't caused by later data failing to match some WHERE condition?

liangjinghong wrote on Sat, 12 Feb 2022 at 14:39:

> Hi, I'm new to Flink. For state management I set
> configuration.setString("table.exec.state.ttl","12h"); in my code.
>
> However, after the job had run for 12 hours, *my update result table never
> updated again*. In the web UI the Records Sent of my sources and several
> operators keeps growing, the job shows no exceptions, and checkpoints are
> normal, so I'm at a loss as to where the problem lies.
>
> Here are my SQL statements:
>
> CREATE TABLE `tbl_rpt_app_usage` (
>   `datepoint` TIMESTAMP,
>   `appId` VARCHAR,
>   `flavor` VARCHAR,
>   `applyNum` BIGINT,
>   `releaseNum` BIGINT,
>   `usedNum` BIGINT,
>   PRIMARY KEY (`datepoint`) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = '',
>   'port' = '3306',
>   'username' = '',
>   'password' = '',
>   'database-name' = '',
>   'table-name' = ''
> );
>
> CREATE TABLE `temporary_usage` (
>   `datepoint` TIMESTAMP,
>   `appId` VARCHAR,
>   `flavor` VARCHAR,
>   `applyNum` BIGINT,
>   `releaseNum` BIGINT,
>   `usedNum` BIGINT,
>   PRIMARY KEY (`datepoint`) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = '',
>   'port' = '',
>   'username' = '',
>   'password' = '',
>   'database-name' = '',
>   'table-name' = ''
> );
>
> CREATE TABLE `tbl_cce_cluster` (
>   `clusterId` VARCHAR,
>   `region` VARCHAR,
>   `flavor` VARCHAR,
>   `totalNode` BIGINT,
>   `toolName` VARCHAR,
>   `appId` VARCHAR,
>   PRIMARY KEY (`clusterId`) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = '',
>   'port' = '3306',
>   'username' = '',
>   'password' = '',
>   'database-name' = '',
>   'table-name' = ''
> );
>
> --- MySQL sink table
>
> CREATE table sink (
>   code STRING,
>   name STRING,
>   usedcnt BIGINT,
>   `time` TIMESTAMP,
>   type STRING,
>   PRIMARY KEY (`time`) NOT ENFORCED
> ) with (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql: ',
>   'username' = '',
>   'password' = '',
>   'table-name' = 'sink'
> );
>
> (MySQL CDC doesn't seem to support filtering rows while syncing, so for now I
> sync first, create views over the filtered results, and then compute on those.
> I'd also like to ask whether there is a better way.)
>
> CREATE VIEW rpt
>   (datepoint, appId, flavor, applyNum, releaseNum, usedNum)
> AS
> select * from tbl_rpt_app_usage
> where datepoint > '2022-02-11 14:54:01'
>   and appId not in ('aaa')
> union all
> select * from temporary_usage
>
> CREATE VIEW cce
>   (clusterId, region, flavor, totalNode, toolName, appId)
> AS
> select * from tbl_cce_cluster
> where toolName in ('bbb','ccc')
>
> --- The query logic is as follows:
>
> insert into sink
> select code, name, usedcnt, LOCALTIMESTAMP as `time`, type from (
>   select
>     info.code,
>     info.name,
>     sum(rpt.usedNum) as usedcnt, 'online' type
>   from (
>     select * from (
>       select *, ROW_NUMBER() OVER (PARTITION by appId, flavor
>         order by datepoint desc) as row_num
>       from rpt /*+ OPTIONS('server-id'='1001-1005') */
>     ) where row_num = 1
>   ) rpt
>   join info
>   on info.appid = rpt.appId
>   group by info.code, info.name)
> union all
> select code, name, usedcnt, LOCALTIMESTAMP as `time`, type from (
>   select
>     info.code,
>     info.name,
>     sum(totalNode) as usedcnt, 'online' type
>   from
>     cce /*+ OPTIONS('server-id'='1006-1010') */
>   join info
>   on cce.appId = info.appid
>   group by info.code, info.name)
>
> My result table stopped updating 12 hours in (my configured TTL) and has
> never updated since.
>
>
>
> One more question: do GROUP BY results written to the database only ever
> arrive as updates? The business needs historical data; the only approach I
> can think of is to copy the full result table into another table every
> minute so history can be traced back. Is there a better solution?
>
>
>
>
>
> Thank you very much for reading and for your help!
>
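As background for the option under discussion, a minimal sketch of the two
usual ways table.exec.state.ttl is set through the 1.13 Table API (the 12h
value comes from the post; the class and variable names here are illustrative):

import java.time.Duration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TtlSetup {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Option 1: the raw config key, as in the post.
        tEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "12h");
        // Option 2: the typed setter, which as far as I can tell writes the same option.
        tEnv.getConfig().setIdleStateRetention(Duration.ofHours(12));
    }
}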


flink-netty-shuffle files filling up node disks

2022-02-14 Post by 智能平台
Hello everyone:
Running the code below fills up the disks on all nodes; when debugging locally
it fills up the C: drive as well.
File name: flink-netty-shuffle-b71f58a6-0fdb-437f-a6b8-6b0bbedd3dfa
Notes:

1. Batch execution mode.
2. In local testing the input directories oneDay and long total about 1 GB.
After the program starts, it fills the remaining tens of GB on the C: drive
(C:\Users\xxx\AppData\Local\Temp); deployed to the cluster, it gradually fills
each node's disk as well.
3. The broadcast stream blackListStream holds roughly ten thousand records. I
tried commenting out both the broadcast-state lookup in processElement and the
processBroadcastElement method, but that made no difference.



String oneDayLogFile = "C:\\Users\\xianghuibai\\Desktop\\oneDay";
String historyFileName = "C:\\Users\\xianghuibai\\Desktop\\long";

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// Blacklist ids loaded from Redis and broadcast to all subtasks.
DataStream<String> blackListStream = env.fromCollection(
        RedisPool20484Utils.getCustomJedisCluster().smembers("user_blacklist_cid_test"));

MapStateDescriptor<String, Boolean> type = new MapStateDescriptor<>(
        "blackList_type", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
BroadcastStream<String> blackList_b = blackListStream.broadcast(type);

DataStream<Tuple5<String, String, String, String, String>> oneDayLog = env
        .readTextFile(oneDayLogFile)
        .map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
            @Override
            public Tuple5<String, String, String, String, String> map(String line) throws Exception {
                String[] arrs = line.split("\t");
                return new Tuple5<>(arrs[0], arrs[1], arrs[2], arrs[3], arrs[4]);
            }
        });

SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> dayOutput = env
        .readTextFile(historyFileName)
        .flatMap(new FlatParseLong())
        .union(oneDayLog)
        .connect(blackList_b)
        .process(new BroadcastProcessFunction<
                Tuple5<String, String, String, String, String>,
                String,
                Tuple5<String, String, String, String, String>>() {
            private transient ReadOnlyBroadcastState<String, Boolean> broadcastState;

            @Override
            public void processElement(Tuple5<String, String, String, String, String> value,
                    ReadOnlyContext ctx,
                    Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                if (broadcastState == null) {
                    broadcastState = ctx.getBroadcastState(type);
                }
                // Drop records whose first field appears in the broadcast blacklist.
                if (value != null && !broadcastState.contains(value.f0)) {
                    out.collect(value);
                }
            }

            @Override
            public void processBroadcastElement(String value, Context ctx,
                    Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                if (StringUtils.isNotEmpty(value)) {
                    BroadcastState<String, Boolean> broadcastState = ctx.getBroadcastState(type);
                    broadcastState.put(value, true);
                }
            }
        });
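Not a fix for the volume of data spilled, but if the immediate problem is the
C: drive filling up, redirecting Flink's spill directories may help while
debugging. A minimal sketch under these assumptions: io.tmp.dirs is the option
governing Flink's local working/temp directories (where batch-mode
blocking-shuffle files such as flink-netty-shuffle-* are written), and
D:\flink-tmp is a made-up path:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TmpDirSetup {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Point Flink's temp/spill directories at a volume with more space.
        // On a cluster, set io.tmp.dirs in flink-conf.yaml on each node instead.
        conf.setString("io.tmp.dirs", "D:\\flink-tmp");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);
        // ... build the job as above ...
    }
}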



Flink: using OPTIONS hints in a SELECT errors out after creating a view

2022-02-14 Post by liangjinghong
Hello everyone. The following code runs in the development environment, but
after packaging and deployment it fails:
Code:
CREATE VIEW used_num_common
(toolName,region,type,flavor,used_num)
AS
select info.toolName as toolName, r.regionName as region, f.type, f.flavor, count(1) as used_num from
tbl_schedule_job/*+ OPTIONS('server-id'='1001-1031') */ job
join
tbl_schedule_task/*+ OPTIONS('server-id'='2001-2031') */ task
on job.jobId = task.jobId
join
tbl_broker_node/*+ OPTIONS('server-id'='3001-3031') */  node
on task.nodeId = node.id
join
tbl_app_info/*+ OPTIONS('server-id'='4001-4031') */ info
on job.appId = info.appId
join
tbl_region r
on node.region/*+ OPTIONS('server-id'='5001-5031') */ = r.region
join
tbl_flavor/*+ OPTIONS('server-id'='6001-6031') */  f
on node.resourcesSpec = f.flavor
where job.jobStatus in ('RUNNING','ERROR','INITING')
and task.taskStatus in ('RUNNING','ERROR','INITING')
and node.machineStatus <> 'DELETED'
and toolName is not null
group by info.toolName,r.regionName,f.type,f.flavor
…
The error after packaging and deployment is as follows:
The main method caused an error: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
2022-02-08 13:33:39,350 WARN org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
at org.apache.calcite.util.Util.needToImplement(Util.java:1075) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:329) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:101) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at org.apache.calcite.sql.SqlSelectOperator.unparse(SqlSelectOperator.java:176) ~[flink-table_2.11-1.13.0.jar:1.13.0]
at
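A side note on the SQL above, offered as a guess rather than a confirmed
diagnosis: per the Flink docs, a /*+ OPTIONS(...) */ hint attaches to a table
reference, whereas in the tbl_region join it follows the column node.region;
the docs also require table.dynamic-table-options.enabled to be set before
such hints are accepted (it defaults to false in 1.13). A minimal sketch of
both points, assuming the Flink 1.13 Table API, with the table name taken from
the post:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DynamicOptionsHint {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Dynamic table options must be enabled explicitly in 1.13.
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.dynamic-table-options.enabled", true);
        // The hint goes right after the table reference, not after a column.
        tEnv.executeSql(
                "SELECT * FROM tbl_region /*+ OPTIONS('server-id'='5001-5031') */");
    }
}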