Re: question on ValueState

2021-02-08 Thread yidan zhao
I have a related question. Since FsStateBackend keeps the working state on the heap and only stores checkpoints in the file system, does JobManager/TaskManager memory limit the state size? Is the state size bounded by TM memory * number of TMs, or by the JM's memory?

Re: question on ValueState

2021-02-08 Thread yidan zhao
What I want to know is whether I should replace the file backend with RocksDB. RocksDB's performance is not as good, although its state can grow very large. Currently my job's state is about 10 GB, and I run 10 TaskManagers on different machines, each with 100 GB of memory. I do not think I should use

Re: Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-08 Thread Arvid Heise
Hi Jan, Another solution is to insert heartbeat events at the source for each sensor. The solution is very similar to advancing watermarks when there are no elements in the respective source partition. However, it's only easy to implement if you have your own source and know all sensors on
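Arvid's heartbeat idea can be illustrated with a small plain-Python model. It is not Flink API: `with_heartbeats` and `sliding_counts` are hypothetical names, and the sketch assumes the full set of sensor IDs is known at the source, as the message notes. Heartbeats are tagged so they open windows without being counted, which is what lets a window report 0 instead of emitting nothing.

```python
def with_heartbeats(events, sensors, start, end, interval):
    """Merge real events with one heartbeat per sensor every `interval` ms.

    events: list of (timestamp_ms, sensor_id) measurements.
    Returns (timestamp_ms, sensor_id, is_heartbeat) tuples sorted by time.
    """
    merged = [(ts, sensor, False) for ts, sensor in events]
    for sensor in sensors:
        for ts in range(start, end, interval):
            merged.append((ts, sensor, True))
    merged.sort(key=lambda e: e[0])
    return merged


def sliding_counts(stream, size, slide, start, end):
    """Count real events per (sensor, window_start) for sliding windows
    [w, w + size) with w = start, start + slide, ... < end.

    A window exists as soon as any element (real or heartbeat) falls into
    it, so windows that only saw heartbeats report a count of 0.
    """
    counts = {}
    for ts, sensor, is_heartbeat in stream:
        for w in range(start, end, slide):
            if w <= ts < w + size:
                key = (sensor, w)
                counts.setdefault(key, 0)
                if not is_heartbeat:  # heartbeats open the window but are not counted
                    counts[key] += 1
    return counts
```

For example, with sensors "a" and "b", real events only for "a" at 0 ms and 500 ms, 2000 ms windows sliding every 1000 ms, and heartbeats every 1000 ms, the window of "a" starting at 1000 ms reports 0 instead of being absent.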

Re: Using timers on a non-keyed stream

2021-02-08 Thread yidan zhao
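Of course, with something like randomKey % 30 the end result would be roughly the same, but the 30 sink batches might all end up concentrated on a few parallel instances. yidan zhao wrote on Tue, Feb 9, 2021 at 3:22 PM: > The biggest problem with introducing a random key is that I want to sink in batches, and with keys that random nothing can be batched at all. > Bucketing with randomKey % 1024 into 1024 buckets doesn't work either: each bucket gets too little data, so almost every sink is triggered by the timeout rather than by reaching batchSize. In other words, every sink round involves 1024 concurrent sink calls. >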

Re: Using timers on a non-keyed stream

2021-02-08 Thread yidan zhao
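The biggest problem with introducing a random key is that I want to sink in batches, and with keys that random nothing can be batched at all. Bucketing with randomKey % 1024 into 1024 buckets doesn't work either: each bucket gets too little data, so almost every sink is triggered by the timeout rather than by reaching batchSize. In other words, every sink round involves 1024 concurrent sink calls. The backend storage probably does not expect that much concurrency, and the whole point of batching is to reduce the number of sink calls. What I want is to sink according to the parallelism (say 30): 30 sink calls per round (or more than 30, since a batch may exceed batchSize; if all ... wrote on Tue, Feb 9, 2021 at 3:04 PM: >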
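A minimal sketch of the per-subtask batching described above, written as plain Python rather than a Flink SinkFunction. `BatchingBuffer`, `batch_size`, `timeout_ms` and `on_tick` are hypothetical names. Because non-keyed operators cannot use the TimerService, the sketch assumes the timeout check is driven from outside (for example by a `ScheduledExecutorService` inside a Java sink) and takes the current time as a parameter so the logic stays testable. With parallelism 30, each subtask owns one buffer, so a round produces roughly 30 flushes without keyBy and without skew.

```python
class BatchingBuffer:
    """Per-subtask buffer: flush when `batch_size` records are buffered or
    when the oldest buffered record has waited `timeout_ms` or longer."""

    def __init__(self, batch_size, timeout_ms, flush):
        self.batch_size = batch_size
        self.timeout_ms = timeout_ms
        self.flush = flush        # callback receiving one list of records
        self.buffer = []
        self.first_ts = None      # arrival time of the oldest buffered record

    def add(self, record, now_ms):
        if not self.buffer:
            self.first_ts = now_ms
        self.buffer.append(record)
        if len(self.buffer) >= self.batch_size:
            self._emit()          # size-triggered flush
        else:
            self.on_tick(now_ms)  # opportunistic timeout check on each record

    def on_tick(self, now_ms):
        """Called periodically (assumed external scheduler) to flush a
        partial batch once it has timed out."""
        if self.buffer and now_ms - self.first_ts >= self.timeout_ms:
            self._emit()

    def _emit(self):
        self.flush(self.buffer)
        self.buffer = []
        self.first_ts = None
```

Two things the model leaves out: in a real sink, anything still buffered must also be flushed when a checkpoint is taken (for example from `CheckpointedFunction.snapshotState`), otherwise buffered records could be lost on failure; and the scheduler thread and the task thread must not touch the buffer concurrently.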

Using timers on a non-keyed stream

2021-02-08 Thread yidan zhao
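As the subject says, Flink currently doesn't support timers on a non-keyed stream. Is there any workaround? I'm implementing a sink with a timeout and would like to use the TimerService, but that isn't supported. I also don't want to use a keyed stream, because it causes data skew. Is there any approach other than introducing a random key?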

Re: Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-08 Thread Yun Gao
Hi, I also think there should be different ways to achieve the target. For the first option listed previously, the pseudo-code is roughly like: class MyFunction extends KeyedProcessFunction { ValueState count; void open() { count = ... // Create the value state } void

Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-08 Thread Yun Gao
Hi, have you also included the Kafka connector jar in the classpath? Best, Yun --Original Mail -- Sender: joris.vanagtmaal Send Date: Tue Feb 9 03:16:52 2021 Recipients: User-Flink Subject: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

Re: Flink on YARN per-job container killed

2021-02-08 Thread key lou
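Thanks. I read the related articles and similar questions on the mailing list; the core advice is always to increase off-heap memory. I still have a few questions: 1. In Flink 1.10, when state keeps growing, is there no way to control the growth of RocksDB's memory, leaving containers at risk of being killed? Doesn't RocksDB clear memory and flush to disk when memory runs low? 2. When using RocksDBStateBackend configured with an HDFS path, does RocksDB still create local files? I have never found any such files on the TM nodes. zhiyezou <1530130...@qq.com> wrote on Sun, Feb 7, 2021 at 9:41 AM: >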

Re: Flink standalone on k8s: HA exception

2021-02-08 Thread Yang Wang
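After enabling HA, you need to create a service account with permission to create/watch ConfigMaps and mount it into the JobManager and TaskManager. Judging from your error, the service account has not been configured. Best, Yang casel.chen wrote on Tue, Feb 9, 2021 at 12:10 AM: > I tried to deploy a Flink > standalone cluster on k8s. Before setting up HA the cluster worked fine, but after I added the following two HA settings to the ConfigMap, the JM throws an exception. Why? > > > high-availability: >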

Re: Native Kubernetes annotation parsing problem

2021-02-08 Thread Yang Wang
If you are setting the config options in flink-conf.yaml, then you could directly add the following example. *kubernetes.jobmanager.annotations: iam.amazonaws.com/role:'arn:aws:iam:::role/XX/ '* However, if you are
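Why the quotes around the ARN matter can be shown with a simplified parser for map-type options in plain Python. This is an assumption about the general behavior (entries separated by `,`, key and value separated by `:`, quoted text taken literally), not Flink's actual parser; `split_unquoted` and `parse_map_option` are hypothetical names, and escaped quotes are not handled.

```python
def split_unquoted(text, sep, keep_quotes=False):
    """Split `text` on `sep`, ignoring separators inside '...' or "..."."""
    parts, current, quote = [], [], None
    for ch in text:
        if quote:                      # inside a quoted section
            if ch == quote:
                quote = None           # closing quote
                if keep_quotes:
                    current.append(ch)
            else:
                current.append(ch)     # taken literally, even ':' or ','
        elif ch in ("'", '"'):
            quote = ch                 # opening quote
            if keep_quotes:
                current.append(ch)
        elif ch == sep:
            parts.append("".join(current))
            current = []
        else:
            current.append(ch)
    if quote:
        raise ValueError("unterminated quote in " + repr(text))
    parts.append("".join(current))
    return parts


def parse_map_option(raw):
    """Parse 'k1:v1,k2:v2' into a dict, honoring quotes."""
    result = {}
    # keep the quotes on the first pass so the ':' split below still sees them
    for entry in split_unquoted(raw, ",", keep_quotes=True):
        key_value = split_unquoted(entry.strip(), ":")
        if len(key_value) != 2:
            raise ValueError("expected exactly one unquoted ':' in " + repr(entry))
        key, value = key_value
        result[key.strip()] = value.strip()
    return result
```

In this model the quoted form parses to the single annotation `iam.amazonaws.com/role` with the full ARN as its value, while the same string without quotes is rejected because every colon in the ARN acts as a separator.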

Re: Flink SQL temporal table join with Hive: error

2021-02-08 Thread macia kk
SELECT * FROM ( SELECT tt.* FROM input_tabe_01 tt FULL OUTER JOIN input_tabe_02 mt ON (mt.transaction_sn = tt.reference_id) and tt.create_time >= mt.create_time + INTERVAL '5' MINUTES and tt.create_time <=

Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

2021-02-08 Thread Dongwon Kim
Hi Khachatryan, Thanks for the explanation and the input! 1. Use the State Processor API to create a new snapshot [1] I haven't used it, but does the API prevent the class of a specific serializer from being loaded? 2. If the operator has only this state, then changing uid (together with >

Re: Flink SQL temporal table join with Hive: error

2021-02-08 Thread Rui Li
Hi, so how is the join statement written? On Mon, Feb 8, 2021 at 2:45 PM macia kk wrote: > The image shows that error. > > The table DDL is below; it's a shared table and I don't have permission to change it. > > CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT > 'country', `currency` string COMMENT 'currency', `exchange_rate` > decimal(25,10) COMMENT 'exchange rate') >

Re: pyFlink UDF Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness:

2021-02-08 Thread Dian Fu
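Someone has probably asked about this before; try searching the archives. Also, if GC is frequent, try increasing the memory. > On Feb 8, 2021, at 5:14 PM, 陈康 <844256...@qq.com> wrote: > > Thanks for the reply... I switched versions... now running it fails with the error below. > [hadoop@hadoop01 bin]$ pip list | grep flink > apache-flink 1.11.1 > > Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager > with id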

Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-08 Thread joris.vanagtmaal
Traceback (most recent call last): File "streaming-dms.py", line 309, in anomalies() File "streaming-dms.py", line 142, in anomalies t_env.sql_query(query).insert_into("ark_sink") File "/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/table/table_environment.py", line

Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-08 Thread joris.vanagtmaal
I'm trying to read data from my eventhub in Azure, but i end up with the Flink error message 'findAndCreateTableSource failed' using Flink 1.13-Snapshot source_ddl = f"""CREATE TABLE dms_source( x_value VARCHAR ) WITH (

Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-08 Thread Khachatryan Roman
Hi, Probably another solution would be to register a timer (using KeyedProcessFunction) once we see an element after keyBy. The timer will fire in windowIntervalMs. Upon firing, it will emit a dummy element which will be ignored (or subtracted) in the end. Upon receiving each new element, the
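The timer-based idea can be modeled in plain Python. This sketch uses a closely related variant rather than the exact dummy-element scheme: each element registers its own expiry timer at `ts + window_ms`, and when the timer fires the count is decremented and re-emitted, so a key whose last element expires emits 0. `SlidingCountWithExpiry` and its methods are hypothetical stand-ins for a KeyedProcessFunction with ValueState and an event-time TimerService.

```python
import heapq


class SlidingCountWithExpiry:
    """Per-key count over the last `window_ms`, emitted on every change."""

    def __init__(self, window_ms):
        self.window_ms = window_ms
        self.counts = {}   # stand-in for keyed ValueState<Long>
        self.timers = []   # min-heap of (fire_ts, key), stand-in for the TimerService
        self.out = []      # emitted (ts, key, count) records

    def process_element(self, key, ts):
        self.counts[key] = self.counts.get(key, 0) + 1
        # the element leaves the window at ts + window_ms
        heapq.heappush(self.timers, (ts + self.window_ms, key))
        self.out.append((ts, key, self.counts[key]))

    def advance_watermark(self, watermark):
        # fire every timer whose timestamp is <= the new watermark
        while self.timers and self.timers[0][0] <= watermark:
            fire_ts, key = heapq.heappop(self.timers)
            self.counts[key] -= 1
            self.out.append((fire_ts, key, self.counts[key]))  # may emit 0
```

One caveat when porting this to a real KeyedProcessFunction: Flink keeps at most one timer per key and timestamp, so two elements with the same timestamp produce a single onTimer call. The real function would need to track how many elements expire at each timestamp (for example in MapState) instead of relying on one timer per element.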

Any plans to make Flink configurable with pure data?

2021-02-08 Thread Pilgrim Beart
To a naive Flink newcomer (me) it's a little surprising that there is no pure "data" mechanism for specifying a Flink pipeline, only "code" interfaces. With the DataStream interface I can use Java, Scala or Python to set up a pipeline and then execute it - but that doesn't really seem to *need* a

Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-08 Thread Khachatryan Roman
Hi, Could you provide the exception stack trace? Regards, Roman On Mon, Feb 8, 2021 at 3:46 PM joris.vanagtmaal < joris.vanagtm...@wartsila.com> wrote: > I'm trying to read data from my eventhub in Azure, but i end up with the > Flink error message 'findAndCreateTableSource failed' > > using

Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

2021-02-08 Thread Khachatryan Roman
Hi, I'm pulling in Yun Tang, who is familiar with StateBackends and RocksDB in particular. From what I see, the 2nd snapshot (sp2) is built using the same set of states obtained from the starting savepoint/checkpoint (sp1) to write its metadata. This metadata includes serializer snapshots,

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-08 Thread meneldor
Thanks for the quick reply, Timo. I'll test the row_ts and compaction mode suggestions. However, I've read somewhere in the archives that an append-only stream is only possible if I extract only "the first" record from the ranking, which in my case is the oldest record. Regards On Mon, Feb

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-08 Thread Timo Walther
Hi, could the problem be that you are mixing OVER and TUMBLE windows? The TUMBLE window is correctly defined over the time attribute `row_ts`, but the OVER window is defined using a regular column `upd_ts`. This might be why the query is updating rather than append-only. Maybe you

Flink standalone on k8s: HA exception

2021-02-08 Thread casel.chen
I tried to deploy a Flink standalone cluster on k8s. Before setting up HA the cluster worked fine, but after I added the following two HA settings to the ConfigMap, the JM throws an exception. Why? high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory high-availability.storageDir: oss:///odps-prd/rtdp/flink/recovery 2021-02-09 00:03:04,421 ERROR

Re: Native Kubernetes annotation parsing problem

2021-02-08 Thread Kevin Kwon
I think this is a more generic question of how to inject IAM roles into Native Kubernetes pods. I'm using Kubeiam, and it seems the namespace annotation doesn't work. On Mon, Feb 8, 2021 at 2:30 PM Kevin Kwon wrote: > Hi team, I'm using Native Kubernetes annotation config > > >

Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

2021-02-08 Thread Dongwon Kim
Hi 张静, Q1: By default, a savepoint restore will try to match all state > back to the restored job. `AllowNonRestoredState` cannot avoid > recovering all state from the savepoint; it only skips matching all of the > restored state back to the restored job. So `ClassNotFoundException` > could not be
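The behavior described in the quoted reply can be summarized in a simplified plain-Python model. It is an assumption based on the explanations in this thread, not Flink's restore code: `restore`, its parameters, and the use of `LookupError` as a stand-in for ClassNotFoundException are all hypothetical. What it illustrates is that allowNonRestoredState works per operator: the state of an operator that still exists in the new job is restored completely, so every serializer class it references must be loadable, while the state of an operator with no matching uid is skipped.

```python
def restore(savepoint, job_operator_uids, loadable_classes, allow_non_restored=False):
    """savepoint maps operator uid -> {state name: serializer class name}."""
    restored = {}
    for uid, states in savepoint.items():
        if uid not in job_operator_uids:
            # only whole operators can be skipped by allowNonRestoredState
            if allow_non_restored:
                continue
            raise ValueError(f"savepoint state for {uid!r} cannot be mapped to the new job")
        # a matched operator restores all of its states, used or not
        for name, cls in states.items():
            if cls not in loadable_classes:
                raise LookupError(f"ClassNotFoundException: {cls} (state {uid}/{name})")
        restored[uid] = states
    return restored
```

In this model, keeping the uid while removing the POJO class fails even with allow_non_restored set, and changing the uid succeeds only by dropping all of that operator's state, which is why that workaround is conditioned on the operator having no other state.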

Joining and Grouping Flink Tables with Java API

2021-02-08 Thread Abdelilah CHOUKRI
Hi, We're trying to use Flink 1.11 Java tables API to process a streaming use case: We have 2 streams, each one with different structures. Both events, coming from Kafka, can be: - A new event (not in the system already) - An updated event (updating an event that previously was inserted) so we

Re: Table Cache Problem

2021-02-08 Thread Yongsong He
Thanks for your help, Timo, it is very helpful. On Monday, February 8, 2021, Timo Walther wrote: > Hi Yongsong, > > in newer Flink versions we introduced the concept of statement sets, which > are available via `TableEnvironment.createStatementSet()`. They allow you > to optimize a branching pipeline as a whole

Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

2021-02-08 Thread Dongwon Kim
Hi, I have an original job (say v1) and I want to start a new job (say v2) from a savepoint of v1. An operator of v1 used to have per-key states of a POJO type, but I want to remove the states together with the definition of the POJO type. When I start v2 from a savepoint of v1, I specified

Re: How to make sure no data is lost in a Flink two-stream join

2021-02-08 Thread Mailbox service
??---- ??:lxk7...@163.com

Native Kubernetes annotation parsing problem

2021-02-08 Thread Kevin Kwon
Hi team, I'm using Native Kubernetes annotation config *kubernetes.jobmanager.annotations* and I'm facing a problem with parsing. I use annotation *iam.amazonaws.com/role:'arn:aws:iam:::role/XX/ '* but seems

How to make sure no data is lost in a Flink two-stream join

2021-02-08 Thread lxk7...@163.com
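I'm currently using Flink for two-stream joins, mostly interval joins with a time interval chosen from experience. How can I make sure no data is lost? If data arrives later than that interval, it is discarded, and since I'm working with order data, that is not acceptable. lxk7...@163.com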
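To make the loss concrete, here is a plain-Python model of an inner interval join; the function name and the order/payment data are hypothetical. A left row matches right rows with the same key whose timestamp falls within `[t_left + lower_ms, t_left + upper_ms]` (both bounds inclusive in this sketch); a row whose counterpart arrives outside that range produces no output at all.

```python
def interval_join(left, right, lower_ms, upper_ms):
    """Inner interval join on key.

    left, right: lists of (key, timestamp_ms).
    Returns (matches, unmatched_left). An inner interval join emits only
    `matches`; rows in `unmatched_left` silently produce nothing.
    """
    matches, unmatched = [], []
    for key, t_left in left:
        found = [(key, t_left, t_right) for k, t_right in right
                 if k == key and t_left + lower_ms <= t_right <= t_left + upper_ms]
        matches.extend(found)
        if not found:
            unmatched.append((key, t_left))
    return matches, unmatched
```

If unmatched orders must not disappear, one option worth evaluating is an outer interval join in SQL (such as the FULL OUTER JOIN with time bounds in the temporal-join thread), which emits unmatched rows padded with NULLs instead of dropping them, so they can be reconciled downstream.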

Re: Upgrading Flink to Hadoop 3

2021-02-08 Thread lxk7...@163.com
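I'm not sure what exactly your question means. If you mean upgrading Hadoop, just switch the Hadoop jars referenced in Flink's configuration to the Hadoop 3 versions. lxk7...@163.com From: kandy.wang Sent: 2021-02-07 10:27 To: user-zh Subject: Upgrading Flink to Hadoop 3 How do I upgrade Flink to Hadoop 3?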

Re: Cannot connect to queryable state proxy

2021-02-08 Thread Khachatryan Roman
Hi ChangZhuo, Queryable state is exposed on the same address as the TM RPC. You can change this address by modifying taskmanager.host [1]. However, I'm not sure whether setting it to 127.0.0.1 or localhost would break connectivity with the other components. [1]

Re: pyFlink UDF Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness:

2021-02-08 Thread 陈康
Thanks for the reply... I switched versions... now running it fails with the error below. [hadoop@hadoop01 bin]$ pip list | grep flink apache-flink 1.11.1 Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 397c590a9c19b173a83a4476f8eeaca0 timed out. ... 26 more

???? Yarn application state ?? Flink Job status ????????????

2021-02-08 Thread ??????
Yarn App NEW,NEW_SAVING,SUBMITTED,ACCEPTED,RUNNING,FINISHED,FAILED,KILLED ?? Flink Job CREATED,RUNNING,FAILING,FAILED,CANCELLING,CANCELED,FINISHED,RESTARTING,SUSPENDED,RECONCILING ?? Flink Job?? Yarn app ??

Re: question on ValueState

2021-02-08 Thread Khachatryan Roman
Hi, I think Yun Tang is right, HeapStateBackend doesn't (de)serialize the value on update. As for "value()", it may (de)serialize it and return a copy if an async snapshot is in progress (to protect from modifications). This shouldn't happen often though. Regards, Roman On Mon,
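The difference between keeping objects on the heap and storing serialized bytes can be shown with two toy value-state classes in plain Python. They are hypothetical models, not Flink classes, with `pickle` standing in for a TypeSerializer. With the heap model, mutating the object returned by `value()` changes the stored state even without `update()`; with the serializing model it does not. Since the heap backend may also hand out a copy while an async snapshot is running, code should not rely on in-place mutation and should always call `update()`.

```python
import pickle


class HeapValueState:
    """Keeps the live object: no (de)serialization on value()/update()."""

    def __init__(self):
        self._value = None

    def value(self):
        return self._value        # the stored object itself

    def update(self, v):
        self._value = v           # stores a reference


class SerializingValueState:
    """Stores bytes, like an on-disk backend: every access copies."""

    def __init__(self):
        self._bytes = pickle.dumps(None)

    def value(self):
        return pickle.loads(self._bytes)  # fresh deserialized copy

    def update(self, v):
        self._bytes = pickle.dumps(v)
```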

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-08 Thread Khachatryan Roman
Hi, AFAIK this should be supported in 1.12 via FLINK-19568 [1] I'm pulling in Timo and Jark who might know better. https://issues.apache.org/jira/browse/FLINK-19857 Regards, Roman On Mon, Feb 8, 2021 at 9:14 AM meneldor wrote: > Any help please? Is there a way to use the "Last row" from a

Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-08 Thread Jan Brusch
Hi Yun, thanks for your reply. I do agree with your point about standard windows being for high level operations and the lower-level apis offering a rich toolset for most advanced use cases. I have tried to solve my problem with keyedProcessFunctions also but was not able to get it to work

Re: Table Cache Problem

2021-02-08 Thread Timo Walther
Hi Yongsong, in newer Flink versions we introduced the concept of statement sets, which are available via `TableEnvironment.createStatementSet()`. They allow you to optimize a branching pipeline as a whole, reusing subplans. In older Flink versions, you can convert the Table to a

Re: Jobmanager stopped because uncaught exception

2021-02-08 Thread Khachatryan Roman
Hi, The open issue you mentioned (FLINK-21053) is about preventing potential issues in the future. The issue you are experiencing is most likely FLINK-20992 as Yang Wang said. So upgrading to 1.12.2 should solve the problem. Regards, Roman On Mon, Feb 8, 2021 at 9:05 AM Lei Wang wrote: > I

Re: pyFlink UDF Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness:

2021-02-08 Thread Dian Fu
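It looks like this is caused by a mismatch between the Flink cluster version and the PyFlink version: the cluster runs Flink 1.11.1 while PyFlink is 1.12.0? Align the versions first and then try again. > On Feb 8, 2021, at 10:28 AM, 陈康 <844256...@qq.com> wrote: > > A question for the experts: running the simplest possible PyFlink UDF fails with Failed to create stage bundle factory! > INFO:root:Initializing python harness: It runs fine in IntelliJ IDEA. Has anyone run into this? Thanks~ > >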

Re: Flink upsert-kafka connector not working with Avro Confluent

2021-02-08 Thread Till Rohrmann
Hi Shamit, thanks for reaching out to the community. I am pulling in Timo who might know more about this problem. Cheers, Till On Sun, Feb 7, 2021 at 6:22 AM shamit jain wrote: > Hello Team, > > I am facing issue with "upsert-kafka" connector which should read the Avro > message serialized

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-08 Thread meneldor
Any help please? Is there a way to use the "Last row" from a deduplication in an append-only stream or tell upsert-kafka to not produce *null* records in the sink? Thank you On Thu, Feb 4, 2021 at 1:22 PM meneldor wrote: > Hello, > Flink 1.12.1(pyflink) > I am deduplicating CDC records coming
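Why only "first row" deduplication yields an append-only stream can be modeled with a small changelog generator in plain Python. `deduplicate` is a hypothetical function; the `+I`/`-U`/`+U` tags mirror the short strings of Flink's RowKind. Keeping the first row never has to revise earlier output, while keeping the last row must retract the previous row for a key each time a newer one arrives.

```python
def deduplicate(rows, keep="first"):
    """Changelog of per-key deduplication.

    rows: iterable of (key, payload) in arrival (time) order.
    keep="first": later rows for a known key are dropped -> inserts only.
    keep="last":  each newer row replaces the previous one -> updates.
    """
    latest = {}
    changelog = []
    for key, payload in rows:
        if key not in latest:
            latest[key] = payload
            changelog.append(("+I", key, payload))
        elif keep == "last":
            changelog.append(("-U", key, latest[key]))  # retract the old row
            latest[key] = payload
            changelog.append(("+U", key, payload))      # emit the new row
        # keep == "first": nothing is emitted for the duplicate
    return changelog
```

The `-U`/`+U` pairs are exactly the update changes that a plain (append-only) Kafka sink cannot consume.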

Re: Jobmanager stopped because uncaught exception

2021-02-08 Thread Lei Wang
I see there's a related issue https://issues.apache.org/jira/browse/FLINK-21053 which is still open. Does that mean a similar issue will still exist even if I upgrade to 1.12.2? Thanks, Lei On Mon, Feb 8, 2021 at 3:54 PM Yang Wang wrote: > Maybe it is a known issue[1] and has already been