Re: Flink 提交作业时的缓存可以删除吗

2021-02-02 文章 Robin Zhang
Hi,tison 感谢提供思路。当前版本flink1.10,测试发现在yarn web ui点击左上角kill,无法触发删除。通过flink web ui中的cancel按钮以及 官方建议的停止job 的方式(echo "stop" | ./bin/yarn-session.sh -id application_Id)是可以实现停止任务即可清除文件。 之前没有清除的文件是因为在yarn web ui直接点击kill。 调用栈: org.apache.flink.yarn.Utils.deleteApplicationFiles:214

Re: flink on yarn , JobManager和ApplicationMaster的关系

2021-02-02 文章 lp
谢谢! 我摘录的是flink1.11.2版本文档最后那部分:Background / Internals,介绍flink 如何在yarn上运行的 的内容:https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html 。感觉版本比较新了,应该没有问题吧,也是我们生产上在用的版本。1.12版本中没有找到相关内容。 仔细看了下文档,可能是我对flink on yarn的理解不太清楚,还是有几个问题请教下: ①flink on yarn模式下,jobmanager 和

?????? ??????????????POJO??????????????????????????????GenericType????

2021-02-02 文章 ?Y??????????????????
publicget/set() ---- ??: "??"

Re: 关于无法被看作POJO的类型,怎么转变可以让其不当作GenericType呢?

2021-02-02 文章 赵一旦
我看Flink的要求是public,每个属性要么public,要么有getter/setter。估计内嵌的属性也会递归检查的。 ℡小新的蜡笔不见嘞、 <1515827...@qq.com> 于2021年2月3日周三 下午1:52写道: > 你好,我们是否可以通过对该类LinkedHashMap进行包装来实现当前功能呢?如果你需要PojoSerializer来序列化数据的话。 > > > > > --原始邮件-- > 发件人: "赵一旦" 发送时间: 2021年2月3日(星期三) 中午1:24 > 收件人:

????????????????????POJO??????????????????????????????GenericType????

2021-02-02 文章 ?Y??????????????????
LinkedHashMapPojoSerializer?? ---- ??: "??"

关于无法被看作POJO的类型,怎么转变可以让其不当作GenericType呢?

2021-02-02 文章 赵一旦
如题,按照flink对POJO的定义,感觉还是比较严格的。 我有个类是继承了LinkedHashMap的,就被当作GenericType了。像这种情况,我没办法去修改LinkedHashMap实现,同时也不好不继承。因为我一个实体是动态扩展,不清楚有多少属性的,需要json方式反序列化到Map类型上。

PyFlink How to set timeout for UDF

2021-02-02 文章 苗红宾
Hi: Hope you are doing well! My UDF always running in a long time, so I'm wondering, how to set timeout for UDF in Pyflink, in order to auto-stop the execution when it running in a long time. Many Thanks!

Re: 测试pandas udf报错:AttributeError: 'decimal.Decimal' object has no attribute 'isnull'

2021-02-02 文章 Xingbo Huang
Hi, 报错的原因是你函数逻辑实际上是一个aggregate function的语义, 不是scalar function的语义。 scalar function要求的是一进一出,输入输出的数量是保持一致的,pandas udf只是利用了pandas的batch特性,把数据封装成了一个batch的series给你,但你实际上用还是得保持输入输出数量一致。比如你输入的是pd.Series([1,2,3]),你执行完+1操作之后,结果就是pd.Series([2,3,4]),两个series的长度是保持一致的,都是3。

flink clickhouse connector

2021-02-02 文章 阿华田
使用阿里的clickhouse connector 报找不到factory 大佬们遇到吗 现在flnk sql写入clickhouse 除了使用阿里的 还有别的方式吗? org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'clickhouse' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. |

Re: Flink job与自己系统平台的一体化集成

2021-02-02 文章 冯嘉伟
Hi 可以分解为两步: 1、生成JobGraph,可以参考org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils 中 toJobGraph() 2、向Yarn提交JobGraph,可以参考org.apache.flink.yarn.YarnClusterDescriptor 中 deployJobCluster() 注:1.11.x Jacob wrote > 有一个模糊的需求,不知道是否合理 > > 目前我们的实时计算的Job都是以On

Re: 检查点成功,但从检查点恢复失败。使用了guava的bloomFilter,有人帮忙分析下吗?

2021-02-02 文章 赵一旦
@Kezhu Wang Hi. 最近序列化相关问题遇到好多,如上这个是因为LongAdder非public,这个简单覆盖倒是也能解决。 但是我还遇到好多关于kryo序列化问题,比如我停任务(stop -p)的时候,会在保存点成功的瞬间报错,如何开始进入restarting状态。 报的是kryo的错误: 2021-02-03 11:00:54 com.esotericsoftware.kryo.KryoException: Unable to find class: eU at

测试pandas udf报错:AttributeError: 'decimal.Decimal' object has no attribute 'isnull'

2021-02-02 文章 肖越
# 定义计算逻辑函数 @udf(input_types=DataTypes.DECIMAL(38,18,True), result_type=DataTypes.DECIMAL(38,18,True), udf_type="pandas") def multi_production(yldrate): yldrate_1 = yldrate + 1 return np.prod(yldrate_1) - 1 调用:env.sql_query('SELECT multi_production(YLDRATE) FROM query_result')

请问在richFunction中如何获取该function输入的元素类型呢?

2021-02-02 文章 赵一旦
如题,在RichFunction中如何获取输入元素类型。TypeInformation。 目前这部分信息封在transformation中,在function层面貌似没有。 function中需要用到,如果可以获取,可以省略一个传参。

Re:pyflink1.11 udf计算结果打印问题 Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

2021-02-02 文章 肖越
抱歉,报错信息理解错误,问题已经解决,感谢大佬。 在 2021-02-03 10:23:32,"肖越" <18242988...@163.com> 写道: >pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积, >结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。 >结果print报错: >Traceback (most recent call last): > File "C:*/udtf_test.py", line 42, in >

Re:pyflink1.11 udf计算结果打印问题 The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

2021-02-02 文章 肖越
抱歉,报错信息理解错误,问题已经解决,感谢大佬。 在 2021-02-03 10:16:38,"肖越" <18242988...@163.com> 写道: pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积, 结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。 结果print报错: Traceback (most recent call last): File "C:*/udtf_test.py", line 42, in

Re: pyflink1.11 udf计算结果打印问题 Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

2021-02-02 文章 Xingbo Huang
Hi, 报错信息说了最少需要79m,我看你代码配成0m,当然还是继续报错呀 Best, Xingbo 肖越 <18242988...@163.com> 于2021年2月3日周三 上午10:24写道: > pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积, > 结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。 > 结果print报错: > Traceback (most recent call last): > File

Re: pyflink连接kerberos的kafka问题

2021-02-02 文章 Wei Zhong
Hi, 第一个问题应该是通过你现在的配置找不到对应的KDC realm, 可以继续尝试使用System.setProperty手动配置, 例如 System.setProperty("java.security.krb5.realm", ""); System.setProperty("java.security.krb5.kdc","”); 第二个问题, 'update-mode’=‘append'指的是只接受来自上游算子的append消息,而不是写文件时采用append模式。我想你可能想要配置的属性是'format.write-mode’='OVERWRITE’?

pyflink1.11 udf计算结果打印问题 Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

2021-02-02 文章 肖越
pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积, 结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。 结果print报错: Traceback (most recent call last): File "C:*/udtf_test.py", line 42, in env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM

Re: flink 1.12 中如何读取 mysql datetime 字段

2021-02-02 文章 macdoor
我通过实验确认这是升级 MySql JDBC Driver 8.0.23 造成的,回到 MySql JDBC Driver 8.0.22,就没有问题,我提交了 issue https://issues.apache.org/jira/browse/FLINK-21240 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 关于kafka中csv格式数据字段分隔符请教

2021-02-02 文章 JasonLee
hi 1,11 也是支持的 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

stop -p停并生产保存点,报错kyro错误。

2021-02-02 文章 赵一旦
如题,报错如下。 2021-02-02 20:44:19 com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 95 at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass( DefaultClassResolver.java:119) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) at

flink1.10??flink-sql,pyflink????hdfs??sink??????????append????????????????????????

2021-02-02 文章 ??????
?? ??hdfs??sink?? ?? CREATE TABLE csvTableSink ( id BIGINT,name STRING) WITH ('connector.path'= 'hdfs://hacluster/flink/qyq_qyq13','connector.type'='filesystem','format.type'='csv','update-mode' = 'append')

Re: flink做checkpoint失败 Checkpoint Coordinator is suspending.

2021-02-02 文章 chen310
flink版本是1.11,checkpoint配置是: pipeline.time-characteristic EventTime execution.checkpointing.interval 60 execution.checkpointing.min-pause 12 execution.checkpointing.timeout 12 execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION state.backend rocksdb

Re: flink on yarn , JobManager和ApplicationMaster的关系

2021-02-02 文章 Xintong Song
你之前的理解是正确的。Yarn 的 AM 就是 Flink 的 JM。 你看到的文档描述是有问题的。我查了一下 git history,你所摘录的内容 2014 年撰写的,描述的应该是项目初期的 on yarn 部署方式,早已经过时了。这部分内容在最新的 1.12 版本文档中已经被移除了。 Thank you~ Xintong Song On Tue, Feb 2, 2021 at 6:43 PM lp <973182...@qq.com> wrote: >

flink??hdfs ????????kerberos??????

2021-02-02 文章 ??????
Hi?? ??flinkkafka??hdfs??

Re: flink on yarn , JobManager和ApplicationMaster的关系

2021-02-02 文章 lp
或者说,我知道,对于MapReduce任务,ApplicationMaster的实现是MRAppMaster,那flink on yarn ,ApplicationMaster对应的实现是啥? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-parcel使用求助

2021-02-02 文章 lp
flink-parcel是什么提交方式,能详细发下么。如果采用per-job mode 或者application mode ,各个job的flink 集群在yarn上是独立的,kill一个job并不会影响宁一个 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink on yarn , JobManager和ApplicationMaster的关系

2021-02-02 文章 lp
flink on yarn中,yarn的applicationMaster和flink JobManager的关系是啥,我对yarn不是很熟悉,之前的理解是 JobManager就是yarn中的applicationMaster的角色。但我在官网中看到如下摘录:...Once that has finished, the ApplicationMaster (AM) is started.The JobManager and AM are running in the same container. Once they successfully started, the AM knows

写hdfs异常 org.apache.hadoop.fs.FileAlreadyExistsException

2021-02-02 文章 jiangjiguang719
flink版本:1.10 在写HDFS时报如下异常,并且出现这个异常后,之后的所有checkpoint都会出现此异常,任务下线重启后恢复正常 请问这是咋回事呢? level :WARN location :org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:796) log

Flink SQL关于'connector' = 'filesystem‘的问题求助!

2021-02-02 文章 yinghua...@163.com
今天在使用Flink 1.11.3版本使用Flink SQL将kafka中数据导入到HDFS上时提示如下的错误 Caused by: org.apache.flink.table.api.TableException: Could not load service provider for factories. at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:346) at

Flink job与自己系统平台的一体化集成

2021-02-02 文章 Jacob
有一个模糊的需求,不知道是否合理 目前我们的实时计算的Job都是以On Yarn模式运行在hadoop集群,每次提交新的job,都是在Flink客户端下面,用./bin/flink run-application -t yarn-application ... 的形式去提交Job。 现在我们有自研的一个关于数据处理平台,flink job是数据处理的一个环节,想着能不能在我们系统的portal中配一个菜单,上传flink项目的jar包,可以提交Job到hadoop集群,形成一体化的管理,不用每次去一个flink客户端下面去提交了,不知道这种需求是否合理?

Re: flink做checkpoint失败 Checkpoint Coordinator is suspending.

2021-02-02 文章 Congxian Qiu
Hi 你 flink 是什么版本,以及你作业 checkpoint/state 相关的配置是什么呢?如果可以的话,把完整的 jm log 发一下 Best, Congxian chen310 <1...@163.com> 于2021年2月1日周一 下午5:41写道: > 补充下,jobmanager日志异常: > > 2021-02-01 08:54:43,639 ERROR > org.apache.flink.runtime.rest.handler.job.JobDetailsHandler [] - Exception > occurred in REST