回复:flink 1.12 on yarn WebUI不显示logs
加了一些配置,一些程序有日志,一些没有日志 -- 原始邮件 -- 发件人: JasonLee <17610775...@163.com 发送时间: 2021年6月3日 12:44 收件人: user-zh http://apache-flink.147419.n8.nabble.com/
Re: flink 1.12 on yarn WebUI不显示logs
hi 有改过默认的日志配置文件吗? - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
好的非常感谢,我拿几个任务测试一波,看看性能能不能接受! Hi, 没有被引用的文件可能也不是完全无用的,可能是当前pending checkpoint正在上传的,所以还需要比较一下那些不在checkpoint meta内的文件的修改时间戳,可能比你分析的complete checkpoint的时间戳要大。 总体上来说,我不认为这个问题是一个bug,这个是LSM架构的DB的空间放大问题。如果你对空间放大非常担心的话,可以启用 dynamic level [1] 来严格控制空间放大,不过这个可能会影响写放大和读放大,导致性能受到一定影响。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-compaction-level-use-dynamic-size 祝好 唐云 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
Hi, 没有被引用的文件可能也不是完全无用的,可能是当前pending checkpoint正在上传的,所以还需要比较一下那些不在checkpoint meta内的文件的修改时间戳,可能比你分析的complete checkpoint的时间戳要大。 总体上来说,我不认为这个问题是一个bug,这个是LSM架构的DB的空间放大问题。如果你对空间放大非常担心的话,可以启用 dynamic level [1] 来严格控制空间放大,不过这个可能会影响写放大和读放大,导致性能受到一定影响。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-compaction-level-use-dynamic-size 祝好 唐云 From: yujianbo <15205029...@163.com> Sent: Wednesday, June 2, 2021 15:29 To: user-zh@flink.apache.org Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制? Hi, 确认的情况: 大佬,我根据loadCheckpointMeta能够去分析_metadata引用到的/shared目录下的sst文件,然后发现/shared里没有被引用的sst文件即未删除的旧文件,这些旧文件占比很少,只有5%左右。 配置: idleStateRetention确实是设置3600秒,保留的ck目录是3个。 目前情况: 每次checkpoint size增量差不多1-2G,所以size不小。5分钟checkpoint一次。 最近单次checkpoint的sst文件数是23个,文件大小大约在65M。 总共目前/checkpoint/shared目录大小是49.4G,然后savepoint结束大小在6.3G。 那我想问一下,那中间是不是有大量的冗余数据存在这还在被应用的sst文件里,有没有什么参数可以调整?还是JM单点删除跟不上相关速度? JM还能扩展吗??HA不是也是一个JM在工作吗(这个地方不是很明白) Hi, 先确定一下,你的 idleStateRetention 是 3600秒?其次,要想看是否所有数据均有用,可以利用 Checkpoints.loadCheckpointMeta [1] 去加载你所保留的checkpoint目录下的 _metadata 文件,然后与当前checkpoint目录下的文件作对比,看是否存在大量的未删除旧文件。 目前仅凭你的描述和一段SQL代码其实很难判断。 可能存在的原因有: 1. 单次checkpoint文件数目过多,JM单点删除跟不上相关速度 2. 整体checkpoint size不大,RocksDB的compaction频率较低,导致相关包含过期数据的旧文件并没有被及时删除,仍然存在checkpoint目录下(保留多个checkpoint同样会放大该问题) 所以还是最好先分析已有checkpoint的meta文件,再与目前的实际情况做对比。 另外,Flink JIRA其实不是一个很好的直接问问题的地方,尤其是这种并不明确的问题,更妥善的方式还是先在邮件列表讨论,具体定位到原因后,再创建相关详细的ticket。 [1] https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99 祝好 唐云 From: yujianbo <[hidden email]> Sent: Tuesday, June 1, 2021 10:51 To: [hidden email] <[hidden email]> Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制? 没有更好的方式吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink jar包任务如何读取hdfs配置文件
rt flink jar任务 在一个yarn集群上运行 要将数据写入另一个hdfs集群 会报错 Caused by: java.net.UnknownHostException: xxx-hdfs 有办法让jar包任务读取加载该集群的配置文件么 -- Sent from: http://apache-flink.147419.n8.nabble.com/
??????flink postgres jdbc catalog????????????
??postgresql??mysql??1.11.1?? ---- ??: "user-zh"
flink postgres jdbc catalog是只读的吗?
flink postgres jdbc catalog是只读的吗?能写的catalog 除了Hive Catalog还有哪些?社区什么时候会有Mysql JDBC Catalog呢?
flink sql作业表定义部分字段问题
有一个flink sql mysql-cdc作业,从kafka消费canal-json格式的binlog数据,打到下游存储,遇到一个问题:上游源表字段是会动态添加的,而我的sql table定义是静态的,如果上游schema发生变化,我的作业就会失败。在flink sql中是否可以针对目标数据源只定义用到的字段?如果可以的话应该怎么实现?现在是会抛类似下面的error。意思是原本包含43列的数据表,在我的DDL中只定义了其中的4列。有哪些格式是支持定义部分字段的呢? 21/06/02 18:54:22 [Source: TableSourceScan(table=[[default_catalog, default_database, charge_log]], fields=[id, charge_id, trace_id, app_id]) -> Sink: Sink(table=[default_catalog.default_database.print_table], fields=[id, charge_id, trace_id, app_id]) (3/12)#0] WARN taskmanager.Task: Source: TableSourceScan(table=[[default_catalog, default_database, charge_log]], fields=[id, charge_id, trace_id, app_id]) -> Sink: Sink(table=[default_catalog.default_database.print_table], fields=[id, charge_id, trace_id, app_id]) (3/12)#0 (8810adcd7960cb22a6954c985ba49d0d) switched from RUNNING to FAILED. java.lang.IllegalArgumentException: Row arity: 43, but serializer arity: 4 at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:124) at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
怎么避免flink sql cdc作业重启后重新从头开始消费binlog?
我有一个如下flink sql cdc作业,设置了'scan.startup.mode' = 'latest-offset'。但在作业重启后发现它又从头开始消费binlog,导致sink下游数据库频繁报duplicate key error,有什么办法可以避免吗? CREATE TABLE `mysql_source` ( `id` STRING, `acct_id` STRING, `acct_name` STRING, `acct_type` STRING, `acct_bal` STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'mysql', 'password' = 'mysql', 'database-name' = 'test', 'scan.startup.mode' = 'latest-offset', 'table-name' = 'test', 'server-time-zone' = 'Asia/Shanghai' );
Re: pyflink kafka connector报错ByteArrayDeserializer
要用fat jar: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.1/flink-sql-connector-kafka_2.11-1.13.1.jar > 2021年6月2日 下午2:43,qianhuan <819687...@qq.com> 写道: > > 版本: > python 3.8 > apache-flink 1.13.1 > apache-flink-libraries 1.13.1 > > 代码: > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.table import StreamTableEnvironment, EnvironmentSettings > > def log_processing(): >env = StreamExecutionEnvironment.get_execution_environment() >env_settings = EnvironmentSettings.Builder().use_blink_planner().build() >t_env = StreamTableEnvironment.create(stream_execution_environment=env, > environment_settings=env_settings) >t_env.get_config().get_configuration().\ >set_string("pipeline.jars", > "file:///root/flink/jars/flink-connector-kafka_2.11-1.13.1.jar") > >kafka_source_ddl = f""" >CREATE TABLE kafka_source_table( >a VARCHAR >) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'test5', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'connector.startup-mode' = 'latest-offset', > 'format.type' = 'json' >) >""" > >kafka_sink_ddl = f""" >CREATE TABLE kafka_sink_table( >b VARCHAR >) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'test6', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'format.type' = 'json' >) >""" > >t_env.execute_sql(kafka_source_ddl) >t_env.execute_sql(kafka_sink_ddl) >print("all_tables", t_env.list_tables()) >t_env.sql_query("SELECT a FROM kafka_source_table") \ >.execute_insert("kafka_sink_table").wait() > > if __name__ == '__main__': >log_processing() > > 报错: > py4j.protocol.Py4JJavaError: An error occurred while calling > o65.executeInsert. > : java.lang.NoClassDefFoundError: > org/apache/kafka/common/serialization/ByteArrayDeserializer > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:223) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:154) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:139) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:108) > at > org.apache.flink.streaming.connectors.kafka.KafkaTableSource.createKafkaConsumer(KafkaTableSource.java:106) > at > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getKafkaConsumer(KafkaTableSourceBase.java:293) > at > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:194) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacyTableSourceScan.translateToPlanInternal(CommonExecLegacyTableSourceScan.java:94) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:172) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:112) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at >
flink 1.12 on yarn WebUI不显示logs
大家好 目前在测试1.12.1版本时遇到个问题,提交程序之后在WebUI查看日志,jobmanager和taskmanager logs都不显示,按f12查看,logs 是404,并响应 errors :this file does not exist in Jobmanager log dir 这是在启动过程中没有创建日志吗,不清楚什么原因导致的。
Re: Re: Flink SQL 1.11.3问题请教
我这个情况还有点不一样的,本来单条数据是如下的:一条数据对应一个offset {"name":"test1"} 但是Nifi采集数据后,写入kafka格式是下面这样的,一个offset对应下面几条数据(每一个offset对应的真实数据条数还不是固定的) {"name":"test1"} {"name":"test2"} {"name":"test3"} ... 感谢你的回复,我借鉴下看怎么处理下,多谢了! yinghua...@163.com 发件人: WeiXubin 发送时间: 2021-06-02 17:44 收件人: user-zh 主题: Re: Flink SQL 1.11.3问题请教 不知道下面场景是否与你描述的场景相同 ,假设采集到单条json消息格式为 {"name":"test"},将多条json消息合并为一条的格式为 [{"name":"test"},{"name":"test2"}]。 我的 Flink 任务是采用 FlinkSQL 编写,处理这种情况我的方式是通过编写一个 UDF (TableFunction), 之后在这个 UDF 中进行数据拆解,变为多行 (row),再输出到 sink。 Row row = new Row(arity); collect(row); 具体使用可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/ Best,Weixubin -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于 flinksql 维表的问题
你好,可以麻烦详细描述一下吗? 谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL 1.11.3问题请教
不知道下面场景是否与你描述的场景相同 ,假设采集到单条json消息格式为 {"name":"test"},将多条json消息合并为一条的格式为 [{"name":"test"},{"name":"test2"}]。 我的 Flink 任务是采用 FlinkSQL 编写,处理这种情况我的方式是通过编写一个 UDF (TableFunction), 之后在这个 UDF 中进行数据拆解,变为多行 (row),再输出到 sink。 Row row = new Row(arity); collect(row); 具体使用可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/ Best,Weixubin -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink自定义connector相关报错
你好,还有完整信息么?一般逻辑是,先找文件,然后匹配属性。麻烦将完整日志输出出来看看。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复: flink自定义connector相关报错
Sorry, 工程下路径是没错的,是我发邮件时打错了,还可能是什么原因导致的呢?谢谢 src/main/resources/META-INF/services | | MOBIN | 签名由网易邮箱大师定制 在2021年06月2日 17:05,Leonard Xu 写道: 路径错了 在 2021年6月2日,17:02,MOBIN <18814118...@163.com> 写道: META-INF.services/org.apache.flink.table.factories.Factory => META-INF/services/org.apache.flink.table.factories.Factory 祝好 Leonard
Re: flink自定义connector相关报错
路径错了 > 在 2021年6月2日,17:02,MOBIN <18814118...@163.com> 写道: > > META-INF.services/org.apache.flink.table.factories.Factory => META-INF/services/org.apache.flink.table.factories.Factory 祝好 Leonard
flink自定义connector相关报错
请教下,在自定义connector,IDEA上直接运demo时报了以下的错误: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath. 工程的resource目录也准确引入了META-INF.services/org.apache.flink.table.factories.Factory,但是感觉没生效一样 Tabel-common依赖也已经引入了 谢谢 | | MOBIN | 签名由网易邮箱大师定制
Re: pyflink kafka connector报错ByteArrayDeserializer
是不是connector版本问题,之前1.12.2可以跑,有没有大神帮忙看下 -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
请问你是怎么解决这个问题的 -- 原始邮件 -- 发件人: r pp http://apache-flink.147419.n8.nabble.com/ -- Best, pp
Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
Hi, 确认的情况: 大佬,我根据loadCheckpointMeta能够去分析_metadata引用到的/shared目录下的sst文件,然后发现/shared里没有被引用的sst文件即未删除的旧文件,这些旧文件占比很少,只有5%左右。 配置: idleStateRetention确实是设置3600秒,保留的ck目录是3个。 目前情况: 每次checkpoint size增量差不多1-2G,所以size不小。5分钟checkpoint一次。 最近单次checkpoint的sst文件数是23个,文件大小大约在65M。 总共目前/checkpoint/shared目录大小是49.4G,然后savepoint结束大小在6.3G。 那我想问一下,那中间是不是有大量的冗余数据存在这还在被应用的sst文件里,有没有什么参数可以调整?还是JM单点删除跟不上相关速度? JM还能扩展吗??HA不是也是一个JM在工作吗(这个地方不是很明白) Hi, 先确定一下,你的 idleStateRetention 是 3600秒?其次,要想看是否所有数据均有用,可以利用 Checkpoints.loadCheckpointMeta [1] 去加载你所保留的checkpoint目录下的 _metadata 文件,然后与当前checkpoint目录下的文件作对比,看是否存在大量的未删除旧文件。 目前仅凭你的描述和一段SQL代码其实很难判断。 可能存在的原因有: 1. 单次checkpoint文件数目过多,JM单点删除跟不上相关速度 2. 整体checkpoint size不大,RocksDB的compaction频率较低,导致相关包含过期数据的旧文件并没有被及时删除,仍然存在checkpoint目录下(保留多个checkpoint同样会放大该问题) 所以还是最好先分析已有checkpoint的meta文件,再与目前的实际情况做对比。 另外,Flink JIRA其实不是一个很好的直接问问题的地方,尤其是这种并不明确的问题,更妥善的方式还是先在邮件列表讨论,具体定位到原因后,再创建相关详细的ticket。 [1] https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99 祝好 唐云 From: yujianbo <[hidden email]> Sent: Tuesday, June 1, 2021 10:51 To: [hidden email] <[hidden email]> Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制? 没有更好的方式吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
你的意思是在log4j.properties中的配置吗,我门在这个里边配置了生成日志文件的格式,是在安装节点里加的,不过这个应该不是在webui里显示的。奇怪的一点是我们组有别的程序是正常的,但是一部分在webUI不显示日志。我们目前是从1.10升级到1.12,这种情况在1.12出现的 -- 原始邮件 -- 发件人: r pp http://apache-flink.147419.n8.nabble.com/ -- Best, pp
pyflink kafka connector报错ByteArrayDeserializer
版本: python 3.8 apache-flink 1.13.1 apache-flink-libraries 1.13.1 代码: from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings def log_processing(): env = StreamExecutionEnvironment.get_execution_environment() env_settings = EnvironmentSettings.Builder().use_blink_planner().build() t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings) t_env.get_config().get_configuration().\ set_string("pipeline.jars", "file:///root/flink/jars/flink-connector-kafka_2.11-1.13.1.jar") kafka_source_ddl = f""" CREATE TABLE kafka_source_table( a VARCHAR ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'test5', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' ) """ kafka_sink_ddl = f""" CREATE TABLE kafka_sink_table( b VARCHAR ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'test6', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'format.type' = 'json' ) """ t_env.execute_sql(kafka_source_ddl) t_env.execute_sql(kafka_sink_ddl) print("all_tables", t_env.list_tables()) t_env.sql_query("SELECT a FROM kafka_source_table") \ .execute_insert("kafka_sink_table").wait() if __name__ == '__main__': log_processing() 报错: py4j.protocol.Py4JJavaError: An error occurred while calling o65.executeInsert. : java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:223) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:154) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:139) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:108) at org.apache.flink.streaming.connectors.kafka.KafkaTableSource.createKafkaConsumer(KafkaTableSource.java:106) at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getKafkaConsumer(KafkaTableSourceBase.java:293) at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:194) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacyTableSourceScan.translateToPlanInternal(CommonExecLegacyTableSourceScan.java:94) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:172) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:112) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518) at
Re: flink1.12版本,yarn-application模式Flink web ui看不到日志
嗨~ 你们有没有改日志文件的名字 smq <374060...@qq.com> 于2021年6月2日周三 下午12:24写道: > 你这个解决了吗,我也遇到了同样的问题 > > > > > > -- 原始邮件 -- > 发件人: todd 发送时间: 2021年4月14日 19:11 > 收件人: user-zh 主题: 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志 > > > > yarn上只有.out,.error的日志信息,但是从flink web ui的log框,无法显示日志内容。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, pp
flink sql cli 模式下,flink-conf.yaml 配置checkpoint无法生效
各位好,我在flink1.13版本 的flink配置文件里配置checkpoint和savepoint参数时,相关配置项并没有生效,现将我的配置文件信息和日志放在下文。 *配置文件* #== # Fault tolerance and checkpointing #== # The backend that will be used to store operator state checkpoints if # checkpointing is enabled. # # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the # . # # Directory for checkpoints filesystem, when using any of the default bundled # state backends. # # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints execution.checkpointing.interval: 1 state.backend: filesystem state.checkpoints.dir: file:///opt/xxx/flink-1.13.0/savepoint/checkpoints state.savepoints.dir: file:///opt/xxx/flink-1.13.0/savepoint/savepoints # Default target directory for savepoints, optional. # # state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints # Flag to enable/disable incremental checkpoints for backends that # support incremental checkpoints (like the RocksDB state backend). # state.backend.incremental: false # The failover strategy, i.e., how the job computation recovers from task failures. # Only restart tasks that may have been affected by the task failure, which typically includes # downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption. jobmanager.execution.failover-strategy: region #== *服务器log日志:* flink-root-sql-client-xxx03.log 2021-06-02 11:29:57,566 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.checkpointing.interval, 1 2021-06-02 11:29:57,566 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend, filesystem 2021-06-02 11:29:57,566 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.dir, file:///opt/xxx/flink-1.13.0/savepoint/checkpoints 2021-06-02 11:29:57,566 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.savepoints.dir, file:///opt/xxx/flink-1.13.0/savepoint/savepoints 2021-06-02 11:29:57,566 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.incremental, false 2021-06-02 11:29:57,566 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region 2021-06-02 11:29:57,567 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: rest.bind-port, 8086 2021-06-02 11:29:57,596 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root. 2021-06-02 11:29:57,998 INFO org.apache.flink.table.client.config.entries.ExecutionEntry [] - Property 'execution.restart-strategy.type' not specified. Using default value: fallback 2021-06-02 11:29:58,028 INFO org.apache.flink.table.client.gateway.context.DefaultContext [] - Executor config: {execution.savepoint.ignore-unclaimed-state=false, execution.attached=true, yarn.application.id=xxx, execution.shutdown-on-attached-exit=false, pipeline.jars=[file:/opt/xxx/flink-1.13.0/opt/flink-sql-client_2.11-1.13.0.jar], high-availability.cluster-id=application_1620482572059_3697, pipeline.classpaths=[], execution.target=yarn-session, $internal.deployment.config-dir=/opt/xxx/flink-1.13.0/conf} 2021-06-02 11:30:02,947 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil[] - The configuration directory ('/opt/xxx/flink-1.13.0/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file. 2021-06-02 11:30:02,986 WARN org.apache.flink.runtime.util.HadoopUtils [] - Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables). 2021-06-02 11:30:03,391 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2021-06-02 11:30:03,393 WARN org.apache.flink.yarn.YarnClusterDescriptor [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN. 2021-06-02 11:30:03,445 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] - Failing over to rm236 2021-06-02 11:30:03,502 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface xxx:54194 of application 'xxx *通过web ui观察到的日志:* 2021-05-28 15:32:30,673 INFO