Re: ctx.timestamp() returning null when using Processing Time

2019-11-06 Thread Yun Tang
] https://github.com/apache/flink/blob/9b43f13a50848382fbd634081b82509f464e62ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java#L50 Best Yun Tang From: Komal Mariam Date: Wednesday, November 6, 2019 at 6:19 PM To: user Subject

Re: Flink savepoint(checkpoint) recovery dev debug

2019-11-06 Thread Yun Tang
Best Yun Tang On 11/6/19, 5:18 PM, "qq" <471237...@qq.com> wrote: Hi all. I want to simulate the shell command “flink -s savepoint”; this command can only be run from the shell. I want to debug it in a dev/local development environment; could anyone help

Re: When recovering data from state, does changing the number of YARN containers have any impact?

2019-11-04 Thread Yun Tang
Hi First, check whether the job keeps failing over and whether there are exceptions related to "maximum parallelism". If there are, it means the changed parallelism is incompatible and the job has actually never restored from the checkpoint. If the job did restore from the checkpoint successfully, then check whether the task side is still restoring data because of the parallelism change; if your state is fairly large, this step can take quite a while. Typically in this situation the source consumes data but cannot send it downstream, so the whole job looks as if it is stuck. You can use jstack on the task side to check the call stack and see whether a restore-related stack is hanging.

Re: Checkpoint in FlinkSQL

2019-11-04 Thread Yun Tang
://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#checkpointing Best Yun Tang From: Simon Su Date: Tuesday, November 5, 2019 at 10:38 AM To: dev , user Subject: Checkpoint in FlinkSQL Hi All Does current Flink support to set checkpoint properties while using Flink SQL ? For example

Re: Using RocksDB as lookup source in Flink

2019-11-04 Thread Yun Tang
Hi Srikanth As RocksDB is a single-node DB, just like InfluxDB, I recommend you refer to an implementation of an InfluxDB sink [1]. [1] https://github.com/apache/bahir-flink/tree/master/flink-connector-influxdb Best Yun Tang From: OpenInx Date: Monday, November 4, 2019 at 6:28 PM

Re: Checkpoint failed all the time

2019-11-03 Thread Yun Tang
Sure, this feature has been implemented in FLINK-12364 [1]; all you need to do is set the tolerable number of checkpoint failures via e.g. env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5); [1] https://issues.apache.org/jira/browse/FLINK-12364 Best Yun Tang From: "
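A minimal sketch of that setting, assuming Flink 1.9+ (where FLINK-12364 landed); the checkpoint interval is illustrative:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TolerableFailuresExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // checkpoint every 60 s (illustrative)
            // Tolerate up to 5 consecutive checkpoint failures before failing the job.
            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
            // ... build the rest of the pipeline, then env.execute(...)
        }
    }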

Re: Flink state TTL expiration cleanup problem

2019-10-31 Thread Yun Tang
Hi 王磊 Judging from your configuration and your use of Flink 1.7, active cleanup of expired state is not enabled [1]. I suggest configuring cleanupFullSnapshot on StateTtlConfig, so that when a full snapshot (i.e. a savepoint) is taken, expired data is not kept in the savepoint contents. If you do not enable active cleanup, state is only cleaned up when the original entry is read, which is probably why your savepoint contents keep growing. Also, I suggest upgrading to Flink 1.8+, which has better support for the state TTL feature; you can refer to the Chinese article [2] [1]
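A minimal sketch of enabling cleanupFullSnapshot on a state descriptor, assuming Flink 1.7+; the 7-day TTL and the state name are illustrative:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    public class TtlCleanupExample {
        public static ValueStateDescriptor<Long> ttlDescriptor() {
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.days(7))   // entries expire 7 days after the last write
                    .cleanupFullSnapshot()      // drop expired entries when taking a full snapshot (savepoint)
                    .build();
            ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("lastSeen", Long.class);
            descriptor.enableTimeToLive(ttlConfig);
            return descriptor;
        }
    }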

Re: How to filter abnormal timestamps?

2019-10-31 Thread Yun Tang
set to IngestionTime; can subsequent operators then use EventTime? Yun Tang wrote on Thu, Oct 31, 2019 at 2:26 AM: > Hi 瑞斌 > > If you use Flink's IngestionTime, there is actually no case of mixing it with the EventTime that Flink provides; the IngestionTime on the source side is simply the source's system time. You can add a filter > operator after the source to compare the ingestion time with the e

Re: Flink SQL + savepoint

2019-10-31 Thread Yun Tang
node. [1] Already CC Kurt as he could provide more detail information of this. [1] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/util/resource/StreamNodeUtil.java#L44 Best Yun Tang From: Fanbin Bu Date: Thursday, October 31, 2019

Re: How to filter abnormal timestamps?

2019-10-30 Thread Yun Tang
Hi 瑞斌 If you use Flink's IngestionTime, there is actually no case of mixing it with the EventTime that Flink provides; the IngestionTime on the source side is simply the source's system time. You can add a filter operator right after the source that compares the ingestion time with the event time in the message and drops records whose difference exceeds a certain threshold so they are not sent downstream. Best Yun Tang From: 邢瑞斌 Sent: Wednesday, October 30, 2019 17:57 To:
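A rough sketch of such a filter, assuming a hypothetical Event POJO whose eventTime field is epoch milliseconds; the wall-clock time at the filter stands in for the source's ingestion time:

    import org.apache.flink.streaming.api.datastream.DataStream;

    public class AbnormalTimestampFilter {
        public static class Event {
            public long eventTime; // epoch millis carried in the message
        }

        public static DataStream<Event> dropAbnormal(DataStream<Event> source, long maxSkewMillis) {
            // Drop records whose event time deviates from the current system time by more than the threshold.
            return source.filter(e -> Math.abs(System.currentTimeMillis() - e.eventTime) <= maxSkewMillis);
        }
    }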

Re: standalone flink savepoint restoration

2019-10-16 Thread Yun Tang
/runtime/scheduler/LegacyScheduler.java#L190 Best Yun Tang From: Matt Anger Sent: Thursday, October 17, 2019 5:46 To: user@flink.apache.org Subject: standalone flink savepoint restoration Hello everyone, I am running a flink job in k8s as a standalone HA job. Now I

Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-15 Thread Yun Tang
/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307 Best Yun Tang From: Steven Nelson Sent: Wednesday, October 16, 2019 4:31 To: user Subject: Kinesis Connector

Re: Flink restoring a job from a checkpoint

2019-10-11 Thread Yun Tang
-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint Best Yun Tang From: Congxian Qiu Sent: Friday, October 11, 2019 19:47 To: Flavio Pompermaier Cc: Yun Tang ; theo.diefent...@scoop-software.de ; user Subject: Re: Flink restoring a job from

Re: Question about enabling the state.backend.rocksdb.ttl.compaction.filter.enabled configuration with RocksDBStateBackend

2019-10-11 Thread Yun Tang
Hi I think your concern is that the data fills up the disk before the TTL period has expired; that is definitely not something TTL can cover. Try to limit the write volume at the job level, or increase the parallelism to reduce the amount of data each RocksDB instance has to hold (provided the total disk space of all your machines is larger than your data volume). Also, if you are really worried, switching to an algorithm with better compression also helps somewhat (at the cost of more time and CPU; RocksDB officially recommends ZSTD or Zlib) [1]. To set the compression type you can refer to the setCompressionType method of RocksDB's ColumnFamilyOptions [2] [1]
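A sketch of setting the compression type through an options factory, assuming the Flink 1.8-era org.apache.flink.contrib.streaming.state.OptionsFactory interface; the choice of ZSTD is illustrative:

    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.CompressionType;
    import org.rocksdb.DBOptions;

    public class ZstdCompressionOptionsFactory implements OptionsFactory {
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            return currentOptions; // leave DB-level options unchanged
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            // Trade CPU time for a smaller on-disk footprint.
            return currentOptions.setCompressionType(CompressionType.ZSTD_COMPRESSION);
        }
    }
    // Register it with: rocksDbStateBackend.setOptions(new ZstdCompressionOptionsFactory());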

Re: Reply: Question about Flink local cache files being deleted

2019-10-11 Thread Yun Tang
Sent: Friday, October 11, 2019 15:17 To: user-zh Subject: Re: Reply: Question about Flink local cache files being deleted Hi This is the log I downloaded after the exception occurred this morning; please take a look. taskmanager.log <https://drive.google.com/file/d/17nP8yxSpdAnDDgBEbEUrDXYx-rwosi52/view?usp=drive_web> Yun Tang wrote on Fri, Oct 11, 2019 at 2:56 PM: > Hi 戴嘉诚 > > Your exception occurred during failove

Re: Reply: Question about Flink local cache files being deleted

2019-10-11 Thread Yun Tang
Hi 戴嘉诚 Your exception occurred during failover. When RocksDB restores, it downloads files from HDFS to a temporary restore directory and then hard-links them into the DB directory. When hard-linking a file that had already been downloaded to the temporary restore directory, it suddenly reported that the original file could not be found [1]. The occurrence of this exception is rather strange; could you first share all the logs of the problematic taskmanager? [1]

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Yun Tang
not rely on HA to do job migration things. Best Yun Tang From: Hao Sun Sent: Friday, October 11, 2019 8:33 To: Yun Tang Cc: Vijay Bhaskar ; Yang Wang ; Sean Hester ; Aleksandar Mastilovic ; Yuval Itzchakov ; user Subject: Re: Challenges Deploying Flink

Re: Flink restoring a job from a checkpoint

2019-10-10 Thread Yun Tang
your job from kafka offset before that problematic message. Best Yun Tang From: Flavio Pompermaier Sent: Friday, October 11, 2019 5:50 To: Yun Tang Cc: Congxian Qiu ; theo.diefent...@scoop-software.de ; user Subject: Re: Flink restoring a job from a checkpoint

Re: Where are uploaded Job jars stored?

2019-10-10 Thread Yun Tang
Hi John The jar is not stored in HA path, I think the answer [1] could help you. [1] https://stackoverflow.com/questions/51936608/where-can-i-find-my-jar-on-apache-flink-server-which-i-submitted-using-apache-fl Best Yun Tang From: John Smith Sent: Friday

Re: Best coding practises guide while programming using flink apis

2019-10-10 Thread Yun Tang
an important role. [1] https://flink.apache.org/contributing/code-style-and-quality-preamble.html [2] https://ci.apache.org/projects/flink/flink-docs-stable/concepts/programming-model.html#levels-of-abstraction Best Yun Tang From: Deepak Sharma Sent: Monday

Re: Async and checkpointing

2019-10-10 Thread Yun Tang
st restore from the latest checkpoint. If failing to send a record would not cause that sub-task to fail, nothing would happen and the job continues to run, but this might not be what you want. Best Yun Tang From: anurag Sent: Friday, October 11, 2019 2:0

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Yun Tang
Just a minor supplement @Hao Sun<mailto:ha...@zendesk.com>: if you decide to drop an operator, don't forget to add the --allowNonRestoredState (short: -n) option [1] [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state Best Yu

Re: Flink restoring a job from a checkpoint

2019-10-10 Thread Yun Tang
not use this? Best Yun Tang From: Congxian Qiu Sent: Thursday, October 10, 2019 11:52 To: theo.diefent...@scoop-software.de Cc: user Subject: Re: Flink restoring a job from a checkpoint Hi Vishwas Sorry for the confusion, what Theo said previously is the meaning I

Re: Re:Memory constrains running Flink on Kubernetes

2019-10-10 Thread Yun Tang
could contact me in private to get this jar package and rebuild your Flink runtime to enable the write buffer manager feature. [1] https://github.com/dataArtisans/frocksdb/pull/4 [2] https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager#limit-total-memory-of-memtables Best Yun Tang

Re: Difference between windows in Spark and Flink

2019-10-10 Thread Yun Tang
-td34776.html#a34779 [5] https://github.com/chermenin/spark-states [6] https://docs.databricks.com/spark/latest/structured-streaming/production.html#optimize-performance-of-stateful-streaming-queries [7] https://issues.apache.org/jira/browse/FLINK-12692 Best Yun Tang

Re: Flink restoring a job from a checkpoint

2019-10-08 Thread Yun Tang
Hi Vishwas This is because Flink's checkpoint mechanism offers you more capability. You could resume from the offset within a specific checkpoint instead of the last committed offset, not to mention that you could benefit from restoring the last timer state, operator state and keyed state. Best Yun Tang

Re: Flink restoring a job from a checkpoint

2019-10-08 Thread Yun Tang
/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint Best Yun Tang From: Vishwas Siravara Sent: Wednesday, October 9, 2019 0:54 To: user Subject: Flink restoring a job from a checkpoint Hi guys, I have a flink streaming job which streams from a ka

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-25 Thread Yun Tang
service. [1] https://issues.apache.org/jira/browse/FLINK-11105 [2] https://issues.apache.org/jira/browse/FLINK-12884 Best Yun Tang From: Aleksandar Mastilovic Sent: Thursday, September 26, 2019 1:57 To: Sean Hester Cc: Hao Sun ; Yuval Itzchakov ; user Subject: Re

Re: Reply: Problem using StateBackend in Flink

2019-09-03 Thread Yun Tang
Hi 1. What is the checkpoint timeout (is it the default 10 min)? If the timeout is too short, checkpoint failures happen easily. 2. Are all the subtasks n/a? The checkpoint of the source task does not involve RocksDB and is effectively the same as with the default MemoryStateBackend, so the source task should not fail to complete its checkpoint either (unless it can never acquire the lock inside StreamTask because it is always processing elements). 3. What is the backpressure situation of the job? Is there severe backpressure when using RocksDB (back
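For point 1, a minimal sketch of raising the checkpoint timeout (the default is 10 minutes); the interval and timeout values are illustrative:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTimeoutExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000);                               // checkpoint every minute
            env.getCheckpointConfig().setCheckpointTimeout(20 * 60_000L);  // raise the timeout from 10 min to 20 min
            // ... rest of the job
        }
    }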

Re: Re: Deploying a YARN cluster with Flink 1.8.1 keeps reporting errors

2019-09-01 Thread Yun Tang
Hi Submitting the job to 0.0.0.0:8030 happens because the correct YARN configuration cannot be found at submission time, so it falls back to submitting to the default local port 8030. Check whether the environment variables HADOOP_CONF_DIR or HADOOP_HOME are set correctly; once the directory of these configuration files is set, the job can be submitted. BTW, this is not a Flink problem; it is a problem any big data engine that uses YARN to manage jobs may run into. Best Yun Tang From: 周��岗 Sent: Sunday, September 1, 2019 15:31 To:

Re: Non incremental window function accumulates unbounded state with RocksDb

2019-09-01 Thread Yun Tang
have stayed in the window but not been triggered, we might meet larger state. (However, that still does not seem to account for a factor of 400.) Best Yun Tang From: William Jonsson Sent: Friday, August 30, 2019 18:22 To: Yun Tang ; user@flink.apache.org Subject: Re: Non

Re: Non incremental window function accumulates unbounded state with RocksDb

2019-08-30 Thread Yun Tang
failover? Could you take a savepoint for the job with different state backends and compare the size of the savepoints? What's more, what version of Flink did you use? Best Yun Tang From: William Jonsson Sent: Friday, August 30, 2019 17:04 To: user@flink.apache.org

Re: best practices on getting flink job logs from Hadoop history server?

2019-08-30 Thread Yun Tang
note that the temporary files of the YARN session in the home directory will not be removed. Best Yun Tang From: Zhu Zhu Sent: Friday, August 30, 2019 16:24 To: Yu Yang Cc: user Subject: Re: best practices on getting flink job logs from Hadoop history server? Hi

Re: checkpoint failure suddenly even state size less than 1 mb

2019-08-30 Thread Yun Tang
://github.com/apache/flink/blob/ccc7eb431477059b32fb924104c17af953620c74/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L758 Best Yun Tang From: Sushant Sawant Sent: Tuesday, August 27, 2019 15:01 To: user Subject: Re

Re: Discussion: if Flink does not clear state, can restoring a job from a checkpoint reset the Kafka offset?

2019-08-28 Thread Yun Tang
Sent: Thursday, August 29, 2019 11:45 To: user-zh@flink.apache.org Subject: Re: Discussion: if Flink does not clear state, can restoring a job from a checkpoint reset the Kafka offset? Hi Yun Tang, Actually, by default I do want to resume the current Kafka consumption progress from the checkpoint, but in special cases I need to start consuming from a certain point in time; as you said, I mainly want to restore the keyed-state data. If setCommitOffsetsOnCheckpoints(false) is used and the Kafka properties set

Re: Discussion: if Flink does not clear state, can restoring a job from a checkpoint reset the Kafka offset?

2019-08-28 Thread Yun Tang
Hi 蒋涛涛 Flink's Kafka consumer has three offset commit modes in total: 1. OffsetCommitMode.DISABLED: offset committing is completely disabled. 2. OffsetCommitMode.ON_CHECKPOINTS: Flink's default behavior; offsets are committed to Kafka only when a Flink checkpoint completes. 3. OffsetCommitMode.KAFKA_PERIODIC: uses Kafka's internal
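A sketch of how the mode is chosen from the consumer configuration, assuming the universal FlinkKafkaConsumer; broker, group id and topic names are made up:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class OffsetCommitModeExample {
        public static FlinkKafkaConsumer<String> buildConsumer() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // made-up broker
            props.setProperty("group.id", "my-group");            // made-up group id

            FlinkKafkaConsumer<String> consumer =
                    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
            // With checkpointing enabled: true -> ON_CHECKPOINTS (the default), false -> DISABLED.
            // Without checkpointing, the Kafka property "enable.auto.commit" decides KAFKA_PERIODIC vs DISABLED.
            consumer.setCommitOffsetsOnCheckpoints(true);
            return consumer;
        }
    }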

Re: StateMigrationException thrown by state processor api

2019-08-27 Thread Yun Tang
space serializer. Best Yun Tang From: Paul Lam Sent: Tuesday, August 27, 2019 17:14 To: user Cc: Tzu-Li (Gordon) Tai Subject: StateMigrationException thrown by state processor api Hi, I was using the new state processor api to read a savepoint produced

Re: Job memory growth

2019-08-25 Thread Yun Tang
hi 张坤 Are you using RocksDBStateBackend? Being killed by YARN's node manager for exceeding memory is usually caused by native memory overuse. You can add -XX:NativeMemoryTracking=detail to the Flink option env.java.opts.taskmanager [1] so you can observe whether the memory grows. Also, what are the memory configuration you use and the YARN log at the time of the kill? You could consider increasing the resources requested for the JVM heap, which indirectly increases the total memory requested from YARN and to some extent reduces the chance of being killed. [1]

Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-22 Thread Yun Tang
Glad to hear this and really appreciate Gordon and Kurt's drive on this release, and thanks for everyone who ever contributed to this release. Best Yun Tang From: Becket Qin Sent: Friday, August 23, 2019 0:19 To: 不常用邮箱 Cc: Yang Wang ; user Subject: Re

Re: Reply: Flink waits 10 minutes at startup

2019-08-21 Thread Yun Tang
he.flink.runtime.executiongraph.ExecutionGraph- Source: Custom Source -> Flat Map -> Filter -> Filter -> Map -> Timestamps/Watermarks -> from: (request, curuserid, timelong, rowtime) -> select: (rowtime, 0 AS $f1, curuserid) -> time attribute: (rowtime) (3/4) (4756a0450

Re: Flink waits 10 minutes at startup

2019-08-21 Thread Yun Tang
Hi A Flink-on-YARN job taking a long time to start can have many causes, for example waiting because resources are insufficient, or containers exiting while being requested. The default slot request timeout is 5 min; it looks like your job probably ran into a slot request timeout and then requested slots again. It would be best to provide the jobmanager log for further analysis. Best Yun Tang From: 々守护々 <346531...@qq.com> Sent: Thursday, August 22, 2019 11:04 To: user-zh Subject:

Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer

2019-08-14 Thread Yun Tang
Congratulations Andrey. Best Yun Tang From: Xintong Song Sent: Wednesday, August 14, 2019 21:40 To: Oytun Tez Cc: Zili Chen ; Till Rohrmann ; dev ; user Subject: Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer Congratulations Andery~! Thank you

Re: Flink 1.8: Using the RocksDB state backend causes "NoSuchMethodError" when trying to stop a pipeline

2019-08-13 Thread Yun Tang
-statebackend-rocksdb_2.11 in your pom.xml should be enough as it already includes the dependency of rocksdbjni. [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#classloader-resolve-order Best Yun Tang From: Kaymak, Tobias Sent: Tuesday, August

Re: Restoring a savepoint: besides the command line, can it be done through code?

2019-08-09 Thread Yun Tang
Hi 中锋 I'm afraid a savepoint cannot be restored through code. At the moment there are only two places where a savepoint path can be passed in: 1. CliFrontendParser#createSavepointRestoreSettings [1] 2. JarRunHandler#getSavepointRestoreSettings [2] These correspond to command-line and web (REST) submission respectively; there is no way to issue the restore request in code. Actually, I think REST or web submission should also meet your needs. [1]

Re: Capping RocksDb memory usage

2019-08-08 Thread Yun Tang
and filter usage. Since Flink would use per state per column family, and the write buffer number increase when more column families created. [1] https://issues.apache.org/jira/browse/FLINK-7289 [2] https://github.com/dataArtisans/frocksdb/pull/4 Best Yun Tang From

Re: Operator state

2019-08-08 Thread Yun Tang
Hi When talking about sharing state, broadcast state [1][2] might be a choice. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#provided-apis [2] https://flink.apache.org/2019/06/26/broadcast-state.html Best Yun Tang
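A small sketch of wiring up broadcast state, assuming plain String streams; the descriptor name and the idea of a rule stream are illustrative:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class BroadcastStateSketch {
        // Shared broadcast state: rule name -> rule definition.
        static final MapStateDescriptor<String, String> RULES = new MapStateDescriptor<>(
                "RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        static BroadcastConnectedStream<String, String> connect(DataStream<String> events,
                                                                DataStream<String> rules) {
            // Every parallel instance downstream sees the full rule stream and keeps it in broadcast state.
            BroadcastStream<String> broadcastRules = rules.broadcast(RULES);
            // Process the result with a (Keyed)BroadcastProcessFunction that reads/writes the broadcast state.
            return events.connect(broadcastRules);
        }
    }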

Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-07 Thread Yun Tang
Congratulations Hequn. Best Yun Tang From: Rong Rong Sent: Thursday, August 8, 2019 0:41 Cc: dev ; user Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer Congratulations Hequn, well deserved! -- Rong On Wed, Aug 7, 2019 at 8:30 AM mailto:xingc

Re: Re: Re: Re: Flink RocksDBStateBackend problem

2019-08-06 Thread Yun Tang
usually saves the data on HDFS. In that case, my command is flink cancel -s . It exits as soon as the savepoint is done; does RocksDB still need to write data at that point? Because once the savepoint is done, the whole job is finished. Hoping for an answer, thank you! At 2019-08-06 13:44:18, "Yun Tang" wrote: >@lvwenyuan<mailto:lvwenyuan...@163.com> >The first thing that needs to be clarified is whether the "FileSystem" here refers to the file >system that stores data at checkpoint time, or FsS

Re: Re: Flink RocksDBStateBackend problem

2019-08-05 Thread Yun Tang
@lvwenyuan The first thing that needs to be clarified is whether the "FileSystem" here refers to the file system that stores data at checkpoint time, or to FsStateBackend; I suggest describing what you want to ask more clearly before your next question. * If it refers to the remote file system that stores checkpoint data, then with incremental

Re: Memory constrains running Flink on Kubernetes

2019-08-05 Thread Yun Tang
://github.com/facebook/rocksdb/blob/30edf1874c11762a6cacf4434112ce34d13100d3/java/src/main/java/org/rocksdb/MutableColumnFamilyOptionsInterface.java#L24 Best Yun Tang From: wvl Sent: Monday, August 5, 2019 17:55 To: Yu Li Cc: Yun Tang ; Yang Wang ; Xintong Song ; user

Re: Savepoint process recovery in Jobmanager HA setup

2019-07-27 Thread Yun Tang
savepoint. Best Yun Tang From: Bajaj, Abhinav Sent: Saturday, July 27, 2019 7:25 To: user@flink.apache.org Subject: Savepoint process recovery in Jobmanager HA setup Hi, I am trying to test a scenario that triggers a savepoint on a Flink 1.7.1 Job deployed

Re: Re: Flink checkpoint concurrency problem

2019-07-25 Thread Yun Tang
time >= next.getValue()) { iterator.remove(); --stateSize; ++removeState; } } if (stateSize == 0) { accumulateStateMap.clear(); } // delete this timer ctx.timerService().deleteEventTimeTimer(time

Re: Memory constrains running Flink on Kubernetes

2019-07-25 Thread Yun Tang
Prometheus with k8s. Best Yun Tang From: wvl Sent: Thursday, July 25, 2019 17:50 To: Yang Wang Cc: Yun Tang ; Xintong Song ; user Subject: Re: Memory constrains running Flink on Kubernetes Thanks for all the answers so far. Especially clarifying was that RocksDB me

Re: Re: Flink checkpoint concurrency problem

2019-07-25 Thread Yun Tang
packaged; the operator state descriptor uses MapStateDescriptor, thanks! Yun Tang wrote on Thu, Jul 25, 2019 at 7:10 PM: > Hi all > > This discussion has drifted further and further off topic; what goes wrong is the operator state > backend, which actually has nothing to do with RocksDB, and should not have anything to do with MaxConcurrentCheckpoints either. > > To 戴嘉诚 > Which Flink version are you using? The operator state descr

Re: Memory constrains running Flink on Kubernetes

2019-07-24 Thread Yun Tang
-oom-behavior [2] https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks [3] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#rocksdb-native-metrics Best Yun Tang From: Xintong Song Sent: Wednesday

Re: FsStateBackend,hdfs rpc api too much,FileCreated and FileDeleted is for what?

2019-07-23 Thread Yun Tang
/FSDirMkdirOp.java#L178 [2] https://github.com/apache/hadoop/blob/377f95bbe8d2d171b5d7b0bfa7559e67ca4aae46/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java#L799 Best Yun Tang From: 陈Darling Sent: Tuesday, July

Re: Can the number of files under the checkpoint Chk-no folder be calculated?

2019-07-18 Thread Yun Tang
is another topic. Are there any optimization points on this topic that could give me some inspiration? On Jul 18, 2019, at 9:34 PM, Yun Tang mailto:myas...@live.com>> wrote: hi First, determine whether it is the massive creation of files that causes the RPC backlog on your namenode. There are many kinds of RPC requests; for example, every task creating the checkpoint directory also sends a large number of RPC requests to the namenode (see [https://issues.apache.org/jira/browse/FLINK-11696]); it could also be that your checkpoint interval is too small, causing files to be constantly created and deleted

Re: Can the number of files under the checkpoint Chk-no folder be calculated?

2019-07-18 Thread Yun Tang
hi First, determine whether it is the massive creation of files that causes the RPC backlog on your namenode. There are many kinds of RPC requests; for example, every task creating the checkpoint directory also sends a large number of RPC requests to the namenode (see [https://issues.apache.org/jira/browse/FLINK-11696]); it could also be that your checkpoint interval is too small, causing files to be constantly created and deleted (subsuming old checkpoints). Find the root cause of the heavy namenode pressure first.

Re: Can the number of files under the checkpoint Chk-no folder be calculated?

2019-07-18 Thread Yun Tang
com> Sent: Thursday, July 18, 2019 15:34 To: user-zh@flink.apache.org Subject: Fwd: Can the number of files under the checkpoint Chk-no folder be calculated? Thank you for your answer. Is the number of those files related only to the operator parallelism? Is it also related to the number of keys and so on? Is there a concrete formula? I did not find this logic in the source code. One more, most important question: since these files cannot be merged, which files does "merging small state files" refer to? Best Andrew > Below is the forwarded message: > > From

Re: Can the number of files under the checkpoint Chk-no folder be calculated?

2019-07-18 Thread Yun Tang
Hi A1: The number of files under chk-x is related to the number of operators and their parallelism; they are mainly operator state files. For the checkpoint scenario, _metadata is only metadata; the real operator data is all in the other files. A2: These files cannot be merged together, because _metadata mainly records file paths; if they were merged, the original paths could not be found, which would cause problems and make restoring from the checkpoint impossible. Best Yun Tang From: 陈冬林 <874269...@qq.com> Sent: Thursday, July 18,

Re: Flink queryable state - io.netty4 errors

2019-07-07 Thread Yun Tang
Hi Shivam Did this reproduce each time? Would you please share the full stack trace when you get this exception. Moreover, task manager log of that value state is also very welcome. Best Yun Tang From: Shivam Dubey Sent: Sunday, July 7, 2019 17:35 To: user

Re: For Flink jobs whose state is not large but which need frequent checkpoints, should the state backend be file or RocksDB?

2019-07-02 Thread Yun Tang
hi First, even if you choose the RocksDB state backend, it still needs to write to HDFS; it is just that with incremental checkpoints enabled, the amount of HDFS data written each time can be reduced. I think the core of this question is a trade-off. When no checkpoint is running, the read/write performance of RocksDBStateBackend is not as good as the purely in-memory FsStateBackend. In the synchronous phase of a checkpoint, the RocksDB state backend needs to write everything to local disk, which may be slower than FsStateBackend's in-memory operations and will also affect throughput. In the asynchronous phase of a checkpoint, since RocksDB
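A minimal sketch of the RocksDB backend with incremental checkpoints enabled, assuming Flink 1.7+; the HDFS checkpoint path and interval are made up:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IncrementalRocksDbExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // The second argument enables incremental checkpoints, so only new/changed SST files go to HDFS each time.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
            env.enableCheckpointing(10_000); // frequent checkpoints, e.g. every 10 s (illustrative)
        }
    }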

Re: How to track Flink 1.9 progress

2019-07-01 Thread Yun Tang
hi Luan You can track it in the Flink mailing list at http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Features-for-Apache-Flink-1-9-0-td28701.html; as far as I know, there does not seem to be an overall umbrella issue on JIRA tracking the progress. Best Yun Tang From: Luan Cooper Sent: Monday, July 1, 2019 16:17 To:

Re: Providing external files to flink classpath

2019-06-28 Thread Yun Tang
Best Yun Tang From: Vishwas Siravara Sent: Saturday, June 29, 2019 0:43 To: user Subject: Providing external files to flink classpath Hi , I am trying to add external property files to the flink classpath for my application. These files are not a part of the fat jar

Re: Re: Flink 1.8 + Hadoop 3.1.2 build problem

2019-06-28 Thread Yun Tang
Hello Starting from Flink 1.8, the default Flink build no longer bundles the Hadoop dependency; see [1] for more information. In fact, the official download page [2] also explains that from Flink 1.8 onwards the shaded-hadoop jars need to be downloaded separately and placed under the lib directory. If you need the shaded-hadoop jar, you can find it under the built flink-shaded-hadoop sub-project directory. [1] https://issues.apache.org/jira/browse/FLINK-11266 [2]

Re: Question about checkpoint state size

2019-06-26 Thread Yun Tang
Hello This question is a bit broad, because you have not described over what period you think the checkpoint state size keeps growing. There are several reasons the checkpoint state size grows: 1. The upstream data volume increases. 2. The window duration is long and the window has not fired yet, so a lot of data accumulates inside it. 3. The window type requires storing a larger state size. You can refer to the community documentation [1] on the storage footprint of window state. Also, when the upstream data volume has not changed significantly, the checkpoint state after several window periods

Re: Process Function's timers "postponing"

2019-06-25 Thread Yun Tang
From: Andrea Spina Sent: Tuesday, June 25, 2019 23:40 To: Yun Tang Cc: user Subject: Re: Process Function's timers "postponing" Hi Yun, thank you for your answer. I'm not sure I got your point. My question is: for the same key K, I process two records R1 at t1 and R2 at

Re: Process Function's timers "postponing"

2019-06-25 Thread Yun Tang
the "postpone" delay at [2]. When the scheduled runnable is triggered, it would poll from the 'processingTimeTimersQueue' [3] which means the timer would finally be removed. Hope this could help you. Best Yun Tang [1] https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c02

Re: Building some specific modules in flink

2019-06-24 Thread Yun Tang
, a quick solution is to use the `jar uf` command [1] to update the dist jar package with your changed classes. Otherwise, you need to build the flink-dist module from scratch. [1] https://docs.oracle.com/javase/tutorial/deployment/jar/update.html Best Yun Tang From: syed

Re: Linkage Error RocksDB and flink-1.6.4

2019-06-23 Thread Yun Tang
(MemorySize.parseBytes(properties.writeBufferSize)) } You just need to serialize the properties via closure to TMs. Hope this could help you. Best Yun Tang From: Andrea Spina Sent: Monday, June 24, 2019 2:20 To: user Subject: Linkage Error RocksDB and flink
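A sketch of that idea in Java, assuming the Flink 1.6-era OptionsFactory interface; the properties field and the "64 mb" value are made up, and the point is only that the field is serialized with the factory and shipped to the TMs:

    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    public class WriteBufferOptionsFactory implements OptionsFactory {
        private final String writeBufferSize; // e.g. "64 mb", taken from your own job properties

        public WriteBufferOptionsFactory(String writeBufferSize) {
            this.writeBufferSize = writeBufferSize;
        }

        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            return currentOptions;
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            return currentOptions.setWriteBufferSize(MemorySize.parseBytes(writeBufferSize));
        }
    }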

Re: Checkpointing & File stream with

2019-06-18 Thread Yun Tang
://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/datastream_api.html#data-sources Best Yun Tang From: Sung Gon Yi Sent: Tuesday, June 18, 2019 14:13 To: user@flink.apache.org Subject: Checkpointing & File stream with Hello, I work on joining two streams,

Re: Apache Flink - Question about metric registry and reporter and context information

2019-06-15 Thread Yun Tang
by #getAllVariables() within MetricGroup. [1] https://github.com/apache/flink/blob/8558548a37437ab4f8049b82eb07d1b3aa6ed1f5/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java#L139 Best Yun Tang From: M Singh Sent: Saturday

Re: What happens when: high-availability.storageDir: is not available?

2019-06-13 Thread Yun Tang
/FLIP-19%3A+Improved+BLOB+storage+architecture Best Yun Tang From: John Smith Sent: Thursday, June 13, 2019 21:18 To: Yun Tang Cc: user Subject: Re: What happens when: high-availability.storageDir: is not available? Thanks. Does this folder need to available for task

Re: java.lang.NoClassDefFoundError --- FlinkKafkaConsumer

2019-06-13 Thread Yun Tang
#appendix-template-for-building-a-jar-with-dependencies Best Yun Tang From: syed Sent: Thursday, June 13, 2019 16:28 To: user@flink.apache.org Subject: java.lang.NoClassDefFoundError --- FlinkKafkaConsumer hi I am trying to add a kafka source to the standard WordCount

Re: What happens when: high-availability.storageDir: is not available?

2019-06-12 Thread Yun Tang
failed. How frequently this directory is updated mainly depends on the checkpoint interval. Best Yun Tang From: John Smith Sent: Monday, June 10, 2019 23:55 To: user Subject: Re: What happens when: high-availability.storageDir: is not available? Or even how often

Re: Savepoint status check fails with error Operation not found under key

2019-06-12 Thread Yun Tang
. If possible, you could search job manager log during that time when you trigger the savepoint and share it here. Best Yun Tang From: anaray Sent: Wednesday, June 12, 2019 5:35 To: user@flink.apache.org Subject: Savepoint status check fails with error Operation not found

Re: [External] Flink 1.7.1 on EMR metrics

2019-05-30 Thread Yun Tang
Hi Padarn If you want to verify why no metrics sending out, how about using the built-in Slf4j reporter [1] which would record metrics in logs. If you could view the metrics after enabled slf4j-reporter, you could then compare the configurations. Best Yun Tang [1] https://ci.apache.org

Re: Question about the Flink metrics Reporter

2019-05-15 Thread Yun Tang
Hi 嘉诚 I'm not sure which Flink version you are using, but the logic of showing only the first part of the host name has always been there, because in most scenarios the first part of the host name is enough to identify it. For the concrete implementation see [1] and [2]. Inspired by you, I created a JIRA [3] to track this issue; the solution is to provide a metrics option so that in your scenario the full hostname can be shown in metrics. Best Yun Tang [1]

Re: Flink and Prometheus setup in K8s

2019-05-14 Thread Yun Tang
the cluster in k8s. Best Yun Tang From: Wouter Zorgdrager Sent: Monday, May 13, 2019 20:16 To: user Subject: Flink and Prometheus setup in K8s Hey all, I'm working on a deployment setup with Flink and Prometheus on Kubernetes. I'm running into the

Re: Error at startup when using RocksDB as the state backend

2019-05-10 Thread Yun Tang
Hi The root cause is actually the last line, "Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hdfs.protocol.HdfsConstants", which has nothing to do with RocksDB. Check whether this class is on the runtime classpath; you can first confirm whether flink-shaded-hadoop2-xx.jar is on your classpath. Best Yun Tang From: zhang yue Sent:

Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

2019-05-06 Thread Yun Tang
Hi Averell Would you please share the Flink web graph UI to illustrate the change after you append a map operator? Best Yun Tang From: Le-Van Huyen Sent: Monday, May 6, 2019 11:15 To: Yun Tang Cc: user@flink.apache.org Subject: Re: IllegalArgumentException

Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

2019-05-05 Thread Yun Tang
stream contains exactly what the downstream task contains. Best Yun Tang [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream [2] https://github.com/apache/flink/blob
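For reference, a sketch of the experimental API being discussed [1], assuming a String stream that is already partitioned exactly as keyBy would partition it:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamUtils;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    public class ReinterpretSketch {
        public static KeyedStream<String, String> asKeyed(DataStream<String> prePartitioned) {
            KeySelector<String, String> selector = new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return value; // illustrative key selector; must match the original partitioning
                }
            };
            // Only safe when the upstream partitioning really matches what keyBy(selector) would produce.
            return DataStreamUtils.reinterpretAsKeyedStream(prePartitioned, selector);
        }
    }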

Re: [State Backend] A question: checkpoint recovery fails

2019-05-02 Thread Yun Tang
Hi The error stack shows that the stream being read was closed while restoring state. If HDFS itself did not have a problem, this should not be the root cause; are there any other exceptions in the log? Best Yun Tang Sent from my Xiaomi phone On Apr 30, 2019 16:30, eric wrote: Hi all: I'm new to Flink and ran a program to test state checkpoints: 1) The data source is in socket mode, using the keyed state backend; submit the job and let it run for a while 2) Then close the source socket; the job then enters the failed state 3) Wait a few seconds and reopen the source socket 4) At this point Flink reconnects to the socket,

Re: Errors after storing checkpoints on HDFS for a while

2019-04-30 Thread Yun Tang
Hi 志鹏 The core cause is an HDFS problem: Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff could only be replicated to 0 nodes instead of minReplication (=1). There are

Re: How to let Flink 1.7.X run Flink session cluster on YARN in Java 7 default environment

2019-04-27 Thread Yun Tang
Hi Zhangjun Thanks for your reply! However, the Flink user mailing list is conducted in English and the user-zh mailing list is specifically for Chinese. Replying in Chinese on the Flink user mailing list would be somewhat unfriendly to non-Chinese speakers. I think your reply could be translated as

Re: Hbase Connector failed when deployed to yarn

2019-04-11 Thread Yun Tang
Hi I believe this is the same problem which reported in https://issues.apache.org/jira/browse/FLINK-12163 , current work around solution is to put flink-hadoop-compatibility jar under FLINK_HOME/lib. Best Yun Tang From: hai Sent: Thursday, April 11, 2019 21:06

Re: Metrics for received records per TaskManager

2019-04-02 Thread Yun Tang
is `tm_id` (task manager id). And if you group these metrics by `tm_id` for a specific task manager node, you can view the bytes received locally at that task manager. [1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter Best Yun Tang

Re: Metrics for received records per TaskManager

2019-04-02 Thread Yun Tang
to query I'm afraid you cannot directly know the received records per task manager, and you have to gather these metrics per task. Best Yun Tang From: Benjamin Burkhardt Sent: Tuesday, April 2, 2019 21:56 To: user@flink.apache.org; Yun Tang Subject: Re: Metrics

Re: Metrics for received records per TaskManager

2019-04-02 Thread Yun Tang
the information by each TaskManager, please group the metrics by tag 'tm_id'. You could refer to https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io for more information. Best Yun Tang From: Benjamin Burkhardt Sent: Tuesday, April 2

Re: Flink HA HDFS directory permission problem

2019-04-01 Thread Yun Tang
0 2019-04-01 15:12 /flink/ha/dee74bc7-d450-4fb4-a9f2-4983d1f9949f drwxrwxrwx - hdfs hdfs 0 2019-04-01 15:13 /flink/ha/edd59fcf-8413-4ceb-92cf-8dcd637803f8 drwxrwxrwx - hdfs hdfs 0 2019-04-01 15:13 /flink/ha/f6329551-56fb-4c52-a028-51fd838c4af6 > On April 1, 2019, at 4:02 PM,

Re: Flink HA HDFS directory permission problem

2019-04-01 Thread Yun Tang
Hi 孙森, Add the submitting user root to Hadoop's hdfs user group, or submit the program as Hadoop's hdfs user [1], or change the permissions of the whole directory HDFS:///flink/ha [2] to be open to any user; that should solve the problem. Remember to add -R so that it also takes effect on subdirectories. [1] https://stackoverflow.com/questions/11371134/how-to-specify-username-when-putting-files-on-hdfs-from-a-remote-machine [2]

Re: [DISCUSS] Remove forceAvro() and forceKryo() from the ExecutionConfig

2019-03-26 Thread Yun Tang
ExecutionConfig. Best Yun Tang [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html From: Konstantin Knauf Sent: Tuesday, March 26, 2019 17:37 To: Stephan Ewen Cc: Yun Tang; dev; user Subject: Re: [DISCUSS] Remove forceAvro

Re: High availability of the nameNode specified for RocksDB

2019-03-26 Thread Yun Tang
Hi For the storage directory of Flink's high-availability configuration, when the storage path is configured as HDFS, namenode high availability is provided by HDFS and is completely transparent to the upper layers. Best Yun Tang From: 戴嘉诚 Sent: Tuesday, March 26, 2019 16:57 To: user-zh@flink.apache.org Subject: High availability of the nameNode specified for RocksDB   Hey, I would like to ask, the location of RocksDB in Flink

Re: What is Flinks primary API language?

2019-03-26 Thread Yun Tang
Scala free. But at the API level, from my point of view, I have never heard of any plan to stop supporting Scala. Best Yun Tang From: Ilya Karpov Sent: Tuesday, March 26, 2019 15:21 To: user@flink.apache.org Subject: What is Flinks primary API language? Hello, our dev

Re: [DISCUSS] Remove forceAvro() and forceKryo() from the ExecutionConfig

2019-03-26 Thread Yun Tang
for enableForceAvro() and enableForceKryo()? I think if https://issues.apache.org/jira/browse/FLINK-11917 is merged, we could support migrating state that was POJO but serialized using Kryo. Best Yun Tang From: Stephan Ewen Sent: Tuesday, March 26, 2019 2:31

Re: Reserving Kafka offset in Flink after modifying app

2019-03-26 Thread Yun Tang
/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids [2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state Best Yun Tang From: Son Mai Sent: Tuesday, March 26, 2019 9:51
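A sketch of assigning stable operator ids so the Kafka source state can be matched after the app is modified [1]; the broker and topic names are made up:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class StableUidJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // made-up broker

            env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
               .uid("kafka-source")        // stable id: the consumer's offset state follows this id across changes
               .map(String::toUpperCase)
               .uid("uppercase-map")
               .print();

            env.execute("stable-uid-job");
        }
    }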

Re: Flink 1.7.2 cluster exits abnormally

2019-03-12 Thread Yun Tang
Hi Did you not configure a checkpoint path, and not explicitly configure FsStateBackend or RocksDBStateBackend? This should be a bug of MemoryStateBackend when HA is configured but no checkpoint path is set; see the JIRA I created earlier, https://issues.apache.org/jira/browse/FLINK-11107. The related PR has been submitted, but the community considers MemoryStateBackend to be mostly for debugging, or

Re: estimate number of keys on rocks db

2019-03-10 Thread Yun Tang
Hi Avi Unfortunately, we cannot see the attached images. By the way, did you ever use window in this job? Best Yun Tang From: Avi Levi Sent: Sunday, March 10, 2019 19:41 To: user Subject: estimate number of keys on rocks db Hi, I am trying to estimate number

Re: Is taskmanager.heap.mb a valid configuration parameter in 1.7?

2019-03-07 Thread Yun Tang
Hi Yes, `taskmanager.heap.mb` is deprecated but still works to keep backward compatibility. I have already created an issue https://issues.apache.org/jira/browse/FLINK-11860 to move these deprecated options in the documentation. Best Yun Tang From: anaray

Re: How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?

2019-03-06 Thread Yun Tang
solutions. [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#custom-emr-installation Best Yun Tang From: Jack Tuck Sent: Thursday, March 7, 2019 3:39 To: user@flink.apache.org Subject: How to monitor Apache Flink in AWS EMR
