exception: beyond the 'PHYSICAL' memory limit

2020-05-07 Thread tiantingting5...@163.com
I recently wrote a streaming job with Flink 1.10. The rough logic: clean the data and write it to HBase; no state needs to be kept in between. Here is the launch script: export HADOOP_CONF_DIR=/etc/hadoop/conf export HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/jars/* /opt/flink/flink-1.10.0/bin/flink run -p 1 -m yarn-cluster -yqu root.crawler \ -ynm testRealTime22 \ -yD

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-07 Thread Trystan
Aha, so incremental checkpointing *does* rely on infinitely-previous checkpoint state, regardless of the incremental retention number. The documentation wasn't entirely clear about this. One would assume that if you retain 3 checkpoints, anything older than the 3rd is irrelevant, but that's

Re: flink 1.10 memory settings

2020-05-07 Thread Xintong Song
That's basically it, though the part about memory release is slightly inaccurate. Heap space can be thought of as a contiguous block of memory the JVM has already requested from the OS. When a Java object is created with new, a piece is carved out of this memory the JVM already holds; when the object is finalized during GC, the memory is handed back to the JVM, not actually released to the OS. Direct/Native memory, by contrast, is requested directly from the OS. The object holding such memory must release it to the OS when it is finalized; otherwise GC cannot free it automatically and it leaks. Direct memory differs from Native memory mainly in two ways: first, at allocation time the JVM checks
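A minimal Java sketch of the three kinds of allocation described above (class name and sizes are illustrative):

    import java.nio.ByteBuffer;

    public class MemoryKinds {
        public static void main(String[] args) {
            // Heap: carved out of memory the JVM has already reserved from the
            // OS; when the object dies, GC returns the space to the JVM's heap,
            // not to the OS.
            byte[] onHeap = new byte[16 * 1024 * 1024];

            // Direct: backed by memory requested from the OS but tracked by the
            // JVM; allocation fails if it would exceed -XX:MaxDirectMemorySize,
            // and a Cleaner releases the memory to the OS once the buffer is
            // garbage collected.
            ByteBuffer direct = ByteBuffer.allocateDirect(16 * 1024 * 1024);

            // Native memory (e.g. sun.misc.Unsafe.allocateMemory, or malloc in
            // JNI code) is invisible to the JVM: whoever allocated it must free
            // it explicitly, otherwise it leaks; GC cannot reclaim it.
            System.out.println(onHeap.length + " / " + direct.capacity());
        }
    }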

Re: flink 1.10: ORDER BY in SQL fails after registering a table with createTemporaryTable

2020-05-07 Thread Hito Zhu
Thanks for the answer! I did define a schema and specify the time field, as in the code below; the only difference is that createTemporaryTable is used in place of the original registerTableSource method. Rowtime rowtime = new Rowtime()     .timestampsFromField("searchTime")     .watermarksPeriodicBounded(5 * 1000); Schema schema = new Schema()     .field("code", DataTypes.INT())     .field("results",
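For reference, a completed version of that registration might look like the sketch below; the connector and format descriptors, the searchTime field declaration, and the table name are assumptions, only the fragments quoted above are from the original:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Rowtime;
    import org.apache.flink.table.descriptors.Schema;

    Rowtime rowtime = new Rowtime()
        .timestampsFromField("searchTime")
        .watermarksPeriodicBounded(5 * 1000);

    Schema schema = new Schema()
        .field("code", DataTypes.INT())
        .field("searchTime", DataTypes.TIMESTAMP(3))
        .rowtime(rowtime);  // marks the preceding field as the event-time attribute

    tableEnv.connect(new Kafka() /* connector settings assumed */)
        .withFormat(new Json())
        .withSchema(schema)
        .createTemporaryTable("searches");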

Re: flink 1.10 memory settings

2020-05-07 Thread Jiacheng Jiang
Is native memory what unsafe.allocateMemory returns, while direct memory is what a new DirectByteBuffer allocates, limited by MaxDirectMemorySize and tracked by the JVM through DirectByteBuffer? ---- Original message from:

Re: flink on kubernetes: inquiry about a stuck job

2020-05-07 Thread a511955993
The job is map, union, keyby and agg; it reads from Kafka and writes to Kafka. | | a511955993 | | Email: a511955...@163.com | On 2020-05-08 10:06, Jiacheng Jiang wrote: Is the Map writing to es

Re: flink on kubernetes: inquiry about a stuck job

2020-05-07 Thread a511955993
taskmanager.network.memory.min: 2g, max: 3g. If network memory were insufficient, shouldn't the cluster fail right at startup? Can the stall really only show up while the job is running? | | a511955993 | | Email: a511955...@163.com | On 2020-05-08 10:08, LakeShen wrote: Hi, you can check your memory configuration to see whether it is too small, leaving too few network buffers. See the docs:

Re: Questions about Streaming SQL Source/Sink in Append Mode/Upsert Mode

2020-05-07 Thread Jark Wu
Hi, after FLIP-95 and FLIP-105 the query above will be supported natively. The core of FLIP-95 and FLIP-105 is recognizing the update/delete/insert messages in the binlog instead of treating everything as append messages. These features are expected in 1.11. Best, Jark On Thu, 7 May 2020 at 21:34, oliver wrote: > > Is there another way? > > You can try group by id together with the LAST_VALUE UDF; > example SQL: > insert into sink_es > select

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-07 Thread Congxian Qiu
Hi, yes, only the files used by checkpoints 8, 9 and 10 should remain in the checkpoint directory, but you cannot delete every file created more than 3 minutes ago, because checkpoints 8, 9 and 10 may reuse files created by earlier checkpoints; this is how incremental checkpointing works [1]. You can

Re: flink on kubernetes: inquiry about a stuck job

2020-05-07 Thread Jiacheng Jiang
Is the Map writing to es? ---- Original message from: "a511955993"

Re: flink on kubernetes: inquiry about a stuck job

2020-05-07 Thread LakeShen
Hi, you can check your memory configuration to see whether it is too small, leaving too few network buffers. See the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html Best, LakeShen On Thu, May 7, 2020 at 9:54 PM, a511955993 wrote: > hi, all > > Cluster info: > Flink 1.10 deployed on kubernetes, kubernetes 1.17.4, docker 19.03, >

Re: flink 1.10: ORDER BY in SQL fails after registering a table with createTemporaryTable

2020-05-07 Thread Jingsong Li
Hi, as the exception says, streaming SQL does not support ORDER BY on a non-time-attribute field. How did you specify the time field? Best, Jingsong Lee On Fri, May 8, 2020 at 9:52 AM Hito Zhu wrote: > hi all, > flink 1.10 recommends createTemporaryTable in place of the registerTableSource > method; after switching, an error occurs. Error message and SQL below: > > Exception in thread "main"

flink 1.10: ORDER BY in SQL fails after registering a table with createTemporaryTable

2020-05-07 Thread Hito Zhu
hi all, flink 1.10 recommends using createTemporaryTable in place of registerTableSource; after switching, an error occurs. Error message and SQL below: Exception in thread "main" org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported. SQL: select code, ..., searchTime from table order by searchTime asc Without order
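A short illustration of the constraint (the registration of searchTime as a rowtime attribute and the table name are assumed):

    // Allowed in streaming mode: ascending sort on a time attribute.
    Table sorted = tableEnv.sqlQuery(
        "SELECT code, searchTime FROM searches ORDER BY searchTime ASC");

    // Sorting on a regular column fails with the TableException quoted above:
    // "Sort on a non-time-attribute field is not supported."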

Re: Correctly implementing SourceFunction.run()

2020-05-07 Thread Jingsong Li
Hi, Some suggestions from my side: - synchronized (checkpointLock) around the work and ctx.collect? - Put Thread.sleep(interval) outside the try/catch? You probably should not swallow the interrupt exception (e.g. when the job is cancelled). Best, Jingsong Lee On Fri, May 8, 2020 at 2:52 AM Senthil Kumar wrote: > I am
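A sketch of the run()/cancel() pattern these suggestions point at; the class name and the S3 fetch helper are assumptions:

    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

    public class PeriodicS3Source extends RichSourceFunction<String> {

        private final long intervalMillis;
        private volatile boolean running = true;

        public PeriodicS3Source(long intervalMillis) {
            this.intervalMillis = intervalMillis;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                // emit under the checkpoint lock so a checkpoint cannot slip in
                // between reading a batch and emitting its records
                synchronized (ctx.getCheckpointLock()) {
                    for (String record : fetchFromS3()) {  // hypothetical helper
                        ctx.collect(record);
                    }
                }
                // sleep outside any try/catch that swallows exceptions, so the
                // interrupt triggered by cancellation actually stops the source
                Thread.sleep(intervalMillis);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        private java.util.List<String> fetchFromS3() {
            return java.util.Collections.emptyList();  // placeholder for the S3 read
        }
    }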

Correctly implementing SourceFunction.run()

2020-05-07 Thread Senthil Kumar
I am implementing a source function which periodically wakes up and consumes data from S3. My current implementation is as follows. Following: org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction Is it safe to simply swallow any and all exceptions in the run method

Re: Rich Function Thread Safety

2020-05-07 Thread tao xiao
As the Javadoc suggests, it seems the operator methods and checkpoint snapshotting are invoked by two different threads https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java#L39-L62 On Thu, May 7, 2020

Re: Statefun 2.0 questions

2020-05-07 Thread Wouter Zorgdrager
Hi Igal, Thanks for your quick reply. Getting back to point 2, I was wondering whether you could indeed trigger a stateful function directly from Flask and also get the reply there, instead of using Kafka in between. We want to experiment with running stateful functions behind a front-end (which should be

flink on kubernetes: inquiry about a stuck job

2020-05-07 Thread a511955993
hi, all Cluster info: Flink 1.10 deployed on kubernetes, kubernetes 1.17.4, docker 19.03, CNI is weave. Symptom: while the job is running, an operator occasionally gets stuck: downstream receives no data, the watermark cannot advance, upstream is backpressured, and after a while the job gets killed. A fragment of the jstack output: "Map (152/200)" #155 prio=5 os_prio=0 tid=0x7f67a4076800 nid=0x31f waiting on condition

Re: Questions about Streaming SQL Source/Sink in Append Mode/Upsert Mode

2020-05-07 Thread oliver
> Is there another way? You can try group by id together with the LAST_VALUE UDF; example SQL: insert into sink_es select id, LAST_VALUE(column_name) from binlog group by id; Best, Oliver yunchang > On May 7, 2020 at 8:55 PM, Luan Cooper wrote: > > A few concerns about a custom sink: > 1. The built-in sinks, e.g. jdbc/es, would all have to be changed to upsert > 2. That duplicates a lot of append/upsert code > 3. It conflicts with Flink's

Re: Questions about Streaming SQL Source/Sink in Append Mode/Upsert Mode

2020-05-07 Thread Benchao Li
Hi, the current ES sink indeed cannot do an update in this case, because primary-key information is inferred from the SQL, and none can be inferred from this query. You are already headed in the right direction: as I understand it, after FLIP-87 we will be able to declare the primary key in the DDL, which achieves exactly what you want. On Thu, May 7, 2020 at 8:39 PM, Luan Cooper wrote: > Hi > > Scenario: changes to a MySQL table need to be synced to ElasticSearch, and the PRIMARY KEY on both sides is ID. SQL > as follows > > INSERT INTO sink_es // sync the changes
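For illustration, the DDL-declared key mentioned here could look roughly as follows once released; the syntax is assumed from FLIP-87 (not available in Flink 1.10), and the column and connector options are placeholders:

    tableEnv.sqlUpdate(
        "CREATE TABLE sink_es (" +
        "  id BIGINT," +
        "  payload STRING," +
        "  PRIMARY KEY (id) NOT ENFORCED" +    // gives the sink its update key
        ") WITH (" +
        "  'connector' = 'elasticsearch-7'" +  // options assumed
        ")");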

Re: Questions about Streaming SQL Source/Sink in Append Mode/Upsert Mode

2020-05-07 Thread Luan Cooper
A few concerns about a custom sink: 1. The built-in sinks, e.g. jdbc/es, would all have to be changed to upsert 2. That duplicates a lot of append/upsert code 3. It conflicts with Flink's definition of append/upsert streams, adding the cost of explaining an extra hack 4. The update key has to be specified somewhere else This feels like digging a pit. Is there another way? On Thu, May 7, 2020 at 8:42 PM, lec ssmi wrote: > Just use a custom Table Sink. > > On Thu, May 7, 2020 at 8:39 PM, Luan Cooper wrote: > > > Hi > > > > Scenario: we need to sync

Re: Questions about Streaming SQL Source/Sink in Append Mode/Upsert Mode

2020-05-07 Thread lec ssmi
Just use a custom Table Sink. On Thu, May 7, 2020 at 8:39 PM, Luan Cooper wrote: > Hi > > Scenario: changes to a MySQL table need to be synced to ElasticSearch, and the PRIMARY KEY on both sides is ID. SQL > as follows > > INSERT INTO sink_es // upsert the changes to ES > SELECT * > FROM binlog // the MySQL table's binlog > > Suppose there are 10 changes for id = 1 in MySQL; each must update the single id = 1 record in ES

Questions about Streaming SQL Source/Sink in Append Mode/Upsert Mode

2020-05-07 Thread Luan Cooper
Hi There is a scenario where changes to a MySQL table need to be synced to ElasticSearch, and the PRIMARY KEY on both sides is ID. SQL as follows INSERT INTO sink_es // upsert the changes to ES SELECT * FROM binlog // the MySQL table's binlog Suppose there are 10 changes for id = 1 in MySQL; each must update the single id = 1 record in ES. But the SQL above cannot do that; it only ever inserts. To upsert into ES, the source table would have to become an Upsert

Re: Window processing in Stateful Functions

2020-05-07 Thread Igal Shilman
Hi all, Data stream windows are not yet supported in statefun, but it seems like the main motivation here is to purge old edges? If this is the case, perhaps we need to integrate state TTL [1] into persisted values/persisted tables. An alternative approach would be to implement a tumbling window
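A hedged sketch of that tumbling-window idea in the StateFun 2.0 Java SDK, using a delayed message the function sends to itself; the message type, window length, and state layout are all assumptions:

    import java.time.Duration;
    import java.util.ArrayList;
    import org.apache.flink.statefun.sdk.Context;
    import org.apache.flink.statefun.sdk.StatefulFunction;
    import org.apache.flink.statefun.sdk.annotations.Persisted;
    import org.apache.flink.statefun.sdk.state.PersistedValue;

    public class EdgeWindowFn implements StatefulFunction {

        // edges buffered for the current window
        @Persisted
        private final PersistedValue<ArrayList> edges =
            PersistedValue.of("edges", ArrayList.class);

        @Override
        @SuppressWarnings("unchecked")
        public void invoke(Context context, Object input) {
            if (input instanceof Purge) {
                edges.clear();  // window expired: drop the old edges
                return;
            }
            ArrayList buf = edges.getOrDefault(new ArrayList());
            if (buf.isEmpty()) {
                // first edge of a fresh window: schedule the purge via a
                // delayed message to ourselves
                context.sendAfter(Duration.ofMinutes(5), context.self(), new Purge());
            }
            buf.add(input);
            edges.set(buf);
        }

        // marker message the function sends to itself
        public static class Purge {}
    }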

Re: checkpointing opening too many files

2020-05-07 Thread David Anderson
With the FsStateBackend you could also try increasing the value of state.backend.fs.memory-threshold [1]. Only those state chunks that are larger than this value are stored in separate files; smaller chunks go into the checkpoint metadata file. The default is 1KB; increasing it should reduce
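The threshold can also be passed through the FsStateBackend constructor variant that takes a file-state size threshold; the checkpoint path and the 64 KB value below are assumptions for illustration:

    import java.net.URI;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ConfigureBackend {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            // second argument: state chunks up to this many bytes are inlined
            // into the checkpoint metadata instead of becoming separate files
            env.setStateBackend(new FsStateBackend(
                new URI("s3://bucket/checkpoints"), 64 * 1024));
        }
    }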

Re: Broadcast stream causing GC overhead limit exceeded

2020-05-07 Thread Fabian Hueske
Hi Eleanore, Thanks for sharing your findings with us. :-) Cheers, Fabian On Thu., May 7, 2020 at 04:56, Eleanore Jin < eleanore@gmail.com> wrote: > Hi Fabian, > > I just got confirmation from the Apache Beam community; Beam will buffer the > data until there is data from the broadcast stream.

flink: how to access a remote HDFS using the NameNode nameservice

2020-05-07 Thread wangl...@geekplus.com.cn
According to https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html I am deploying a standalone cluster with JobManager HA and need the HDFS address: high-availability.storageDir: hdfs:///flink/recovery My Hadoop is a remote cluster. I can write it

Re: Statefun 2.0 questions

2020-05-07 Thread Igal Shilman
Hi Wouter! Glad to read that you have been using Flink for quite some time and are also exploring StateFun! 1) yes it is correct, and you can follow the Dockerhub contribution PR at [1] 2) I'm not sure I understand what you mean by triggering from the browser. If you mean, for testing / illustration

Re: Re: Building a standalone-mode HA flink cluster outside an existing Hadoop cluster

2020-05-07 Thread wangl...@geekplus.com.cn
I tried it and it works, but now there is a problem accessing HDFS. My Hadoop is managed by Alibaba Cloud EMR. On the EMR-managed machines, HDFS can be accessed as hdfs://emr-cluster:8020/, but my Flink deployment is not managed by EMR and cannot resolve that address, so I can only write it as hdfs://active-namenode-ip:8020/, and the NameNode loses its HA capability. Is there a way around this? Thanks, Wang Lei wangl...@geekplus.com.cn Sender: Andrew Send Time: 2020-05-07 12:31

Re: async IO in UDFs

2020-05-07 Thread Benchao Li
Hi, AFAIK, there is no way to do this for now. It would require the operators running UDFs to support async IO. On Thu, May 7, 2020 at 3:23 PM, lec ssmi wrote: > Hi: > Is there any way to implement async IO in UDFs (scalar function, > table function, aggregate function)? > -- Benchao Li School of
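Async I/O is available one level down, in the DataStream API; in case that layer is an option, a minimal sketch (the lookup() helper, timeout, and capacity are assumptions):

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.async.AsyncFunction;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;

    public class AsyncLookup {

        public static DataStream<String> withLookup(DataStream<String> input) {
            AsyncFunction<String, String> fn = new AsyncFunction<String, String>() {
                @Override
                public void asyncInvoke(String key, ResultFuture<String> future) {
                    CompletableFuture
                        .supplyAsync(() -> lookup(key))
                        .thenAccept(v -> future.complete(Collections.singleton(v)));
                }
            };
            // at most 100 in-flight requests, each timing out after 1000 ms
            return AsyncDataStream.unorderedWait(
                input, fn, 1000, TimeUnit.MILLISECONDS, 100);
        }

        private static String lookup(String key) {
            return key;  // placeholder for the real external call
        }
    }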

async IO in UDFs

2020-05-07 Thread lec ssmi
Hi: Is there any way to implement async IO in UDFs (scalar function, table function, aggregate function)?

Re: Autoscaling vs backpressure

2020-05-07 Thread Arvid Heise
Hi Manish, while you could use backpressure and the resulting consumer lag to throttle the source and keep processing lag to a minimum, I'd personally see only very limited value. It assumes that you have an architecture where you can influence the input rate, which is probably only true if you

Re: Using flink-connector-kafka-1.9.1 with flink-core-1.7.2

2020-05-07 Thread Arvid Heise
Hi Nick, all Flink dependencies are only compatible with the same major version. You can work around it by checking out the code [1], manually setting the dependency of the respective module to your flink-core version, and reverting all changes that do not compile. But there is no guarantee that

Re: The purpose and use cases of merge in Flink's window AggregateFunction

2020-05-07 Thread Zhefu PENG
Hi Jingsong, As you said, [1] is a way to update the variance incrementally; it is also the approach I had found earlier and planned to use inside add. But my question is: how does merge stay accurate for two batches that have already been computed? For example, one node has processed the data 1, 2, 3 and another node has processed 1, 3, 4; the ACC keeps the sum, the count, and the variance. At merge time, reusing the values kept in the ACC with the incremental update from [1] skews the result (the data would need deduplication: after the merge, what actually gets computed is the variance of 1, 2, 3, 4 rather than of 1, 2, 3, 1, 3, 4). How can this be solved?
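For disjoint batches, the standard approach keeps (count, mean, M2) in the ACC and combines them with the parallel formula of Chan et al.; a sketch below. Note the merge simply adds the two partitions together, so it cannot deduplicate overlapping inputs, which is exactly the skew described above:

    // accumulator for incremental (Welford-style) variance
    class VarAcc {
        long n;       // count
        double mean;  // running mean
        double m2;    // sum of squared deviations from the mean

        void add(double x) {
            n++;
            double delta = x - mean;
            mean += delta / n;
            m2 += delta * (x - mean);
        }

        // correct only if this and other saw disjoint data
        void merge(VarAcc other) {
            if (other.n == 0) return;
            long total = n + other.n;
            double delta = other.mean - mean;
            m2 += other.m2 + delta * delta * ((double) n * other.n) / total;
            mean += delta * other.n / total;
            n = total;
        }

        double variance() {  // population variance
            return n > 0 ? m2 / n : 0.0;
        }
    }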

Re: flink: submitted jar restarts by itself every so often

2020-05-07 Thread Yun Gao
From the logs, a container exited abnormally. Check the logs of the TM for that container for exceptions, and check the YARN logs for anything about that container. -- From: 祁森伟 <15110596...@163.com> Date: 2020-05-06 19:43:55 To: Subject: flink: submitted jar restarts by itself every so often java.util.concurrent.CompletionException: