Re: Flink AskTimeoutException killing the jobs

2020-07-03 Thread M Singh
Hi Xintong/LakeShen: We have the following settings in flink-conf.yaml:

    akka.ask.timeout: 180 s
    akka.tcp.timeout: 180 s

But we still see this exception. Are there multiple akka.ask.timeout settings, or are additional settings required? Thanks Mans On Friday, July 3, 2020, 01:08:05 AM EDT, Xintong Song

Re: Integrating prometheus

2020-07-03 Thread Manish G
Hi, I am basically looking for: throughput, success rate, error rate. For experimental purposes I could complete all the configuration as explained in the official documentation. But somehow my custom metric (a simple Counter) is still not shown on the Prometheus board, though the default metrics I

Re: Integrating prometheus

2020-07-03 Thread Manish G
Also, it seems custom metrics can only be recorded if we extend RichFunction, as it allows us to override open(), where we can get hold of the runtime context and metrics constructs. Please let me know if there are other ways too. On Fri, Jul 3, 2020 at 10:05 PM Manish G wrote: > Hi, > > I am basically
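
For reference, a minimal sketch of the approach Manish describes, registering a custom Counter in open() of a rich function (class and metric names are illustrative, not from the thread):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;

    public class CountingMapper extends RichMapFunction<String, String> {
        private transient Counter counter;

        @Override
        public void open(Configuration parameters) {
            // The runtime context (and thus the metric group) is only available
            // in rich functions, which is why extending RichFunction is needed.
            counter = getRuntimeContext().getMetricGroup().counter("myCustomCounter");
        }

        @Override
        public String map(String value) {
            counter.inc();
            return value;
        }
    }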

Re: Stateful Functions: Routing to remote functions

2020-07-03 Thread Igal Shilman
Hi Jan, Judging by the exception message, it seems the function type "demo/eventCounterPython" is not known to Stateful Functions. This could happen if the module.yaml (provided in your email) was accidentally excluded from the resulting artifact (Docker image or a jar-with-dependencies). Can

Re: Task recovery?

2020-07-03 Thread John Smith
Here is one log: https://www.dropbox.com/s/s8uom5uto708izf/flink-job-001.log?dl=0 If I understand correctly, on June 23rd it suspended the jobs? So at that point they would no longer show in the UI or be restarted? On Fri, 3 Jul 2020 at 12:05, John Smith wrote: > I didn't restart the job

Re: Stateful Functions: Routing to remote functions

2020-07-03 Thread Jan Brusch
Hi Igal, thanks for your reply. Initially I thought the same thing, but it turns out I am able to call the remote function from an embedded "wrapper" function using the exact same setup (relevant code below). So that's one kind of solution to the problem. But to me it seems like it's a bit

Re: Task recovery?

2020-07-03 Thread John Smith
I didn't restart the job manager. Let me see if I can dig up the logs... Also I just realised it's possible that the retry attempts to recover may have been exhausted.

Re: Integrating prometheus

2020-07-03 Thread Chesnay Schepler
Yes, you do need to extend RichFunction; there's no way around that. As for the missing metric, the usual cause is that the job/task finishes so quickly that the metric is never reported. If this is not the cause, I would recommend enabling DEBUG logging and searching for warnings from the

Re: Timeout when using RocksDB to handle large state in a stream app

2020-07-03 Thread Yun Tang
Hi Felipe, I noticed my previous mail had a typo: RocksDB runs in the task's main thread, which is not the thread responsible for responding to heartbeats. Sorry for the typo; the key point I want to clarify is that RocksDB should have nothing to do with the heartbeat problem. Best Yun Tang

Re: Question about RocksDB performance tuning

2020-07-03 Thread Yun Tang
Hi Peter, this is a general problem; you can refer to RocksDB's tuning guides [1][2] and to Flink's built-in PredefinedOptions.java [3]. Generally speaking, increase the write buffer size to reduce write amplification, and increase the parallelism of the keyed operator to share the
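
A minimal sketch of the predefined-options route mentioned above, assuming a Flink 1.10-style setup (checkpoint path, profile choice, and parallelism are illustrative):

    import org.apache.flink.contrib.streaming.state.PredefinedOptions;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbTuningJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Predefined profiles bundle settings such as larger write buffers.
            RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
            backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
            env.setStateBackend(backend);
            // Higher parallelism of the keyed operator spreads state over more RocksDB instances.
            env.setParallelism(8);
            env.fromElements(1, 2, 3)
               .keyBy(v -> v % 2)
               .reduce((a, b) -> a + b)
               .print();
            env.execute("rocksdb-tuning-sketch");
        }
    }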

Re: Timeout when using RocksDB to handle large state in a stream app

2020-07-03 Thread Felipe Gutierrez
Yes, I agree, because RocksDB will spill data to disk if there is not enough space in memory. Thanks -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Fri, Jul 3, 2020 at 8:27 AM Yun Tang wrote: > > Hi Felipe, > > I noticed my previous mail has a

Re: Parquet data stream group converter error

2020-07-03 Thread Khachatryan Roman
Hi, > MessageType schema = reader.getFooter().getFileMetaData().getSchema(); The first thing I'd suggest is to verify that the file contains a valid schema and can be read by some other program, e.g. parquet-tools schema or cat [1]. Regards, Roman On Thu, Jul 2, 2020 at 11:36 PM Jesse Lord

Re: Flink Kafka connector in Python

2020-07-03 Thread Manas Kale
Hi Xingbo, Thanks for the reply; I didn't know that a table schema also needs to be declared after the connector, but I understand now. I have another question: how do I write the parsing schema for a field that is itself a valid JSON string? For example: { "monitorId": 865, "deviceId":

Integrating prometheus

2020-07-03 Thread Manish G
Hi, I am following this link on how to integrate Prometheus with Flink. Going by the code sample, I would need to insert the related metrics code into the main logic. Is this avoidable, e.g. by using some annotations on

Re: Checkpointing is disabled; will historical data in RocksDB leak when the job restarts?

2020-07-03 Thread SmileSmile
Hi Yun Tang, I don't enable checkpointing, so when my job restarts, how does Flink clean up the historical state? My pod only gets killed after the job restarts again and again; in this case I have to rebuild the Flink cluster. On 07/03/2020

Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-03 Thread wangl...@geekplus.com.cn
Hi Xintong, Yes, initializing the metric in the `open` method works, but it doesn't solve my problem. I want to initialize the metric with a name that is extracted from the record content, and I can only do that in the `invoke` method. Actually my scenario is as follows. The record is MySQL binlog

Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-03 Thread wangl...@geekplus.com.cn
Seems there's no direct solution. Perhaps I can implement this by initializing a HashMap with all the possible values of tableName in the `open` method and getting the corresponding Meter by tableName in the `invoke` method. Thanks, Lei wangl...@geekplus.com.cn Sender:

Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-03 Thread Xintong Song
Ok, I see your problem. And yes, keeping a map of metrics should work. Just to double-check, I assume there's an upper bound on your map keys (table names)? Because if not, an infinitely growing in-memory map that is not managed by Flink's state might become problematic. Thank you~
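
A sketch of the map-of-metrics pattern the thread converges on: one Meter per table name, registered lazily (the sink's record type and metric names are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Meter;
    import org.apache.flink.metrics.MeterView;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class TableMeterSink extends RichSinkFunction<String> {
        private transient Map<String, Meter> metersByTable;

        @Override
        public void open(Configuration parameters) {
            metersByTable = new HashMap<>();
        }

        @Override
        public void invoke(String tableName, Context context) {
            // Register one meter per table name on first sight, then reuse it.
            // Safe as long as the set of table names is bounded, per the caveat above.
            Meter meter = metersByTable.computeIfAbsent(tableName,
                name -> getRuntimeContext().getMetricGroup().meter(name, new MeterView(60)));
            meter.markEvent();
        }
    }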

Checkpointing is disabled; will historical data in RocksDB leak when the job restarts?

2020-07-03 Thread SmileSmile
Hi, my job runs on Flink 1.10.1 with event time. Container memory usage rises by 2 GB after one restart, and the pod is killed by the OS after several restarts. I find that historical data is cleared when new data arrives, calling onEventTime() to clearAllState. But my job has no need

Re: Checkpointing is disabled; will historical data in RocksDB leak when the job restarts?

2020-07-03 Thread Yun Tang
Hi, if your job does not need checkpoints, why would you still restore your job from checkpoints? Actually, I did not totally understand what you want: are you afraid that the state restored from the last checkpoint would not be cleared? Since the event timer is also stored in the checkpoint, after you

Re: Question about RocksDB performance tuning

2020-07-03 Thread Peter Huang
Hi Yun, Thanks for the info. These materials help a lot. Best Regards Peter Huang On Thu, Jul 2, 2020 at 11:36 PM Yun Tang wrote: > Hi Peter > > This is a general problem and you could refer to RocksDB's tuning > guides[1][2], you could also refer to Flink built-in PredefinedOptions.java >

Re: [DISCUSS] Drop connectors for 5.x and restart the flink es source connector

2020-07-03 Thread Robert Metzger
The discussion on dropping the ES5 connector was not conclusive when we discussed it in February 2020. We wanted to revisit it for the 1.12 release. From Maven Central, we have the following download numbers: ES2: 500 downloads; ES5: 10500 downloads (the es5_2.10:1.3.1 had 8000 downloads last

Window grouping after a Flink interval join

2020-07-03 Thread 元始(Bob Hu)
Is there a problem doing a window group after a Flink interval join of two streams? Some left join ... select * from a,b where a.id=b.id and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR

Re: Checkpointing is disabled; will historical data in RocksDB leak when the job restarts?

2020-07-03 Thread Yun Tang
Hi, if you do not enable checkpointing, have you ever restored a checkpoint for the new job? As I said, the timer would also be restored, and the event time trigger would fire so that the subsequent onEventTime() could also be triggered to clean up the historical data. For the 2nd question, why

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-03 Thread Si-li Liu
Thanks for your help
1. I started the job from scratch, not from a savepoint or externalized checkpoint
2. No job graph change
3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
4. My Flink version is 1.9.1
Khachatryan Roman wrote on Fri, Jul 3, 2020 at 4:49 PM: > I still wasn't able to reproduce

Re: Avro from avrohugger still invalid

2020-07-03 Thread Aljoscha Krettek
Hi, I don't think there's a workaround, except copying the code and manually fixing it. Did you check out my comment on the Jira issue and the new one I created? Best, Aljoscha On 03.07.20 07:19, Georg Heiler wrote: But would it be possible to somehow use AvroSerializer for now? Best,

Custom service configs in flink

2020-07-03 Thread Jaswin Shah
I have multiple Flink jobs with shared custom business configs. Is it possible for one Flink job to load the configs into memory and have all the Flink jobs share the same configs? Basically, I am thinking of fetching the configs into memory in one Flink job via a REST call, which is one

Re: Issue with job status

2020-07-03 Thread Robert Metzger
Hi Bhaskar, The definition of when a job is marked as RUNNING in Flink is debatable. For a streaming job, RUNNING means all tasks are running; for a batch job, however, it is RUNNING as soon as some tasks are running. Since the scheduler does not distinguish between these types of jobs,

Re: Task recovery?

2020-07-03 Thread Robert Metzger
Hi John, did you also restart the JobManager, or just the TaskManagers? In either case, they should recover. Do you still have the JobManager logs around, so that we can analyze them? On Thu, Jun 25, 2020 at 6:40 PM John Smith wrote: > Hi running 1.10.0 > > 3 Zookeepers > 3 Job Nodes > 3 Task

Re: Dockerised Flink 1.8 with Hadoop S3 FS support

2020-07-03 Thread Yang Wang
Hi Lorenzo, since Flink 1.8 does not support the plugin mechanism for loading filesystems, you need to copy flink-s3-fs-hadoop-*.jar from opt to the lib directory. The Dockerfile could look like the following:

    FROM flink:1.8-scala_2.11
    RUN cp /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/lib

Then build your

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

2020-07-03 Thread Khachatryan Roman
I still wasn't able to reproduce the issue. Can you also clarify:
- Are you starting the job from a savepoint or externalized checkpoint?
- If yes, was the job graph changed?
- What StreamTimeCharacteristic is set, if any?
- What exact version of Flink do you use?
Regards, Roman On Fri, Jul 3,

Re: Dynamic source and sink.

2020-07-03 Thread C DINESH
Hi Paul, Thanks for the response. Can you point me to an example of how to create a dynamic client or wrapper operator? Thanks and Regards, Dinesh. On Thu, Jul 2, 2020 at 12:28 PM Paul Lam wrote: > Hi Dinesh, > > I think the problem you meet is quite common. > > But with the current Flink

Re: Reading and updating rule-sets from a file

2020-07-03 Thread Lorenzo Nicora
Thanks Till, I understand that making my FileInputFormat "unsplittable" guarantees a file is always read by a single task. But how can I produce a single record for the entire file? As my file is a CSV with some idiosyncrasies, I am extending CsvInputFormat so as not to reinvent the wheel of the CSV
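
One possible answer to the single-record question, setting the CsvInputFormat specifics aside: a sketch that marks the format unsplittable and drains the whole stream into one record (assumes the file fits in memory; the class name is illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.flink.api.common.io.FileInputFormat;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;

    public class WholeFileInputFormat extends FileInputFormat<String> {
        private boolean consumed;

        public WholeFileInputFormat(Path filePath) {
            super(filePath);
            this.unsplittable = true; // one split, and thus one task, per file
        }

        @Override
        public void open(FileInputSplit split) throws IOException {
            super.open(split);
            consumed = false;
        }

        @Override
        public boolean reachedEnd() {
            return consumed;
        }

        @Override
        public String nextRecord(String reuse) throws IOException {
            // Read the entire remaining stream and emit it as a single record.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = stream.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            consumed = true;
            return buffer.toString("UTF-8");
        }
    }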

Re: Custom service configs in flink

2020-07-03 Thread Robert Metzger
(Oops, I accidentally responded to you personally only. The emails are supposed to go to the list, so I added the thread back to the list.) But is the config so big that memory usage is a concern here? Also note that the stuff that runs in main() is just generating a streaming execution plan,
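
A sketch of the point about main(): anything fetched there runs on the client at submission time and is serialized into the job graph along with the functions (fetchConfigViaRest is a hypothetical stand-in for the REST call discussed):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SharedConfigJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Executed on the client while the execution plan is generated.
            Map<String, String> config = fetchConfigViaRest();
            // The map travels to the tasks inside the serialized function.
            env.fromElements("a", "b", "c")
               .map(new ConfigAwareMapper(config))
               .print();
            env.execute("shared-config-job");
        }

        // Hypothetical stand-in for the REST call described in the thread.
        private static Map<String, String> fetchConfigViaRest() {
            Map<String, String> config = new HashMap<>();
            config.put("a", "alpha");
            return config;
        }

        private static class ConfigAwareMapper implements MapFunction<String, String> {
            private final Map<String, String> config; // captured at submission time

            ConfigAwareMapper(Map<String, String> config) {
                this.config = config;
            }

            @Override
            public String map(String value) {
                return config.getOrDefault(value, value);
            }
        }
    }

Note that the config is frozen at submission; refreshing it at runtime would need a different mechanism (e.g. broadcast state).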

Re: Integrating prometheus

2020-07-03 Thread Robert Metzger
Hi Manish, Currently, Flink's metric system does not support metrics via annotations. You need to go with the documented approach. But of course, you can try to build your own metrics abstraction based on Flink's metric system. On Fri, Jul 3, 2020 at 9:35 AM Manish G wrote: > Hi, > > I am

Re: Integrating prometheus

2020-07-03 Thread Chesnay Schepler
What metrics specifically are you interested in? On 03/07/2020 17:22, Robert Metzger wrote: Hi Manish, Currently, Flink's metric system does not support metrics via annotations. You need to go with the documented approach. But of course, you can try to build your own metrics abstraction based

Re: Dynamic partitioner for Flink based on incoming load

2020-07-03 Thread Robert Metzger
> This will mean 2 shuffles, and 1 node might bottleneck if 1 topic has too much data? Yes. > Is there a way to avoid shuffling at all (or do only 1) and avoid a situation where 1 node becomes a hotspot? Do you know the amount of data per Kafka topic beforehand, or does this have to be dynamic?

Re: datadog failed to send report

2020-07-03 Thread Robert Metzger
Hi, could this be another symptom of this issue: https://issues.apache.org/jira/browse/FLINK-16611? I guess you'll have to ask DataDog to check at their end; maybe you are running into some rate limit there? On Fri, Jun 26, 2020 at 5:42 PM seeksst wrote: > > > Original message > *From:* seeksst > *To:*

Re:

2020-07-03 Thread Robert Metzger
Hi, For the others checking the user@ mailing list: It seems this problem will get resolved through [1]. [1] https://issues.apache.org/jira/browse/FLINK-18478 On Mon, Jun 29, 2020 at 1:23 PM Georg Heiler wrote: > Hi, > > I try to use the confluent schema registry in an interactive Flink Scala

Re: Window grouping after a Flink interval join

2020-07-03 Thread Benchao Li
Hi Bob, This is the Flink user mailing list; please post to this list in English. If you want to use Chinese, you can send to user...@flink.apache.org. 元始(Bob Hu) <657390...@qq.com> wrote on Fri, Jul 3, 2020 at 3:29 PM: > Hello, I'd like to ask a question: > Is there a problem doing a window group after an interval join of two Flink stream tables? Some left

Re: What are the options in Flink SQL for deliberately delaying data for a while

2020-07-03 Thread Sun.Zhu
Thanks Benchao and forideal for the proposals.
Approach 1: use a UDF; if the lookup misses, sleep and query again. We can try this.
Approach 2: let the data wait a while at the join operator before the lookup. We use Flink SQL, not the streaming API, so this approach may not work for us.
Approach 3: if the join misses, send the data back to the source and loop the join. Our dimension-table join acts like a filter: if a record matches, the main-stream record is not processed further, so we don't strictly need the join to hit; we just want to delay a bit to improve accuracy.
Approach 4: if your source MQ supports delayed messages, Flink doesn't need to do anything; just use the MQ's delayed messages.

Problem reading Alibaba MQ from Flink 1.9

2020-07-03 Thread guanyq
How do I set the AccessKey and SecretKey parameters when reading Alibaba RocketMQ from Flink 1.9?

    final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
        .setHost("localhost")
        .setPort(5000)
        .build();

Re: Problem reading Alibaba MQ from Flink 1.9

2020-07-03 Thread 李军
Hello! Write a custom source extending RichSourceFunction and build the consumer in open(); there you can set the AccessKey and SecretKey parameters. 2020-7-4 李军 hold_li...@163.com On Jul 3, 2020, 23:44, guanyq wrote: How do I set the AccessKey and SecretKey parameters when reading Alibaba RocketMQ from Flink 1.9?
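
A sketch of this suggestion, assuming the RocketMQ client and ACL libraries are on the classpath (group, topic, and address values are illustrative):

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
    import org.apache.rocketmq.common.message.MessageExt;

    public class AclRocketMQSource extends RichSourceFunction<String> {
        private transient DefaultMQPushConsumer consumer;
        private volatile boolean running = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            // The AccessKey/SecretKey pair is attached through an RPCHook,
            // which is how RocketMQ ACL authentication is wired into a consumer.
            AclClientRPCHook aclHook = new AclClientRPCHook(
                new SessionCredentials("yourAccessKey", "yourSecretKey"));
            consumer = new DefaultMQPushConsumer("demo_consumer_group", aclHook,
                new AllocateMessageQueueAveragely());
            consumer.setNamesrvAddr("localhost:9876");
            consumer.subscribe("demo_topic", "*");
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, listenerCtx) -> {
                for (MessageExt msg : msgs) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(new String(msg.getBody(), StandardCharsets.UTF_8));
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
            while (running) {
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            running = false;
            if (consumer != null) {
                consumer.shutdown();
            }
        }
    }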

Re: Re: How to dynamically generate multiple monitoring metrics from message content in one Flink operator?

2020-07-03 Thread zhisheng
I guess you want to use the table name as a tag so that you can group, query, and filter later? wangl...@geekplus.com.cn wrote on Fri, Jul 3, 2020 at 10:24 AM: > public void invoke(ObjectNode node, Context context) throws Exception { > > String tableName = node.get("metadata").get("topic").asText(); > Meter meter =

Re: Problem reading Alibaba MQ from Flink 1.9

2020-07-03 Thread zhisheng
Hi guanyq, the community Flink distribution has no built-in RocketMQ connector by default; the RocketMQ community project has a module integrating with Flink: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink The AccessKey/SecretKey parameters you mention should be ACL authentication; from reading the code it does not appear to be supported, but you can extend it yourself. Best! zhisheng guanyq wrote on Fri, Jul 3, 2020 at 11:44 PM: > Reading Alibaba RocketMQ from Flink 1.9

Re: How to clear state when the window closes

2020-07-03 Thread zhisheng
Try cleaning it up in the clear() method. 18579099...@163.com <18579099...@163.com> wrote on Fri, Jul 3, 2020 at 2:02 PM: > Hi everyone, I have a requirement: I defined a ValueState in a ProcessWindowFunction operator, and I want to clear the state when the window closes. Where should I clear it? > 1. At first I chose to clean up in the ProcessWindowFunction's process method, but there is a problem: my event-time window is one day, and I wrote a trigger that emits a result every hour. >
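
A sketch of this suggestion: keep the intermediate result in per-window state and clean it up in ProcessWindowFunction#clear, which runs when the window is purged at window close (after the one-day window plus any allowed lateness), not on the hourly trigger firings (types and names are illustrative):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class IntermediateResultFn
            extends ProcessWindowFunction<Long, Long, String, TimeWindow> {

        private final ValueStateDescriptor<Long> desc =
            new ValueStateDescriptor<>("intermediate", Long.class);

        @Override
        public void process(String key, Context ctx, Iterable<Long> elements,
                            Collector<Long> out) throws Exception {
            ValueState<Long> state = ctx.windowState().getState(desc);
            long sum = 0L;
            for (Long e : elements) {
                sum += e;
            }
            state.update(sum); // keep the latest intermediate result across firings
            out.collect(sum);  // emitted on every (e.g. hourly) trigger firing
        }

        @Override
        public void clear(Context ctx) throws Exception {
            // Called once when the window closes; clean up here, not in process().
            ctx.windowState().getState(desc).clear();
        }
    }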

Re: Re: Flink 1.10 checkpoint NullPointerException after running for a while

2020-07-03 Thread zhisheng
We have also run into this exception, but it is not very common. Congxian Qiu wrote on Fri, Jul 3, 2020 at 2:08 PM: > Check whether FLINK-17479 [1] matches your problem; if so, try changing the JDK version. > [1] https://issues.apache.org/jira/browse/FLINK-17479 > Best, > Congxian > > 程龙 <13162790...@163.com> wrote on Wed, Jul 1, 2020 at 9:09 PM:

Re: Flink job restarts irregularly, version 1.9

2020-07-03 Thread zhisheng
On our cluster, this kind of exception mostly happens because of frequent Full GCs, eventually accompanied by exceptions from the TaskManager going down. Xintong Song wrote on Fri, Jul 3, 2020 at 11:06 AM: > From the error message this is an Akka RPC call timeout; since it is a LocalFencedMessage, network problems can basically be ruled out. > I suggest checking the JM process's GC pressure and thread count, and whether the pressure is so high that RPCs cannot be answered in time. > > Thank you~ > > Xintong Song > > On Fri, Jul 3, 2020 at 10:48 AM noon cjihg

Re: Kafka to MySQL write problem

2020-07-03 Thread 郑斌斌
Thanks. I checked JIRA; No. 1 is only fixed in 1.11. Also, the version I'm currently using is 1.10. https://issues.apache.org/jira/browse/FLINK-15396 -- From: Jingsong Li Sent: Friday, July 3, 2020 14:29 To: user-zh; 郑斌斌 Subject: Re: Kafka to MySQL write problem Hi, you probably need Flink 1.11. 1. The JSON format has a parameter controlling

Re: Re: How to use RocksDB's block cache usage

2020-07-03 Thread Yun Tang
Hi, check whether the reported block cache usage exceeds the managed memory of a single slot. The calculation is managed memory / number of slots, which gives one slot's managed memory; compare that value with the block cache usage to see whether memory is over-used. As for the pod being easily killed by the OS after restarts: are you restoring from a savepoint? Best, Yun Tang From: SmileSmile Sent: Friday, July 3, 2020 14:20 To: Yun Tang Cc: Flink

Re: How to use RocksDB's block cache usage

2020-07-03 Thread SmileSmile
Hi, the job only has a restart strategy configured; when the job fails it simply restarts, without restoring historical data. [On "once a job fails over, the state backend's data is cleared and then loaded again at startup":] what I currently see is that after the job fails and restarts, the pod is very easily killed by the OS, and only rebuilding the cluster helps. Details: http://apache-flink.147419.n8.nabble.com/Checkpoint-td4406.html On Jul 3, 2020, 15:13, Yun Tang

Re: Re: Flink 1.10 checkpoint NullPointerException after running for a while

2020-07-03 Thread Congxian Qiu
Check whether FLINK-17479 [1] matches your problem; if so, try changing the JDK version. [1] https://issues.apache.org/jira/browse/FLINK-17479 Best, Congxian 程龙 <13162790...@163.com> wrote on Wed, Jul 1, 2020 at 9:09 PM: > They are all errors about failing to allocate resources (slots); it should still be caused by the checkpoint being empty, but I don't know why it is empty. > On 2020-07-01 20:51:34, "JasonLee"

Re: With checkpointing disabled, will historical data linger in memory when the job restarts?

2020-07-03 Thread SmileSmile
The job runs on Kubernetes and the behavior is reproducible; basically all of my jobs that join multiple streams have this problem. Steps:
1. Use event time with the watermark set to data time minus 3 minutes, RocksDB state backend, checkpointing disabled, and a memory limit set.
2. Run the job for a while.
3. Kill one of the pods; the job fails.
4. Kubernetes automatically restarts that pod; the memory usage of the other pods rises, and after running for a while it easily exceeds the limit and the pod is killed by the OS.
5. The job is stuck in a loop of repeated kills.
Workaround: destroy the cluster and rebuild it. I have inspected the heap memory; no problem there. Killed by the OS

Re: Kafka to MySQL write problem

2020-07-03 Thread Jingsong Li
Hi, you probably need Flink 1.11. 1. The JSON format has a parameter controlling this [1]. 2. That was an earlier bug which should no longer exist in Flink 1.11; I'm not sure whether it was fixed in 1.10.1. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/json.html#json-ignore-parse-errors Best, Jingsong On Fri, Jul 3, 2020 at 1:38 PM 郑斌斌 wrote: > dear: > two questions, please >

Re: Re: How to use RocksDB's block cache usage

2020-07-03 Thread Yun Tang
Hi, if checkpointing is not enabled, how does the job fail over? A failover must load data from a checkpoint. Or did you actually enable checkpointing but set a very large checkpoint interval, so that every failover is effectively a fresh run of the job? If no data is loaded from a checkpoint, where would the historical data come from? Once a job fails over, the state backend's data is cleared and then loaded again at startup. Best, Yun Tang From: SmileSmile Sent: Friday, July

Re: Can table execution-options take effect via -yD

2020-07-03 Thread Yang Wang
My understanding is that when running on YARN, both passing options via -yD and writing them in flink-conf.yaml take effect. Best, Yang liangji wrote on Thu, Jul 2, 2020 at 6:12 PM: > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html#execution-options > // instantiate table environment > TableEnvironment tEnv = ... > > // access flink configuration

Re: How to use RocksDB's block cache usage

2020-07-03 Thread SmileSmile
Hi Yun Tang! When a pod is recreated due to network issues or other external causes, the job restarts. Checkpointing is currently not enabled, so recovery is equivalent to resuming computation from the latest data. After running for a while, memory is easily over-used and the pod is killed by the OS; the job restarts, runs for a while, and the interval shortens; it dies more and more frequently. From the symptoms it looks like memory is not released. In this scenario, is the data left over from the previous run (data that had not yet reached the watermark and had not yet triggered computation) cleared during the job restart? On Jul 3, 2020, 14:59, Yun Tang wrote: Hi, check the block cache

Configuring a topic pattern for the Kafka connector in Flink 1.10

2020-07-03 Thread Peihui He
hello Could someone advise: in Flink 1.10 the Kafka connector cannot be configured with a topic pattern; will this be supported later? best wishes
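
While the table connector lacks this, the DataStream API Kafka consumer already accepts a topic pattern; a sketch assuming the universal Kafka connector (servers, group, and pattern are illustrative):

    import java.util.Properties;
    import java.util.regex.Pattern;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class PatternConsumerJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "demo");
            // With partition discovery enabled, newly created matching topics are picked up too.
            props.setProperty("flink.partition-discovery.interval-millis", "30000");
            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                Pattern.compile("events-.*"), new SimpleStringSchema(), props);
            env.addSource(consumer).print();
            env.execute("pattern-consumer");
        }
    }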

Re: With checkpointing disabled, will historical data linger in memory when the job restarts?

2020-07-03 Thread Congxian Qiu
In theory, memory is released after the job restarts; from your description, some memory is not released after the restart. Could you take a memory dump after the restart and have a look? Also, can you reproduce this problem reliably? If so, could you tell us how to reproduce it? Best, Congxian SmileSmile wrote on Fri, Jul 3, 2020 at 12:23 PM: > This phenomenon only occurs with RocksDB. > On Jul 3, 2020, 11:21, SmileSmile wrote: > > Hi > >

How to clear state when the window closes

2020-07-03 Thread 18579099...@163.com
Hi everyone, I have a requirement: I defined a ValueState in a ProcessWindowFunction operator, and I want to clear the state when the window closes. Where should I do the cleanup? 1. At first I chose to clean up in the ProcessWindowFunction's process method, but there is a problem: my event-time window is one day long, and I wrote a trigger that emits a result every hour. If I clean up in the process method, the state is cleared every hour, but the ValueState holds my intermediate result and should only be cleared when the window closes (i.e., after one day). What should I do? 18579099...@163.com

Re: How to use RocksDB's block cache usage

2020-07-03 Thread Yun Tang
Hi, by default Flink enables managed memory for RocksDB. This touches on how the feature is implemented: in short, all RocksDB instances within one slot share a single underlying LRU block cache for their "managed" memory. You can distinguish instances using taskmanager and subtask_index as tags, and you will find that within the same TM, the block cache values of the different column families of a given subtask are all exactly identical. So there is no need to sum these values. Best, Yun Tang From: SmileSmile Sent:

Re: How to use RocksDB's block cache usage

2020-07-03 Thread SmileSmile
Thanks Yun Tang! Then if I want to use the block cache usage to judge whether the managed memory is exceeded, how should I configure things? Recently my jobs are very easily killed by the OS after any restart, and I want to compare the numbers. On Jul 3, 2020, 14:10, Yun Tang wrote: Hi, by default Flink enables managed memory for RocksDB. This touches on how the feature is implemented: in short, all RocksDB instances within one slot share a single underlying "managed" LRU block

Re: Re: Flink 1.10 writing to HBase with Flink SQL, error: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

2020-07-03 Thread tiantingting5...@163.com
Hello, try putting MD5(real_income_no) as rowkey in the inner layer of the query and using rowkey directly in the outermost group by. Flink 1.11 supports declaring a PK in the table, so from 1.11 on it no longer has to be derived. Best, Leonard Xu > On Jul 1, 2020, 13:51, tiantingting5...@163.com wrote: > > MD5(real_income_no) as rowkey,

Re: Can table execution-options take effect via -yD

2020-07-03 Thread Jingsong Li
Hi, if you construct the TableEnvironment in code, you have to explicitly put the configuration into the TableConfig:

    Configuration configuration = tEnv.getConfig().getConfiguration();
    configuration.addAll(GlobalConfiguration.loadConfiguration());

CC: @Yang Wang GlobalConfiguration is an internal class; is there a public API to obtain the corresponding Configuration? Best, Jingsong On Fri,

Nested-field aliases with StreamTableEnvironment registerDataStream in Flink 1.9

2020-07-03 Thread Jun Zou
Hi, when registering a table with Flink 1.9's StreamTableEnvironment, I want to specify a column alias for a nested field, for example:

    String fieldExprsStr = "modbus.parsedResponse,timestamp";
    tableEnv.registerDataStream(src.getName(), srcStream, fieldExprsStr);

When validating modbus.parsedResponse, the following error was thrown:

Re: Configuring a topic pattern for the Kafka connector in Flink 1.10

2020-07-03 Thread Leonard Xu
Hello, I understand someone in the community is already working on this; 1.12 should support it. Best, Leonard Xu > On Jul 3, 2020, 16:02, Peihui He wrote: > > hello > > In Flink 1.10 the Kafka connector cannot be configured with a topic pattern; will this be supported later? > > best wishes

Re: Configuring a topic pattern for the Kafka connector in Flink 1.10

2020-07-03 Thread Peihui He
OK, thanks! Leonard Xu wrote on Fri, Jul 3, 2020 at 4:07 PM: > Hello > > I understand someone in the community is already working on this; 1.12 should support it. > > Best, > Leonard Xu > > On Jul 3, 2020, 16:02, Peihui He wrote: > > hello > > In Flink 1.10 the Kafka connector cannot be configured with a topic pattern; will this be supported later? > > best wishes

Re: Exception when calling a Flink AsyncTableFunction

2020-07-03 Thread Jark Wu
Could you share your AsyncTableFunction implementation? Best, Jark > On Jul 2, 2020, 15:56, sunfulin wrote: > > hi, > I'm using the Flink 1.10.1 Blink planner and implemented a dimension table by extending the table source/sink factory and AsyncTableFunction; the dimension table initializes and calls an RPC service. > I ran into a rather odd problem: when the job executes the following join SQL, there is no output at all, and after a while it throws an exception: Caused by: java.lang.Exception: Could not complete

Re: Exception when calling a Flink AsyncTableFunction

2020-07-03 Thread forideal
Hi sunfulin, the following implementation works for me:

    public void eval(CompletableFuture<Collection<Row>> result, String key) {
        executorService.submit(() -> {
            try {
                Row row = fetchdata(key);
                if (row != null) {
                    result.complete(Collections.singletonList(row));
                } else {
                    result.complete(Collections.singletonList(new
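
For context, a completed, self-contained version of this pattern (executor setup, types, and the fetchData stub are assumptions, not from the original mail):

    import java.util.Collection;
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.flink.table.functions.AsyncTableFunction;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.types.Row;

    public class RpcDimFunction extends AsyncTableFunction<Row> {
        private transient ExecutorService executorService;

        @Override
        public void open(FunctionContext context) throws Exception {
            executorService = Executors.newFixedThreadPool(4);
        }

        public void eval(CompletableFuture<Collection<Row>> result, String key) {
            executorService.submit(() -> {
                try {
                    Row row = fetchData(key); // hypothetical RPC lookup
                    result.complete(row != null
                            ? Collections.singletonList(row)
                            : Collections.emptyList());
                } catch (Throwable t) {
                    // Without this, failures inside the lookup can be silently
                    // swallowed until the async timeout fires, as sunfulin
                    // observes later in this thread.
                    result.completeExceptionally(t);
                }
            });
        }

        private Row fetchData(String key) {
            // Hypothetical stand-in for the real RPC call.
            return Row.of(key);
        }

        @Override
        public void close() throws Exception {
            executorService.shutdown();
        }
    }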

What are the options in Flink SQL for deliberately delaying data for a while

2020-07-03 Thread admin
Hi all, we have a scenario: a join of two streams, one fast and one slow, and we want the fast stream to wait a while, in order to improve the join hit rate. Is there a way to implement this in Flink SQL? Thanks for your reply

Re: What are the options in Flink SQL for deliberately delaying data for a while

2020-07-03 Thread admin
To add: specifically, this is a dimension-table join. Table A joins table B (the dimension table), and we want table A to wait a while before joining B. > On Jul 3, 2020, 17:53, admin <17626017...@163.com> wrote: > > Hi all, > we have a scenario: a join of two streams, one fast and one slow, and we want the fast stream to wait a while, in order to improve the join hit rate. > Is there a way to implement this in Flink SQL? > > Thanks for your reply

Re: What are the options in Flink SQL for deliberately delaying data for a while

2020-07-03 Thread kcz
Set a window time; if you then need the latest record, you can do some extra processing. -- Original message -- From: admin <17626017...@163.com> Sent: Jul 3, 2020 18:01 To: user-zh

Re: Help: real-time cumulative UV statistics with Flink SQL 1.10

2020-07-03 Thread x
The checkpoint files keep growing; I set tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7)), but the expired key state does not seem to be cleaned up. ---- From: "Jark

Re: Configuring a topic pattern for the Kafka connector in Flink 1.10

2020-07-03 Thread Jark Wu
You can follow https://issues.apache.org/jira/browse/FLINK-18449; support is expected in 1.12. Best, Jark On Fri, 3 Jul 2020 at 16:27, Peihui He wrote: > OK, thanks! > > Leonard Xu wrote on Fri, Jul 3, 2020 at 4:07 PM: > > Hello > > I understand someone in the community is already working on this; 1.12 should support it. > > Best, > > Leonard Xu > > On Jul 3, 2020, 16:02, Peihui He wrote:

Re: Can table execution-options take effect via -yD

2020-07-03 Thread Yang Wang
Actually there is no public API for loading a Configuration from a file, because as I understand it this is client-side internal logic. After the user calls flink run, the client loads conf/flink-conf.yaml, applies the dynamic options, and then passes this Configuration on to the various Environments. If the TableEnvironment does not use the passed-in Configuration when it is constructed, -yD cannot take effect, and the user can only set the options again in code. Best, Yang Jingsong Li wrote on Fri, Jul 3, 2020 at 3:19 PM:

Re: Re: How to use RocksDB's block cache usage

2020-07-03 Thread Yun Tang
Hi, have you profiled the memory usage? I recommend using jemalloc's preload mechanism [1][2] to sample the JVM's memory usage and observe whether malloc'ed memory is over-used. You need to configure the parameters containerized.taskmanager.env.MALLOC_CONF and containerized.taskmanager.env.LD_PRELOAD. [1] https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling [2]

Re: What are the options in Flink SQL for deliberately delaying data for a while

2020-07-03 Thread Benchao Li
We have run into a similar scenario. If your data carries event time, you can write a UDF that checks it: if event time minus the current time is below some threshold, sleep for a bit. Without event time it's hard to do directly; we built our own delayed dimension table, which guarantees that each record waits a fixed amount of time after entering the dimension-table join operator before the actual join. admin <17626017...@163.com> wrote on Fri, Jul 3, 2020 at 5:54 PM: > Hi all, > we have a scenario: a join of two streams, one fast and one slow, and we want the fast stream to wait a while, in order to improve the join hit rate. > Is there a way to implement this in Flink SQL? > > Thanks for your reply -- Best,
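
One way to read the first suggestion (event-time check plus sleep) as a UDF, in sketch form; the class name and threshold are illustrative:

    import org.apache.flink.table.functions.ScalarFunction;

    public class DelayUntilOldEnough extends ScalarFunction {
        private static final long MIN_AGE_MS = 60_000L; // assumed: records must be at least 1 minute old

        // Returns its input unchanged, but blocks until the record is old enough,
        // so a dimension-table lookup evaluated after it sees a delayed record.
        public long eval(long eventTimeMs) {
            long wait = MIN_AGE_MS - (System.currentTimeMillis() - eventTimeMs);
            if (wait > 0) {
                try {
                    Thread.sleep(wait);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            return eventTimeMs;
        }
    }

Note the trade-off: sleeping in eval() blocks the task thread and back-pressures the fast stream, which is exactly the intended delay but also caps throughput.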

Re: What are the options in Flink SQL for deliberately delaying data for a while

2020-07-03 Thread Benchao Li
There is another interesting idea. If you don't need to worry about out-of-order data and the dimension table is guaranteed to eventually contain a match, you can join normally; when a record fails to join, send it back to the source topic, which gives you something like a for-loop. admin <17626017...@163.com> wrote on Fri, Jul 3, 2020 at 5:54 PM: > Hi all, > we have a scenario: a join of two streams, one fast and one slow, and we want the fast stream to wait a while, in order to improve the join hit rate. > Is there a way to implement this in Flink SQL? > > Thanks for your reply -- Best, Benchao Li

Re: What are the options in Flink SQL for deliberately delaying data for a while

2020-07-03 Thread Benchao Li
Oh, right, one more idea: if your source MQ supports delayed messages, Flink doesn't need to do anything; just use the MQ's delayed messages. admin <17626017...@163.com> wrote on Fri, Jul 3, 2020 at 5:54 PM: > Hi all, > we have a scenario: a join of two streams, one fast and one slow, and we want the fast stream to wait a while, in order to improve the join hit rate. > Is there a way to implement this in Flink SQL? > > Thanks for your reply -- Best, Benchao Li

Re: Re: What are the options in Flink SQL for deliberately delaying data for a while

2020-07-03 Thread forideal
Hi, Benchao just listed four approaches:
Approach 1: use a UDF; if the lookup misses, sleep and query again.
Approach 2: let the data wait a while at the join operator before the lookup.
Approach 3: if the join misses, send the record back to the source and loop the join.
Approach 4: if your source MQ supports delayed messages, Flink doesn't need to do anything; just use the MQ's delayed messages.
All of the above should achieve the same effect. We implemented yet another approach by extending Flink's source: for example, we added a time.wait property to the kafka connector, and when the user sets this property, the source

Re: Re: Exception when calling a Flink AsyncTableFunction

2020-07-03 Thread sunfulin
Hi, sorry I forgot to reply. Further debugging showed the cause: the column types declared in the schema did not match the field types actually received. The tricky part is that during debugging, CompletableFuture.complete swallows this type-mismatch exception and emits no data; looking at the source code, future.completeExceptionally is only called on timeout. Noting it here for the record. On 2020-07-03 17:01:19, "forideal" wrote: > Hi sunfulin: > > The following implementation works for me. > public void eval(CompletableFuture<Collection<Row>>

Question about implementing a custom source in Flink 1.9

2020-07-03 Thread guanyq
See the attached image: I want to pass the data emitted by the listener to ctx. How can I implement this hand-off?

    public class RMQRichParallelSource extends RichParallelSourceFunction implements MessageOrderListener {
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Properties properties = new Properties();
            //
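
A common pattern for exactly this hand-off (a sketch, not from the thread): buffer the listener callbacks in a queue and drain it in run():

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    public class ListenerBridgeSource extends RichParallelSourceFunction<String> {

        // Hand-off buffer between the MQ listener thread and Flink's source thread.
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10_000);
        private volatile boolean running = true;

        // Called from the MQ client's listener callback (the MessageOrderListener role
        // in the original code); put(...) blocks when full, back-pressuring the listener.
        public void onMessage(String body) throws InterruptedException {
            queue.put(body);
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                String msg = queue.poll(100, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    continue; // re-check the running flag periodically
                }
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(msg);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }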

Re: What are the options in Flink SQL for deliberately delaying data for a while

2020-07-03 Thread Sun.Zhu
A window requires a group by, so fields will be lost. On Jul 3, 2020, 19:11, kcz wrote: Set a window time; if you then need the latest record, you can do some extra processing. -- Original message -- From: admin <17626017...@163.com> Sent: Jul 3, 2020 18:01 To: user-zh

Re: Help: real-time cumulative UV statistics with Flink SQL 1.10

2020-07-03 Thread Benchao Li
Which version are you using? There was a similar problem before [1]: count distinct inside a window had this issue, and it was fixed in 1.11. [1] https://issues.apache.org/jira/browse/FLINK-17942 x <35907...@qq.com> wrote on Fri, Jul 3, 2020 at 4:34 PM: > Hello, after my program has been running for a while, I find the checkpoint files keep growing; the state probably has not expired, >

Re: Can table execution-options take effect via -yD

2020-07-03 Thread Benchao Li
This should take effect; that's how we use it. If I understand correctly, `PlannerBase#mergeParameters` merges the parameters in the ExecutionEnvironment with those in the TableConfig. Yang Wang wrote on Fri, Jul 3, 2020 at 5:10 PM: > Actually there is no public API for loading a Configuration from a file, because as I understand it this is client-side internal logic > > After the user calls flink run, the client loads conf/flink-conf.yaml and applies the dynamic options,