回复:flink 1.12 on yarn WebUI不显示logs

2021-06-02 文章 smq
加了一些配置,一些程序有日志,一些没有日志





-- 原始邮件 --
发件人: JasonLee <17610775...@163.com
发送时间: 2021年6月3日 12:44
收件人: user-zh http://apache-flink.147419.n8.nabble.com/

Re: flink 1.12 on yarn WebUI不显示logs

2021-06-02 文章 JasonLee
hi

有改过默认的日志配置文件吗?



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-02 文章 yujianbo
好的非常感谢,我拿几个任务测试一波,看看性能能不能接受!


Hi,

没有被引用的文件可能也不是完全无用的,可能是当前pending checkpoint正在上传的,所以还需要比较一下那些不在checkpoint
meta内的文件的修改时间戳,可能比你分析的complete checkpoint的时间戳要大。

总体上来说,我不认为这个问题是一个bug,这个是LSM架构的DB的空间放大问题。如果你对空间放大非常担心的话,可以启用 dynamic level
[1] 来严格控制空间放大,不过这个可能会影响写放大和读放大,导致性能受到一定影响。


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-compaction-level-use-dynamic-size

祝好
唐云



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-02 文章 Yun Tang
Hi,

没有被引用的文件可能也不是完全无用的,可能是当前pending checkpoint正在上传的,所以还需要比较一下那些不在checkpoint 
meta内的文件的修改时间戳,可能比你分析的complete checkpoint的时间戳要大。

总体上来说,我不认为这个问题是一个bug,这个是LSM架构的DB的空间放大问题。如果你对空间放大非常担心的话,可以启用 dynamic level [1] 
来严格控制空间放大,不过这个可能会影响写放大和读放大,导致性能受到一定影响。


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-compaction-level-use-dynamic-size

祝好
唐云


From: yujianbo <15205029...@163.com>
Sent: Wednesday, June 2, 2021 15:29
To: user-zh@flink.apache.org 
Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

Hi,

确认的情况:

大佬,我根据loadCheckpointMeta能够去分析_metadata引用到的/shared目录下的sst文件,然后发现/shared里没有被引用的sst文件即未删除的旧文件,这些旧文件占比很少,只有5%左右。

配置:
   idleStateRetention确实是设置3600秒,保留的ck目录是3个。
目前情况:
 每次checkpoint size增量差不多1-2G,所以size不小。5分钟checkpoint一次。
 最近单次checkpoint的sst文件数是23个,文件大小大约在65M。
 总共目前/checkpoint/shared目录大小是49.4G,然后savepoint结束大小在6.3G。

那我想问一下,那中间是不是有大量的冗余数据存在这还在被应用的sst文件里,有没有什么参数可以调整?还是JM单点删除跟不上相关速度?
JM还能扩展吗??HA不是也是一个JM在工作吗(这个地方不是很明白)





Hi,

先确定一下,你的 idleStateRetention 是 3600秒?其次,要想看是否所有数据均有用,可以利用
Checkpoints.loadCheckpointMeta [1] 去加载你所保留的checkpoint目录下的 _metadata
文件,然后与当前checkpoint目录下的文件作对比,看是否存在大量的未删除旧文件。

目前仅凭你的描述和一段SQL代码其实很难判断。
可能存在的原因有:

  1.  单次checkpoint文件数目过多,JM单点删除跟不上相关速度
  2.  整体checkpoint
size不大,RocksDB的compaction频率较低,导致相关包含过期数据的旧文件并没有被及时删除,仍然存在checkpoint目录下(保留多个checkpoint同样会放大该问题)

所以还是最好先分析已有checkpoint的meta文件,再与目前的实际情况做对比。

另外,Flink
JIRA其实不是一个很好的直接问问题的地方,尤其是这种并不明确的问题,更妥善的方式还是先在邮件列表讨论,具体定位到原因后,再创建相关详细的ticket。

[1]
https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

祝好
唐云

From: yujianbo <[hidden email]>
Sent: Tuesday, June 1, 2021 10:51
To: [hidden email] <[hidden email]>
Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

没有更好的方式吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink jar包任务如何读取hdfs配置文件

2021-06-02 文章 骆凡
rt flink jar任务 在一个yarn集群上运行 要将数据写入另一个hdfs集群 
会报错
Caused by: java.net.UnknownHostException: xxx-hdfs
有办法让jar包任务读取加载该集群的配置文件么



--
Sent from: http://apache-flink.147419.n8.nabble.com/


??????flink postgres jdbc catalog????????????

2021-06-02 文章 ????
??postgresql??mysql??1.11.1??




----
??: 
   "user-zh"



flink postgres jdbc catalog是只读的吗?

2021-06-02 文章 casel.chen
flink postgres jdbc catalog是只读的吗?能写的catalog 除了Hive Catalog还有哪些?社区什么时候会有Mysql 
JDBC Catalog呢?

flink sql作业表定义部分字段问题

2021-06-02 文章 casel.chen
有一个flink sql 
mysql-cdc作业,从kafka消费canal-json格式的binlog数据,打到下游存储,遇到一个问题:上游源表字段是会动态添加的,而我的sql 
table定义是静态的,如果上游schema发生变化,我的作业就会失败。在flink 
sql中是否可以针对目标数据源只定义用到的字段?如果可以的话应该怎么实现?现在是会抛类似下面的error。意思是原本包含43列的数据表,在我的DDL中只定义了其中的4列。有哪些格式是支持定义部分字段的呢?


21/06/02 18:54:22 [Source: TableSourceScan(table=[[default_catalog, 
default_database, charge_log]], fields=[id, charge_id, trace_id, app_id]) -> 
Sink: Sink(table=[default_catalog.default_database.print_table], fields=[id, 
charge_id, trace_id, app_id]) (3/12)#0] WARN taskmanager.Task: Source: 
TableSourceScan(table=[[default_catalog, default_database, charge_log]], 
fields=[id, charge_id, trace_id, app_id]) -> Sink: 
Sink(table=[default_catalog.default_database.print_table], fields=[id, 
charge_id, trace_id, app_id]) (3/12)#0 (8810adcd7960cb22a6954c985ba49d0d) 
switched from RUNNING to FAILED.
 java.lang.IllegalArgumentException: Row arity: 43, but serializer arity: 4
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:124)
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)

怎么避免flink sql cdc作业重启后重新从头开始消费binlog?

2021-06-02 文章 casel.chen
我有一个如下flink sql cdc作业,设置了'scan.startup.mode' = 
'latest-offset'。但在作业重启后发现它又从头开始消费binlog,导致sink下游数据库频繁报duplicate key 
error,有什么办法可以避免吗?


CREATE TABLE `mysql_source` (
`id` STRING,
`acct_id` STRING,
`acct_name` STRING,
`acct_type` STRING,
`acct_bal` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'mysql',
'password' = 'mysql',
'database-name' = 'test',
'scan.startup.mode' = 'latest-offset',
'table-name' = 'test',
'server-time-zone' = 'Asia/Shanghai'
);

Re: pyflink kafka connector报错ByteArrayDeserializer

2021-06-02 文章 Dian Fu
要用fat jar: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.1/flink-sql-connector-kafka_2.11-1.13.1.jar


> 2021年6月2日 下午2:43,qianhuan <819687...@qq.com> 写道:
> 
> 版本:
> python 3.8
> apache-flink   1.13.1
> apache-flink-libraries 1.13.1
> 
> 代码:
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
> 
> def log_processing():
>env = StreamExecutionEnvironment.get_execution_environment()
>env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
>t_env = StreamTableEnvironment.create(stream_execution_environment=env,
> environment_settings=env_settings)
>t_env.get_config().get_configuration().\
>set_string("pipeline.jars",
> "file:///root/flink/jars/flink-connector-kafka_2.11-1.13.1.jar")
> 
>kafka_source_ddl = f"""
>CREATE TABLE kafka_source_table(
>a VARCHAR
>) WITH (
>  'connector.type' = 'kafka',
>  'connector.version' = 'universal',
>  'connector.topic' = 'test5',
>  'connector.properties.bootstrap.servers' = 'localhost:9092',
>  'connector.properties.zookeeper.connect' = 'localhost:2181',
>  'connector.startup-mode' = 'latest-offset',
>  'format.type' = 'json'
>)
>"""
> 
>kafka_sink_ddl = f"""
>CREATE TABLE kafka_sink_table(
>b VARCHAR
>) WITH (
>  'connector.type' = 'kafka',
>  'connector.version' = 'universal',
>  'connector.topic' = 'test6',
>  'connector.properties.bootstrap.servers' = 'localhost:9092',
>  'connector.properties.zookeeper.connect' = 'localhost:2181',
>  'format.type' = 'json'
>)
>"""
> 
>t_env.execute_sql(kafka_source_ddl)
>t_env.execute_sql(kafka_sink_ddl)
>print("all_tables", t_env.list_tables())
>t_env.sql_query("SELECT a FROM kafka_source_table") \
>.execute_insert("kafka_sink_table").wait()
> 
> if __name__ == '__main__':
>log_processing()
> 
> 报错:
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o65.executeInsert.
> : java.lang.NoClassDefFoundError:
> org/apache/kafka/common/serialization/ByteArrayDeserializer
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:223)
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:154)
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:139)
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:108)
>   at
> org.apache.flink.streaming.connectors.kafka.KafkaTableSource.createKafkaConsumer(KafkaTableSource.java:106)
>   at
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getKafkaConsumer(KafkaTableSourceBase.java:293)
>   at
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:194)
>   at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacyTableSourceScan.translateToPlanInternal(CommonExecLegacyTableSourceScan.java:94)
>   at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>   at
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
>   at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:172)
>   at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:112)
>   at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>   at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
>   at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
>   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at
> 

flink 1.12 on yarn WebUI不显示logs

2021-06-02 文章 smq
大家好
   
目前在测试1.12.1版本时遇到个问题,提交程序之后在WebUI查看日志,jobmanager和taskmanager 
logs都不显示,按f12查看,logs 是404,并响应 errors :this file does not exist in Jobmanager 
log dir
  这是在启动过程中没有创建日志吗,不清楚什么原因导致的。

Re: Re: Flink SQL 1.11.3问题请教

2021-06-02 文章 yinghua...@163.com
我这个情况还有点不一样的,本来单条数据是如下的:一条数据对应一个offset
 {"name":"test1"}
但是Nifi采集数据后,写入kafka格式是下面这样的,一个offset对应下面几条数据(每一个offset对应的真实数据条数还不是固定的)
 {"name":"test1"}
 {"name":"test2"}
 {"name":"test3"}
...

感谢你的回复,我借鉴下看怎么处理下,多谢了!




yinghua...@163.com
 
发件人: WeiXubin
发送时间: 2021-06-02 17:44
收件人: user-zh
主题: Re: Flink SQL 1.11.3问题请教
不知道下面场景是否与你描述的场景相同 ,假设采集到单条json消息格式为 {"name":"test"},将多条json消息合并为一条的格式为 
[{"name":"test"},{"name":"test2"}]。 我的 Flink 任务是采用 FlinkSQL
编写,处理这种情况我的方式是通过编写一个 UDF (TableFunction), 之后在这个 UDF 中进行数据拆解,变为多行 (row),再输出到
sink。
 
Row row = new Row(arity);
collect(row);
 
具体使用可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
 
Best,Weixubin
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于 flinksql 维表的问题

2021-06-02 文章 WeiXubin
你好,可以麻烦详细描述一下吗? 谢谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL 1.11.3问题请教

2021-06-02 文章 WeiXubin
不知道下面场景是否与你描述的场景相同 ,假设采集到单条json消息格式为 {"name":"test"},将多条json消息合并为一条的格式为 
[{"name":"test"},{"name":"test2"}]。 我的 Flink 任务是采用 FlinkSQL
编写,处理这种情况我的方式是通过编写一个 UDF (TableFunction), 之后在这个 UDF 中进行数据拆解,变为多行 (row),再输出到
sink。

Row row = new Row(arity);
collect(row);

具体使用可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/

Best,Weixubin



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink自定义connector相关报错

2021-06-02 文章 chenlei677
你好,还有完整信息么?一般逻辑是,先找文件,然后匹配属性。麻烦将完整日志输出出来看看。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: flink自定义connector相关报错

2021-06-02 文章 MOBIN


Sorry, 工程下路径是没错的,是我发邮件时打错了,还可能是什么原因导致的呢?谢谢
src/main/resources/META-INF/services
| |
MOBIN
|
签名由网易邮箱大师定制


在2021年06月2日 17:05,Leonard Xu 写道:
路径错了

在 2021年6月2日,17:02,MOBIN <18814118...@163.com> 写道:

META-INF.services/org.apache.flink.table.factories.Factory

=>   META-INF/services/org.apache.flink.table.factories.Factory

祝好
Leonard

Re: flink自定义connector相关报错

2021-06-02 文章 Leonard Xu
路径错了

> 在 2021年6月2日,17:02,MOBIN <18814118...@163.com> 写道:
> 
> META-INF.services/org.apache.flink.table.factories.Factory

=>   META-INF/services/org.apache.flink.table.factories.Factory

祝好
Leonard

flink自定义connector相关报错

2021-06-02 文章 MOBIN


请教下,在自定义connector,IDEA上直接运demo时报了以下的错误:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' 
in
the classpath.


工程的resource目录也准确引入了META-INF.services/org.apache.flink.table.factories.Factory,但是感觉没生效一样
Tabel-common依赖也已经引入了
谢谢
| |
MOBIN
|
签名由网易邮箱大师定制



Re: pyflink kafka connector报错ByteArrayDeserializer

2021-06-02 文章 qianhuan
是不是connector版本问题,之前1.12.2可以跑,有没有大神帮忙看下



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:flink1.12版本,yarn-application模式Flink web ui看不到日志

2021-06-02 文章 smq
请问你是怎么解决这个问题的





-- 原始邮件 --
发件人: r pp http://apache-flink.147419.n8.nabble.com/



-- 
Best,
 pp

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-02 文章 yujianbo
Hi,

确认的情况:
 
大佬,我根据loadCheckpointMeta能够去分析_metadata引用到的/shared目录下的sst文件,然后发现/shared里没有被引用的sst文件即未删除的旧文件,这些旧文件占比很少,只有5%左右。

配置:
   idleStateRetention确实是设置3600秒,保留的ck目录是3个。
目前情况:
 每次checkpoint size增量差不多1-2G,所以size不小。5分钟checkpoint一次。
 最近单次checkpoint的sst文件数是23个,文件大小大约在65M。
 总共目前/checkpoint/shared目录大小是49.4G,然后savepoint结束大小在6.3G。

那我想问一下,那中间是不是有大量的冗余数据存在这还在被应用的sst文件里,有没有什么参数可以调整?还是JM单点删除跟不上相关速度?
JM还能扩展吗??HA不是也是一个JM在工作吗(这个地方不是很明白)





Hi,

先确定一下,你的 idleStateRetention 是 3600秒?其次,要想看是否所有数据均有用,可以利用
Checkpoints.loadCheckpointMeta [1] 去加载你所保留的checkpoint目录下的 _metadata
文件,然后与当前checkpoint目录下的文件作对比,看是否存在大量的未删除旧文件。

目前仅凭你的描述和一段SQL代码其实很难判断。
可能存在的原因有:

  1.  单次checkpoint文件数目过多,JM单点删除跟不上相关速度
  2.  整体checkpoint
size不大,RocksDB的compaction频率较低,导致相关包含过期数据的旧文件并没有被及时删除,仍然存在checkpoint目录下(保留多个checkpoint同样会放大该问题)

所以还是最好先分析已有checkpoint的meta文件,再与目前的实际情况做对比。

另外,Flink
JIRA其实不是一个很好的直接问问题的地方,尤其是这种并不明确的问题,更妥善的方式还是先在邮件列表讨论,具体定位到原因后,再创建相关详细的ticket。

[1]
https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

祝好
唐云

From: yujianbo <[hidden email]>
Sent: Tuesday, June 1, 2021 10:51
To: [hidden email] <[hidden email]>
Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

没有更好的方式吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复:flink1.12版本,yarn-application模式Flink web ui看不到日志

2021-06-02 文章 smq
你的意思是在log4j.properties中的配置吗,我门在这个里边配置了生成日志文件的格式,是在安装节点里加的,不过这个应该不是在webui里显示的。奇怪的一点是我们组有别的程序是正常的,但是一部分在webUI不显示日志。我们目前是从1.10升级到1.12,这种情况在1.12出现的





-- 原始邮件 --
发件人: r pp http://apache-flink.147419.n8.nabble.com/



-- 
Best,
 pp

pyflink kafka connector报错ByteArrayDeserializer

2021-06-02 文章 qianhuan
版本:
python 3.8
apache-flink   1.13.1
apache-flink-libraries 1.13.1

代码:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

def log_processing():
env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=env,
environment_settings=env_settings)
t_env.get_config().get_configuration().\
set_string("pipeline.jars",
"file:///root/flink/jars/flink-connector-kafka_2.11-1.13.1.jar")

kafka_source_ddl = f"""
CREATE TABLE kafka_source_table(
a VARCHAR
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'test5',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'json'
)
"""

kafka_sink_ddl = f"""
CREATE TABLE kafka_sink_table(
b VARCHAR
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'test6',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'format.type' = 'json'
)
"""

t_env.execute_sql(kafka_source_ddl)
t_env.execute_sql(kafka_sink_ddl)
print("all_tables", t_env.list_tables())
t_env.sql_query("SELECT a FROM kafka_source_table") \
.execute_insert("kafka_sink_table").wait()

if __name__ == '__main__':
log_processing()

报错:
py4j.protocol.Py4JJavaError: An error occurred while calling
o65.executeInsert.
: java.lang.NoClassDefFoundError:
org/apache/kafka/common/serialization/ByteArrayDeserializer
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:223)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:154)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:139)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:108)
at
org.apache.flink.streaming.connectors.kafka.KafkaTableSource.createKafkaConsumer(KafkaTableSource.java:106)
at
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getKafkaConsumer(KafkaTableSourceBase.java:293)
at
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:194)
at
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacyTableSourceScan.translateToPlanInternal(CommonExecLegacyTableSourceScan.java:94)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
at
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:172)
at
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:112)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at

Re: flink1.12版本,yarn-application模式Flink web ui看不到日志

2021-06-02 文章 r pp
嗨~  你们有没有改日志文件的名字

smq <374060...@qq.com> 于2021年6月2日周三 下午12:24写道:

> 你这个解决了吗,我也遇到了同样的问题
>
>
>
>
>
> -- 原始邮件 --
> 发件人: todd  发送时间: 2021年4月14日 19:11
> 收件人: user-zh  主题: 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>
>
>
>  yarn上只有.out,.error的日志信息,但是从flink web ui的log框,无法显示日志内容。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best,
  pp


flink sql cli 模式下,flink-conf.yaml 配置checkpoint无法生效

2021-06-02 文章 guozhi mang
各位好,我在flink1.13版本
的flink配置文件里配置checkpoint和savepoint参数时,相关配置项并没有生效,现将我的配置文件信息和日志放在下文。

*配置文件*
#==
# Fault tolerance and checkpointing
#==

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# .
#

# Directory for checkpoints filesystem, when using any of the default
bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
execution.checkpointing.interval: 1
state.backend: filesystem
state.checkpoints.dir: file:///opt/xxx/flink-1.13.0/savepoint/checkpoints
state.savepoints.dir: file:///opt/xxx/flink-1.13.0/savepoint/savepoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
state.backend.incremental: false

# The failover strategy, i.e., how the job computation recovers from task
failures.
# Only restart tasks that may have been affected by the task failure, which
typically includes
# downstream tasks and potentially upstream tasks if their produced data is
no longer available for consumption.

jobmanager.execution.failover-strategy: region

#==

*服务器log日志:*

flink-root-sql-client-xxx03.log


2021-06-02 11:29:57,566 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: execution.checkpointing.interval, 1
2021-06-02 11:29:57,566 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.backend, filesystem
2021-06-02 11:29:57,566 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.checkpoints.dir,
file:///opt/xxx/flink-1.13.0/savepoint/checkpoints
2021-06-02 11:29:57,566 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.savepoints.dir,
file:///opt/xxx/flink-1.13.0/savepoint/savepoints
2021-06-02 11:29:57,566 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.backend.incremental, false
2021-06-02 11:29:57,566 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.execution.failover-strategy, region
2021-06-02 11:29:57,567 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: rest.bind-port, 8086
2021-06-02 11:29:57,596 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli
   [] - Found Yarn properties file under
/tmp/.yarn-properties-root.
2021-06-02 11:29:57,998 INFO
 org.apache.flink.table.client.config.entries.ExecutionEntry  [] - Property
'execution.restart-strategy.type' not specified. Using default value:
fallback
2021-06-02 11:29:58,028 INFO
 org.apache.flink.table.client.gateway.context.DefaultContext [] - Executor
config: {execution.savepoint.ignore-unclaimed-state=false,
execution.attached=true, yarn.application.id=xxx,
execution.shutdown-on-attached-exit=false,
pipeline.jars=[file:/opt/xxx/flink-1.13.0/opt/flink-sql-client_2.11-1.13.0.jar],
high-availability.cluster-id=application_1620482572059_3697,
pipeline.classpaths=[], execution.target=yarn-session,
$internal.deployment.config-dir=/opt/xxx/flink-1.13.0/conf}
2021-06-02 11:30:02,947 WARN
 org.apache.flink.yarn.configuration.YarnLogConfigUtil[] - The
configuration directory ('/opt/xxx/flink-1.13.0/conf') already contains a
LOG4J config file.If you want to use logback, then please delete or rename
the log configuration file.
2021-06-02 11:30:02,986 WARN  org.apache.flink.runtime.util.HadoopUtils
   [] - Could not find Hadoop configuration via any of the
supported methods (Flink configuration, environment variables).
2021-06-02 11:30:03,391 INFO  org.apache.flink.yarn.YarnClusterDescriptor
   [] - No path for the flink jar passed. Using the location of
class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-06-02 11:30:03,393 WARN  org.apache.flink.yarn.YarnClusterDescriptor
   [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR
environment variable is set.The Flink YARN Client needs one of these to be
set to properly load the Hadoop configuration for accessing YARN.
2021-06-02 11:30:03,445 INFO
 org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] -
Failing over to rm236
2021-06-02 11:30:03,502 INFO  org.apache.flink.yarn.YarnClusterDescriptor
   [] - Found Web Interface xxx:54194 of application 'xxx


*通过web ui观察到的日志:*

2021-05-28 15:32:30,673 INFO