Hi,tison
感谢提供思路。当前版本flink1.10,测试发现在yarn web ui点击左上角kill,无法触发删除。通过flink web
ui中的cancel按钮以及 官方建议的停止job 的方式(echo "stop" | ./bin/yarn-session.sh -id
application_Id)是可以实现停止任务即可清除文件。
之前没有清除的文件是因为在yarn web ui直接点击kill。
调用栈:
org.apache.flink.yarn.Utils.deleteApplicationFiles:214
谢谢!
我摘录的是flink1.11.2版本文档最后那部分:Background / Internals,介绍flink 如何在yarn上运行的
的内容:https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html
。感觉版本比较新了,应该没有问题吧,也是我们生产上在用的版本。1.12版本中没有找到相关内容。
仔细看了下文档,可能是我对flink on yarn的理解不太清楚,还是有几个问题请教下:
①flink on yarn模式下,jobmanager 和
publicget/set()
----
??: "??"
我看Flink的要求是public,每个属性要么public,要么有getter/setter。估计内嵌的属性也会递归检查的。
℡小新的蜡笔不见嘞、 <1515827...@qq.com> 于2021年2月3日周三 下午1:52写道:
> 你好,我们是否可以通过对该类LinkedHashMap进行包装来实现当前功能呢?如果你需要PojoSerializer来序列化数据的话。
>
>
>
>
> --原始邮件--
> 发件人: "赵一旦" 发送时间: 2021年2月3日(星期三) 中午1:24
> 收件人:
LinkedHashMapPojoSerializer??
----
??: "??"
如题,按照flink对POJO的定义,感觉还是比较严格的。
我有个类是继承了LinkedHashMap的,就被当作GenericType了。像这种情况,我没办法去修改LinkedHashMap实现,同时也不好不继承。因为我一个实体是动态扩展,不清楚有多少属性的,需要json方式反序列化到Map类型上。
Hi:
Hope you are doing well!
My UDF always running in a long time, so I'm wondering, how to set timeout for
UDF in Pyflink, in order to auto-stop the execution when it running in a long
time.
Many Thanks!
Hi,
报错的原因是你函数逻辑实际上是一个aggregate function的语义, 不是scalar function的语义。
scalar function要求的是一进一出,输入输出的数量是保持一致的,pandas
udf只是利用了pandas的batch特性,把数据封装成了一个batch的series给你,但你实际上用还是得保持输入输出数量一致。比如你输入的是pd.Series([1,2,3]),你执行完+1操作之后,结果就是pd.Series([2,3,4]),两个series的长度是保持一致的,都是3。
使用阿里的clickhouse connector 报找不到factory 大佬们遇到吗
现在flnk sql写入clickhouse 除了使用阿里的 还有别的方式吗?
org.apache.flink.table.api.ValidationException: Could not find any factory for
identifier 'clickhouse' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
|
Hi
可以分解为两步:
1、生成JobGraph,可以参考org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils
中 toJobGraph()
2、向Yarn提交JobGraph,可以参考org.apache.flink.yarn.YarnClusterDescriptor 中
deployJobCluster()
注:1.11.x
Jacob wrote
> 有一个模糊的需求,不知道是否合理
>
> 目前我们的实时计算的Job都是以On
@Kezhu Wang Hi.
最近序列化相关问题遇到好多,如上这个是因为LongAdder非public,这个简单覆盖倒是也能解决。
但是我还遇到好多关于kryo序列化问题,比如我停任务(stop -p)的时候,会在保存点成功的瞬间报错,如何开始进入restarting状态。
报的是kryo的错误:
2021-02-03 11:00:54
com.esotericsoftware.kryo.KryoException: Unable to find class: eU
at
# 定义计算逻辑函数
@udf(input_types=DataTypes.DECIMAL(38,18,True),
result_type=DataTypes.DECIMAL(38,18,True), udf_type="pandas")
def multi_production(yldrate):
yldrate_1 = yldrate + 1
return np.prod(yldrate_1) - 1
调用:env.sql_query('SELECT multi_production(YLDRATE) FROM query_result')
如题,在RichFunction中如何获取输入元素类型。TypeInformation。
目前这部分信息封在transformation中,在function层面貌似没有。
function中需要用到,如果可以获取,可以省略一个传参。
抱歉,报错信息理解错误,问题已经解决,感谢大佬。
在 2021-02-03 10:23:32,"肖越" <18242988...@163.com> 写道:
>pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积,
>结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。
>结果print报错:
>Traceback (most recent call last):
> File "C:*/udtf_test.py", line 42, in
>
抱歉,报错信息理解错误,问题已经解决,感谢大佬。
在 2021-02-03 10:16:38,"肖越" <18242988...@163.com> 写道:
pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积,
结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。
结果print报错:
Traceback (most recent call last):
File "C:*/udtf_test.py", line 42, in
Hi,
报错信息说了最少需要79m,我看你代码配成0m,当然还是继续报错呀
Best,
Xingbo
肖越 <18242988...@163.com> 于2021年2月3日周三 上午10:24写道:
> pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积,
> 结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。
> 结果print报错:
> Traceback (most recent call last):
> File
Hi,
第一个问题应该是通过你现在的配置找不到对应的KDC realm, 可以继续尝试使用System.setProperty手动配置, 例如
System.setProperty("java.security.krb5.realm", "");
System.setProperty("java.security.krb5.kdc","”);
第二个问题,
'update-mode’=‘append'指的是只接受来自上游算子的append消息,而不是写文件时采用append模式。我想你可能想要配置的属性是'format.write-mode’='OVERWRITE’?
pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积,
结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。
结果print报错:
Traceback (most recent call last):
File "C:*/udtf_test.py", line 42, in
env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE)
FROM
我通过实验确认这是升级 MySql JDBC Driver 8.0.23 造成的,回到 MySql JDBC Driver
8.0.22,就没有问题,我提交了 issue
https://issues.apache.org/jira/browse/FLINK-21240
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi
1,11 也是支持的
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/
如题,报错如下。
2021-02-02 20:44:19
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
95
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at
??
??hdfs??sink??
??
CREATE TABLE csvTableSink ( id BIGINT,name STRING) WITH ('connector.path'=
'hdfs://hacluster/flink/qyq_qyq13','connector.type'='filesystem','format.type'='csv','update-mode'
= 'append')
flink版本是1.11,checkpoint配置是:
pipeline.time-characteristic EventTime
execution.checkpointing.interval 60
execution.checkpointing.min-pause 12
execution.checkpointing.timeout 12
execution.checkpointing.externalized-checkpoint-retention
RETAIN_ON_CANCELLATION
state.backend rocksdb
你之前的理解是正确的。Yarn 的 AM 就是 Flink 的 JM。
你看到的文档描述是有问题的。我查了一下 git history,你所摘录的内容 2014 年撰写的,描述的应该是项目初期的 on yarn
部署方式,早已经过时了。这部分内容在最新的 1.12 版本文档中已经被移除了。
Thank you~
Xintong Song
On Tue, Feb 2, 2021 at 6:43 PM lp <973182...@qq.com> wrote:
>
Hi??
??flinkkafka??hdfs??
或者说,我知道,对于MapReduce任务,ApplicationMaster的实现是MRAppMaster,那flink on yarn
,ApplicationMaster对应的实现是啥?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
flink-parcel是什么提交方式,能详细发下么。如果采用per-job mode 或者application mode ,各个job的flink
集群在yarn上是独立的,kill一个job并不会影响宁一个
--
Sent from: http://apache-flink.147419.n8.nabble.com/
flink on yarn中,yarn的applicationMaster和flink
JobManager的关系是啥,我对yarn不是很熟悉,之前的理解是
JobManager就是yarn中的applicationMaster的角色。但我在官网中看到如下摘录:...Once that has
finished, the ApplicationMaster (AM) is started.The JobManager and AM are
running in the same container. Once they successfully started, the AM knows
flink版本:1.10
在写HDFS时报如下异常,并且出现这个异常后,之后的所有checkpoint都会出现此异常,任务下线重启后恢复正常
请问这是咋回事呢?
level
:WARN
location
:org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:796)
log
今天在使用Flink 1.11.3版本使用Flink SQL将kafka中数据导入到HDFS上时提示如下的错误
Caused by: org.apache.flink.table.api.TableException: Could not load service
provider for factories.
at
org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:346)
at
有一个模糊的需求,不知道是否合理
目前我们的实时计算的Job都是以On Yarn模式运行在hadoop集群,每次提交新的job,都是在Flink客户端下面,用./bin/flink
run-application -t yarn-application ... 的形式去提交Job。
现在我们有自研的一个关于数据处理平台,flink
job是数据处理的一个环节,想着能不能在我们系统的portal中配一个菜单,上传flink项目的jar包,可以提交Job到hadoop集群,形成一体化的管理,不用每次去一个flink客户端下面去提交了,不知道这种需求是否合理?
Hi
你 flink 是什么版本,以及你作业 checkpoint/state 相关的配置是什么呢?如果可以的话,把完整的 jm log 发一下
Best,
Congxian
chen310 <1...@163.com> 于2021年2月1日周一 下午5:41写道:
> 补充下,jobmanager日志异常:
>
> 2021-02-01 08:54:43,639 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler [] - Exception
> occurred in REST
32 matches
Mail list logo