Question: Flink SQL error "Multiple factories for identifier 'jdbc' that implement DynamicTableSinkFactory"

2020-12-02, posted by xuzh
Error:


Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath




Code:
package org.apache.flink.examples;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CDC2ss2 {
    public static void main(String[] args) throws Exception {
        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // MySQL CDC source table
        String src_sql = "CREATE TABLE userss (\n" +
                "  user_id INT,\n" +
                "  user_nm STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = '10.12.5.37',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'dps',\n" +
                "  'password' = 'dps1234',\n" +
                "  'database-name' = 'rpt',\n" +
                "  'table-name' = 'users'\n" +
                ")";
        tEnv.executeSql(src_sql); // create source table

        // JDBC sink table
        String sink = "CREATE TABLE sink (\n" +
                "  user_id INT,\n" +
                "  user_nm STRING,\n" +
                "  PRIMARY KEY (user_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
                "  'username' = 'dps',\n" +
                "  'password' = 'dps1234',\n" +
                "  'table-name' = 'sink'\n" +
                ")";
        String to_print_sql = "INSERT INTO sink SELECT user_id, user_nm FROM userss";
        tEnv.executeSql(sink);
        tEnv.executeSql(to_print_sql);
        env.execute(); // note: executeSql() already submits the INSERT job; this call is not needed here
    }
}





Full stack trace:


Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.

Table options are:

'connector'='jdbc'
'password'='dps1234'
'table-name'='sink'
'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
'username'='dps'
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
    at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
    at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
    ... 18 more
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.

Ambiguous factory classes are:

java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
    at
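
The message means more than one jar on the classpath registers a table factory for identifier 'jdbc'. A quick way to see which implementations are actually visible, a minimal diagnostic sketch (assuming the Flink 1.11 Factory SPI; this is not code from the thread), is to enumerate them with the same ServiceLoader mechanism FactoryUtil uses:

import java.util.ServiceLoader;
import org.apache.flink.table.factories.Factory;

public class ListTableFactories {
    public static void main(String[] args) {
        // Enumerates every factory registered under
        // META-INF/services/org.apache.flink.table.factories.Factory;
        // duplicate 'jdbc' entries reveal which jars clash.
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier() + " -> " + factory.getClass().getName());
        }
    }
}

Run it with the job's exact classpath; the duplicate 'jdbc' lines then show which two jars to deduplicate (typically the JDBC connector ending up on the classpath twice, e.g. once in the fat jar and once in flink/lib).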

Re: flink-cdc cannot read the binlog, and the program reports no error

2020-12-02, posted by chenjb
Solved: the field types were not mapped according to the official docs' requirements; after aligning them it works fine.

Re: Flink SQL 1.11: long periods with no data written to MySQL cause java.sql.SQLException: No operations allowed after statement closed.

2020-12-02, posted by Leonard Xu
Hi, yanzi

Could you post the complete error message?

Best,
Leonard Xu

> On Dec 3, 2020, at 10:36, yanzi wrote:
>
> With Flink SQL 1.11, computed metrics are written to MySQL; after a long period with no data the job errors out and fails.
> Regarding https://issues.apache.org/jira/browse/FLINK-16681: that issue is fixed, but we run the latest 1.11 release and still hit the problem after a while. How can we solve it?



Re: Flink SQL Tumble window function throws a NoSuchMethodError (functions/AggregateFunction)

2020-12-02, posted by hailongwang
Hi,
   Which Flink version are you on? From the error you are using the legacy planner; try the blink planner instead.

Best,
Hailong
On 2020-12-03 10:02:08, "18293503878" <18293503...@163.com> wrote:
> When using Flink SQL's TUMBLE function and converting the result table to a stream, do you also hit the following error?
> Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(Lorg/apache/flink/api/common/functions/AggregateFunction;Lorg/apache/flink/streaming/api/functions/windowing/WindowFunction;Lorg/apache/flink/api/common/typeinfo/TypeInformation;Lorg/apache/flink/api/common/typeinfo/TypeInformation;Lorg/apache/flink/api/common/typeinfo/TypeInformation;)Lorg/apache/flink/streaming/api/datastream/SingleOutputStreamOperator;
>     at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate.translateToPlan(DataStreamGroupWindowAggregate.scala:214)
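
For reference, creating a blink-planner TableEnvironment looks like this, a minimal sketch rather than code from this thread (the NoSuchMethodError itself can also indicate mismatched Flink module versions on the classpath, which is worth ruling out):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BlinkPlannerSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Select the blink planner explicitly (Flink 1.11; the legacy
        // planner would be chosen by useOldPlanner())
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    }
}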


Re: Flink SQL 1.11: long periods with no data written to MySQL cause java.sql.SQLException: No operations allowed after statement closed.

2020-12-02, posted by kandy.wang

The retry loop in JdbcBatchingOutputFormat:
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
    try {
        attemptFlush();
        batchCount = 0;
        break;
    } catch (SQLException e) {
        LOG.error("JDBC executeBatch error, retry times = {}", i, e);
        if (i >= executionOptions.getMaxRetries()) {
            throw new IOException(e);
        }
        try {
            if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
                connection = connectionProvider.reestablishConnection();
                jdbcStatementExecutor.closeStatements();
                jdbcStatementExecutor.prepareStatements(connection);
            }
        } catch (Exception exception) {
            LOG.error("JDBC connection is not valid, and reestablish connection failed.", exception);
            throw new IOException("Reestablish JDBC connection failed", exception);
        }
        try {
            Thread.sleep(1000 * i);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IOException("unable to flush; interrupted while doing another attempt", e);
        }
    }
}

Yes, it looks like that is what happens.

The retry check

if (i >= executionOptions.getMaxRetries()) {
    throw new IOException(e);
}

should be placed after the sleep. Otherwise, once the flush fails with "Caused by: java.io.IOException: java.sql.SQLException: No operations allowed after statement closed.", there is no chance left to re-establish the connection.
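
A minimal sketch of that reordering (hypothetical, not the actual Flink source): re-establish the connection before deciding to give up, so even the final failure leaves a usable connection behind:

for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
    try {
        attemptFlush();
        batchCount = 0;
        break;
    } catch (SQLException e) {
        LOG.error("JDBC executeBatch error, retry times = {}", i, e);
        // Reconnect first, even on the last attempt ...
        try {
            if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
                connection = connectionProvider.reestablishConnection();
                jdbcStatementExecutor.closeStatements();
                jdbcStatementExecutor.prepareStatements(connection);
            }
        } catch (Exception exception) {
            throw new IOException("Reestablish JDBC connection failed", exception);
        }
        // ... and only then check whether the retries are exhausted.
        if (i >= executionOptions.getMaxRetries()) {
            throw new IOException(e);
        }
        try {
            Thread.sleep(1000 * i);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IOException("unable to flush; interrupted while doing another attempt", e);
        }
    }
}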



On 2020-12-03 10:36:28, "yanzi" wrote:
> With Flink SQL 1.11, computed metrics are written to MySQL; after a long period with no data the job errors out and fails.
> Regarding https://issues.apache.org/jira/browse/FLINK-16681: that issue is fixed, but we run the latest 1.11 release and still hit the problem after a while. How can we solve it?


Does Flink SQL support a percentile function?

2020-12-02
Hi,
  Does Flink SQL provide a percentile function?

Flink SQL 1.11: long periods with no data written to MySQL cause java.sql.SQLException: No operations allowed after statement closed.

2020-12-02, posted by yanzi
With Flink SQL 1.11, computed metrics are written to MySQL; after a long period with no data the job errors out and fails.
Regarding https://issues.apache.org/jira/browse/FLINK-16681: that issue is fixed, but we run the latest 1.11 release and still hit the problem after a while. How can we solve it?

Re: Impact of a ZooKeeper leader change on Flink

2020-12-02, posted by 赵一旦
Then why does Curator's state become SUSPENDED or LOST? I always restart ZK one node at a time, and I tried it again just recently: I restarted a follower ZK node first, and the instant I killed it, every Flink job failed.

Yang Wang wrote on Tue, Dec 1, 2020 at 8:18 PM:

> Flink uses the Curator framework for leader election and retrieval. Whenever Curator's state
> becomes SUSPENDED or LOST, leadership is revoked, which requires cancelling the running jobs
> and rescheduling them once a new leader appears.
>
> You could provide the JobManager log, or check it yourself for Curator connection state
> changes that then led to the failover.
>
>
> Best,
> Yang
>
> 赵一旦 wrote on Tue, Dec 1, 2020 at 7:13 PM:
>
> > This sank without a reply again; could someone who understands it explain?
> >
> > RS wrote on Tue, Nov 17, 2020 at 9:35 AM:
> >
> > > Haha, same here. When Flink and ZK get disconnected, all jobs restart. I have tested
> > > all kinds of scenarios here, e.g. HA deployments and multiple JobManagers; the jobs
> > > always restart, and I likewise do not know how to solve it.
> > >
> > > On 2020-11-16 18:39:29, "赵一旦" wrote:
> > > > In my experience at work, I have had to restart the ZK cluster a few times, restarting the ZK nodes one by one. Each time, every job on the Flink cluster restarted automatically (from the most recent checkpoint). That has a real impact, because checkpoints are taken every 10 minutes, so the restarts cause a momentary load spike.
> > > >
> > > > Is this expected, or is something wrong with my configuration or my procedure?
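
For anyone debugging this: the Curator connection behaviour is governed by the ZooKeeper client settings in flink-conf.yaml. For reference, these are the Flink 1.11 keys with their default values (shown as-is; not a recommendation from this thread):

# flink-conf.yaml: ZooKeeper client settings (Flink 1.11 defaults)
high-availability.zookeeper.client.session-timeout: 60000     # ms
high-availability.zookeeper.client.connection-timeout: 15000  # ms
high-availability.zookeeper.client.retry-wait: 5000           # ms between retries
high-availability.zookeeper.client.max-retry-attempts: 3

Curator typically reports SUSPENDED as soon as the connection drops and LOST once the session expires, so per Yang Wang's explanation above either state revokes leadership regardless of how generous these timeouts are.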


flink-cdc cannot read the binlog, and the program reports no error

2020-12-02, posted by chenjb
Hi all, I wrote some Java code that reads MySQL data via CDC and prints it through a print table, but nothing actually gets printed. The code reports no error and just keeps running.

*Run output in IDEA:*
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further
details.
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver
class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered
via the SPI and manual loading of the driver class is generally unnecessary.
Dec 02, 2020 9:31:45 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3307 at mysql-bin.02/2556 (sid:6668, cid:11)




*Relevant MySQL configuration:*
binlog_format     ROW
log_bin           ON
binlog_row_image  FULL




*Main Java code:*

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class mysqlSourceAndSink {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSettings);

        // CDC source table: columns declared as INT
        String cdc_user_id = "create table cdc_user_id(\n" +
                "  id INT\n" +
                "  ,pid INT\n" +
                "  ,PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3307',\n" +
                "  'username' = 'flink',\n" +
                "  'password' = '123456',\n" +
                "  'server-id' = '6668',\n" +
                //"  'server-time-zone' = 'UTC',\n" +
                "  'server-time-zone' = 'Asia/Shanghai',\n" +
                "  'database-name' = 'flinktest',\n" +
                "  'table-name' = 'flinktest.user_id'\n" +
                ")";

        // print sink: note id/pid are BIGINT here while the CDC table declares INT;
        // chenjb's follow-up says the fix was aligning field types with the docs
        String table_print = "create table table_print(\n" +
                "  id bigint,\n" +
                "  pid bigint\n" +
                ") WITH(\n" +
                "  'connector' = 'print',\n" +
                "  'print-identifier' = '',\n" +
                "  'standard-error' = 'true'\n" +
                ")";

        tableEnv.executeSql(table_print);
        //tableEnv.executeSql(jdbc_user_id);
        tableEnv.executeSql(cdc_user_id);

        String cdcUserPid2Print = "insert into table_print select id, pid from cdc_user_id";
        tableEnv.executeSql(cdcUserPid2Print);
    }
}


*The MySQL error log contains:*

2020-12-02T13:29:52.412476Z 8 [Note] Start binlog_dump to master_thread_id(8) slave_server(6668), pos(mysql-bin.02, 2273)
2020-12-02T13:31:03.416362Z 7 [Note] Aborted connection 7 to db: 'unconnected' user: 'flink' host: 'localhost' (Got an error reading communication packets)
2020-12-02T13:31:36.146219Z 8 [Note] Aborted connection 8 to db: 'unconnected' user: 'flink' host: 'localhost' (failed on flush_net())
2020-12-02T13:31:45.575431Z 11 [Note] Start binlog_dump to master_thread_id(11) slave_server(6668), pos(mysql-bin.02, 2556)



*The account has the relevant grants:*
create user 'flink'@'%' identified by '123456';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink' IDENTIFIED BY '123456';
flush privileges;



Also, my test with Flink's JDBC connector behaves the same way: no error, and no data printed either.

Could someone help me figure out what is going on here? Thanks.





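Per chenjb's follow-up earlier in this digest ("the field types were not mapped according to the official docs"), the fix is presumably aligning the sink's column types with the source's. A hedged sketch, assuming the INT/BIGINT mismatch above was the culprit:

-- Hypothetical fix: declare the print sink with the same types
-- as the cdc_user_id source instead of BIGINT
create table table_print (
  id  INT,
  pid INT
) WITH (
  'connector' = 'print',
  'print-identifier' = '',
  'standard-error' = 'true'
);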


Re: Flink CEP dynamically loading patterns

2020-12-02, posted by Wei Zhong
Hi,

Flink CEP does not support dynamically loading rules yet. There is a JIRA tracking this feature request:

https://issues.apache.org/jira/browse/FLINK-7129

You can follow that JIRA for the latest progress.

> On Dec 2, 2020, at 17:48, huang botao wrote:
>
> Hi, in our projects the rules change frequently. How do people usually load such rules dynamically? Does Flink CEP have a native API for dynamically loading rules?


Re: the remote task manager was lost

2020-12-02, posted by Congxian Qiu
You could check the log of the TaskManager behind the remote task and see whether there is any exception there.

Best,
Congxian


赵一旦 wrote on Wed, Dec 2, 2020 at 6:17 PM:

> I allocate resources on the order of 80 GB or 100 GB per TaskManager...
>
> guanxianchun wrote on Wed, Oct 28, 2020 at 5:02 PM:
>
> > Flink version: flink-1.11
> > taskmanager memory: 8G
> > jobmanager memory: 2G
> > akka.ask.timeout: 20s
> > akka.retry-gate-closed-for: 5000
> > client.timeout: 600s
> >
> > After running for a while it reports "the remote task manager was lost"; the error is:
> > 2020-10-28 00:25:30,608 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 411 for job 031e5f122711786fcc11ee6eb47291fa (2703770 bytes in 336 ms).
> > 2020-10-28 00:27:30,273 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 412 (type=CHECKPOINT) @ 1603816050239 for job 031e5f122711786fcc11ee6eb47291fa.
> > 2020-10-28 00:27:30,776 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 412 for job 031e5f122711786fcc11ee6eb47291fa (3466688 bytes in 509 ms).
> > 2020-10-28 00:29:30,246 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 413 (type=CHECKPOINT) @ 1603816170239 for job 031e5f122711786fcc11ee6eb47291fa.
> > 2020-10-28 00:29:30,597 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 413 for job 031e5f122711786fcc11ee6eb47291fa (2752681 bytes in 334 ms).
> > 2020-10-28 00:29:47,353 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://fl...@hadoop01.dev.test.cn:13912] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> > 2020-10-28 00:29:47,353 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink-metr...@hadoop01.dev.test.cn:31260] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> > 2020-10-28 00:29:47,377 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - KeyedProcess -> async wait operator -> Map (1/3) (f84731e57528b326ad15ddc17821d1b8) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@538198b8.
> > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'hadoop01.dev.test.cn/192.168.1.21:7527'. This might indicate that the remote task manager was lost.
> >     at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> >     at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> >     at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> >     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> >     at

Flink SQL: is the source topic consumed once per INSERT?

2020-12-02, posted by zz
Hi: my source table reads from a Kafka topic with 6 partitions, and several INSERT statements write the results into MySQL. Each INSERT appears to read the source table from Kafka again, so the topic ends up being consumed 18 times in total... is that expected?
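
If the question is indeed about every INSERT spawning its own job and its own consumer of the topic (the message above is partly garbled, so this is a guess at its intent), a StatementSet, available since Flink 1.11, groups several INSERTs into a single job so the planner can reuse a single scan of the source. A minimal sketch with hypothetical table names:

import org.apache.flink.table.api.StatementSet;

// tEnv is a TableEnvironment with source_table, sink_a and sink_b already registered
StatementSet statements = tEnv.createStatementSet();
statements.addInsertSql("INSERT INTO sink_a SELECT * FROM source_table");
statements.addInsertSql("INSERT INTO sink_b SELECT * FROM source_table");
statements.execute(); // one job instead of one job per INSERT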

Why is Calcite's implicit type conversion disabled?

2020-12-02, posted by tangshiwei
Currently neither Flink SQL nor Flink's Hive SQL supports implicit type conversion. While debugging we found that Calcite itself supports it, but Flink forcibly disables it. Hive does support implicit conversion, so our Hive jobs cannot be migrated to run on Flink. What is the reason for disabling it, and would enabling it on our side cause any problems?

A question: why is Calcite's implicit conversion disabled?

2020-12-02, posted by stgztsw
Flink SQL currently does not support implicit conversion, so SQL with mismatched types fails with an error, and Flink's Hive SQL has the same problem. Hive itself supports it, and this difference means our existing Hive SQL jobs basically cannot run on Flink. While debugging I found that Calcite does have an implicit-conversion feature, but Flink forcibly disables it. What is the reason, and what problems would enabling it cause?
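Until that changes, the usual workaround is to make the casts explicit. A hedged example with made-up table and column names, not SQL from this thread:

-- Hive accepts the implicit INT-vs-STRING comparison; in Flink SQL
-- the cast has to be spelled out (hypothetical schema):
SELECT *
FROM orders
WHERE CAST(order_id AS STRING) = '0042';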

Why is Calcite's implicit type conversion disabled?

2020-12-02, posted by stgztsw
Currently neither Flink SQL nor Flink's Hive SQL supports implicit type conversion. While debugging we found that Calcite itself supports it, but Flink forcibly disables it. Hive does support implicit conversion, so our Hive jobs cannot be migrated to run on Flink. What is the reason for disabling it, and would enabling it on our side cause any problems?

Re: the remote task manager was lost

2020-12-02, posted by 赵一旦
I allocate resources on the order of 80 GB or 100 GB per TaskManager...

guanxianchun wrote on Wed, Oct 28, 2020 at 5:02 PM:

> Flink version: flink-1.11
> taskmanager memory: 8G
> jobmanager memory: 2G
> akka.ask.timeout: 20s
> akka.retry-gate-closed-for: 5000
> client.timeout: 600s
>
> After running for a while it reports "the remote task manager was lost"; the error is:
> 2020-10-28 00:25:30,608 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 411 for job 031e5f122711786fcc11ee6eb47291fa (2703770 bytes in 336 ms).
> 2020-10-28 00:27:30,273 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 412 (type=CHECKPOINT) @ 1603816050239 for job 031e5f122711786fcc11ee6eb47291fa.
> 2020-10-28 00:27:30,776 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 412 for job 031e5f122711786fcc11ee6eb47291fa (3466688 bytes in 509 ms).
> 2020-10-28 00:29:30,246 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 413 (type=CHECKPOINT) @ 1603816170239 for job 031e5f122711786fcc11ee6eb47291fa.
> 2020-10-28 00:29:30,597 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 413 for job 031e5f122711786fcc11ee6eb47291fa (2752681 bytes in 334 ms).
> 2020-10-28 00:29:47,353 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://fl...@hadoop01.dev.test.cn:13912] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> 2020-10-28 00:29:47,353 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink-metr...@hadoop01.dev.test.cn:31260] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> 2020-10-28 00:29:47,377 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - KeyedProcess -> async wait operator -> Map (1/3) (f84731e57528b326ad15ddc17821d1b8) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@538198b8.
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'hadoop01.dev.test.cn/192.168.1.21:7527'. This might indicate that the remote task manager was lost.
>     at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at

I defined a Kafka dynamic table in SQL-Client, but the Kafka topic had some elements in the wrong format, so an exception was thrown in SQL-Client. Can we define the Kafka dynamic table with some additional options?

2020-12-02, posted by mr.meng...@ouglook.com

Caused by: java.io.IOException: Failed to deserialize JSON ''.
    at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:126) ~[flink-json-1.11.2.jar:1.11.2]
    at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:76) ~[flink-json-1.11.2.jar:1.11.2]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode








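What the poster seems to be after exists as format options: the Flink 1.11 json format has 'json.ignore-parse-errors' (and the related 'json.fail-on-missing-field'). A hedged sketch; the topic, fields and servers below are made up:

CREATE TABLE kafka_source (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my_topic',                           -- hypothetical
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'             -- skip rows that fail to parse
);

The ClassCastException above (MissingNode cannot be cast to ObjectNode) comes from an empty message body; with ignore-parse-errors enabled such records should be dropped instead of failing the job (worth verifying on your exact 1.11.x version).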


Flink CEP dynamically loading patterns

2020-12-02, posted by huang botao
Hi, in our projects the rules change frequently. How do people usually load such rules dynamically? Does Flink CEP have a native API for dynamically loading rules?


Re: After renaming the topic, how does a job restarted from a savepoint consume Kafka?

2020-12-02, posted by zhisheng
That is the correct answer; see an issue filed earlier: https://issues.apache.org/jira/browse/FLINK-16865

Best
zhisheng

Shuai Xia wrote on Wed, Dec 2, 2020 at 2:03 PM:

> Hi, actually it does not work the way you describe. When restarting from a savepoint there is restored state, and because the topic name was changed, the new KafkaTopicPartitions are not found in restoredState, so their start positions are set to EARLIEST_OFFSET:
>
> if (restoredState != null) {
>     for (KafkaTopicPartition partition : allPartitions) {
>         if (!restoredState.containsKey(partition)) {
>             restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
>         }
>     }
> }
>
>
>
> ------------------------------------------------------------------
> From: 熊云昆
> Sent: Tuesday, December 1, 2020, 22:57
> To: user-zh; Shuai Xia
> Subject: Re: After renaming the topic, how does a job restarted from a savepoint consume Kafka?
>
> It can be controlled manually. If nothing is set, consumption starts from the latest position by default; otherwise it follows your startup-mode.
>
> On 2020-12-01 20:59:48, "Shuai Xia" wrote:
> >
> > Hi all,
> > A sudden question: a Flink job consuming Kafka is stopped after taking a savepoint, the topic name in the code is changed, and the job is restarted.
> > Where does it start consuming the new topic? Does it follow startup-mode, or start from the latest? Can it be controlled manually?