Flink SQL + savepoint

2019-10-30 Thread Fanbin Bu
Hi, it is highly recommended that we assign a uid to each operator for the sake of savepoints. How do we do this for Flink SQL? According to https://stackoverflow.com/questions/55464955/how-to-add-uid-to-operator-in-flink-table-api, it is not possible. Does that mean I can't use savepoint to
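
For contrast, this is roughly how the uid in question is assigned in the DataStream API (a minimal sketch; "stream" and MyMapper are hypothetical). The Table/SQL planner generates its operators internally, which is why the thread says there is no equivalent hook there:

    DataStream<String> mapped = stream
            .map(new MyMapper())
            .uid("my-mapper-uid")   // stable id matched against savepoint state on restore
            .name("my-mapper");     // display name only, not used for state matching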

Preserving (best effort) messages order between operators

2019-10-30 Thread Averell
Hi, I have a source function with parallelism = 1, sending out records ordered by event-time. These records are then re-balanced to the next operator which has parallelism > 1. I observed that within each subtask of the 2nd operator, the order of the messages is not maintained. Is this behaviour

Re: How to filter abnormal timestamps?

2019-10-30 Thread 邢瑞斌
Hi 唐云, thanks for the pointers, I'll give it a try. I actually don't quite understand the TimeCharacteristic set on the env; my earlier understanding was that this setting applies globally. If I set it to IngestionTime, can downstream operators still use EventTime? Yun Tang wrote on Thursday, Oct 31, 2019 at 2:26 AM: > Hi 瑞斌 > > If you use Flink's IngestionTime, there is no mixing with the EventTime that Flink provides; the IngestionTime at the source is simply the source's system time, and you can add a filter right after the source >

Re: Flink S3 error

2019-10-30 Thread vino yang
Hi Harrison, So did you check whether the file exists or not? And what's your question? Best, Vino Harrison Xu wrote on Thursday, Oct 31, 2019 at 5:24 AM: > I'm seeing this exception with the S3 uploader - it claims a previous > part file was not found. Full jobmanager logs attached. (Flink 1.8) > >

Re: How to periodically write state to external storage

2019-10-30 Thread Jun Zhang
https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA On Oct 31, 2019 at 10:16, wangl...@geekplus.com.cn

Flink state TTL expiration cleanup question

2019-10-30 Thread wangl...@geekplus.com.cn
flink-1.7.2, using the cleanup policy below: StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build(); ValueStateDescriptor descriptor = new
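
A minimal sketch of the cleanup configuration quoted above, reassembled for readability (Flink 1.7.x StateTtlConfig API; the descriptor name and value type are assumptions):

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    // TTL of 3 days; expired entries are never returned and, in 1.7.x, are cleaned up lazily on access.
    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(3))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

    // Hypothetical descriptor; attach the TTL config before acquiring the state handle.
    ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("lastSeen", Long.class);
    descriptor.enableTimeToLive(ttlConfig);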

How to periodically write state to external storage

2019-10-30 Thread wangl...@geekplus.com.cn
Message-driven with very high QPS; every incoming message updates the state value. If every message also wrote to external storage, the downstream system couldn't keep up. Is there a way to periodically read the state and write it to external storage? I'm currently on Flink 1.7.2. wangl...@geekplus.com.cn
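
One pattern often suggested for this (a hedged sketch, not an answer taken from this thread; all names and types are assumptions) is to update keyed state on every message and flush it only on a processing-time timer, so the external store sees periodic writes instead of one write per message:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical: accumulate a per-key sum and emit it at most once per minute;
    // a downstream sink then writes the emitted records to external storage.
    public class PeriodicFlush extends KeyedProcessFunction<String, Long, Long> {
        private transient ValueState<Long> total;

        @Override
        public void open(Configuration parameters) {
            total = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Long.class));
        }

        @Override
        public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
            Long current = total.value();
            total.update(current == null ? value : current + value);
            // Align the timer to the next minute; duplicate registrations for the same timestamp are de-duplicated.
            long next = ctx.timerService().currentProcessingTime() / 60_000 * 60_000 + 60_000;
            ctx.timerService().registerProcessingTimeTimer(next);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
            Long current = total.value();
            if (current != null) {
                out.collect(current); // forwarded to the external-storage sink downstream
            }
        }
    }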

Re: low performance in running queries

2019-10-30 Thread Zhenghua Gao
I think more runtime information would help figure out where the problem is: 1) how many parallel subtasks are actually working, 2) the metrics for each operator, 3) the JVM profiling information, etc. *Best Regards,* *Zhenghua Gao* On Wed, Oct 30, 2019 at 8:25 PM Habib Mostafaei wrote: > Thanks Gao for

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-30 Thread tison
Thanks for your attention! @shixiaoga...@gmail.com Yes, correct. We try to avoid jobs affecting one another. Also, a local ClusterClient saves the overhead of retrying before a leader is elected and of persisting the JobGraph before submission in RestClusterClient, as well as the network cost. @Paul Lam 1.

Re: Streaming File Sink - Parquet File Writer

2019-10-30 Thread Kostas Kloudas
Hi Vinay, You are correct when saying that the bulk formats only support onCheckpointRollingPolicy. The reason for this has to do with the fact that currently Flink relies on the Hadoop writer for Parquet. Bulk formats keep important details about how they write the actual data (such as
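
For context, a minimal sketch of a Parquet bulk-format sink as discussed above (Flink 1.8/1.9 style; the Avro-generated class MyRecord, the output path, and "stream" are assumptions):

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    StreamingFileSink<MyRecord> sink = StreamingFileSink
            .forBulkFormat(new Path("s3://bucket/output"),
                           ParquetAvroWriters.forSpecificRecord(MyRecord.class))
            .build();
    // Bulk formats roll part files on every checkpoint (OnCheckpointRollingPolicy);
    // a custom rolling policy cannot be supplied for them.

    stream.addSink(sink);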

Flink S3 error

2019-10-30 Thread Harrison Xu
I'm seeing this exception with the S3 uploader - it claims a previous part file was not found. Full jobmanager logs attached. (Flink 1.8) java.io.FileNotFoundException: No such file or directory:

Re: How to stream intermediate data that is stored in external storage?

2019-10-30 Thread kant kodali
"I think you would have to implement your own custom operator that would output changes to it’s internal state as a side output" Yes, I am looking for this but I am not sure how to do this? Should I use the processFunction(like the event-driven applications) ? On Wed, Oct 30, 2019 at 8:53 AM

Re: No FileSystem for scheme "file" for S3A in and state processor api in 1.9

2019-10-30 Thread Piotr Nowojski
But from the stack trace that you have posted it looks like you are using Hadoop’s S3 implementation for the checkpointing? If so, can you try using Presto and check whether you still encounter the same issue? Also, could you explain how to reproduce the issue? What configuration are you

Re: How to filter abnormal timestamps?

2019-10-30 Thread Yun Tang
Hi 瑞斌, if you use Flink's IngestionTime, there is no mixing with the EventTime that Flink provides; the IngestionTime at the source is simply the source's system time. You can add a filter operator right after the source that compares the ingestion time with the event time in the message and drops anything exceeding a threshold so it is not sent downstream. Best, 唐云 From: 邢瑞斌 Sent: Wednesday, October 30, 2019 17:57 To:
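
A minimal sketch of the filter described above, dropping records whose embedded event time is too far from the machine's current time (LogEvent, getTimestamp(), and the one-day threshold are assumptions; "source" is assumed to be a DataStream<LogEvent>):

    long maxSkewMs = 24L * 60 * 60 * 1000; // assumed threshold of one day
    DataStream<LogEvent> filtered = source
            // System.currentTimeMillis() stands in for the source-side ingestion time
            .filter(event -> Math.abs(event.getTimestamp() - System.currentTimeMillis()) <= maxSkewMs);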

Re: The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged

2019-10-30 Thread Till Rohrmann
Hi Regina, sorry for not getting back to you earlier. I've gone through the logs and I couldn't find something suspicious. What I can see though is the following: When you start the cluster, you submit a couple of jobs. This starts at 9:20. In total 120 slots are being required to run these jobs.

Re: No FileSystem for scheme "file" for S3A in and state processor api in 1.9

2019-10-30 Thread spoganshev
Actually, I forgot to mention that it happens when there's also a presto library in plugins folder (we are using presto for checkpoints and hadoop for file sinks in the job itself) -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to stream intermediate data that is stored in external storage?

2019-10-30 Thread Piotr Nowojski
Hi Kant, The checkpointing interval is configurable, but I wouldn't count on it working well with even 10s intervals. I think what you are asking for is not supported by Flink generically. Maybe the queryable state I mentioned before? But I have never used it. I think you would have to implement your own

Re: Flink 1.5+ performance in a Java standalone environment

2019-10-30 Thread Jakub Danilewicz
Hi, I can confirm that the performance drop is directly related to FLIP-6 changes. Applying this modification to the code posted above restores the previous graph processing speed under Flink 1.5.6: ---

Re: How to stream intermediate data that is stored in external storage?

2019-10-30 Thread kant kodali
Hi Piotr, I am talking about the internal state. How often does this state get checkpointed? If it is every few seconds then it may not meet our real-time requirement (sub-second). The question really is: can I read this internal state in a streaming fashion, in an update mode? The state processor API

RE: [DISCUSS] Semantic and implementation of per-job mode

2019-10-30 Thread Newport, Billy
We execute multiple job graphs routinely because we cannot submit a single graph without it blowing up. I believe Regina spoke to this in Berlin during her talk. Instead, if we are processing a database ingestion with 200 tables in it, we do a job graph per table rather than a single job

Re: low performance in running queries

2019-10-30 Thread Piotr Nowojski
Hi, I would also suggest just attaching a code profiler to the process during those 2 hours and gathering some results. It might answer some questions about what is taking so long. Piotrek > On 30 Oct 2019, at 15:11, Chris Miller wrote: > > I haven't run any benchmarks with Flink or even used

Re: How to stream intermediate data that is stored in external storage?

2019-10-30 Thread Piotr Nowojski
Hi, I’m not sure what you are trying to achieve. What do you mean by “state of full outer join”? The result of it? Or its internal state? Also keep in mind that the internal state of the operators in Flink is already snapshotted/written to external storage by the checkpointing mechanism.

Re: Flink 1.5+ performance in a Java standalone environment

2019-10-30 Thread Piotr Nowojski
Hi, In Flink 1.5 there were three big changes that could affect performance: 1. FLIP-6 changes (as Yang and Fabian previously mentioned) 2. Credit-based flow control (especially if you are using SSL) 3. Low-latency network changes. I would suspect them in that order. First and second you can

Re: low performance in running queries

2019-10-30 Thread Chris Miller
I haven't run any benchmarks with Flink or even used it enough to directly help with your question, however I suspect that the following article might be relevant: http://dsrg.pdos.csail.mit.edu/2016/06/26/scalability-cost/ Given the computation you're performing is trivial, it's possible

Re: PreAggregate operator with timeout trigger

2019-10-30 Thread Piotr Nowojski
Hi, If you want to register a processing/event time trigger in your custom operator, you can take a look how other operators are doing it, by calling AbstractStreamOperator#getInternalTimerService [1]. You can look around the Flink’s code base for usages of this method, there are at least
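
A hedged sketch of registering a timeout timer inside a custom operator via AbstractStreamOperator#getInternalTimerService, as pointed out above (it assumes the operator runs on a keyed stream, since internal timers need keyed state; the class name, timer-service name, and 10s timeout are illustrative):

    import org.apache.flink.runtime.state.VoidNamespace;
    import org.apache.flink.runtime.state.VoidNamespaceSerializer;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.InternalTimer;
    import org.apache.flink.streaming.api.operators.InternalTimerService;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.api.operators.Triggerable;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    public class PreAggregateOperator<IN> extends AbstractStreamOperator<IN>
            implements OneInputStreamOperator<IN, IN>, Triggerable<Object, VoidNamespace> {

        private transient InternalTimerService<VoidNamespace> timers;

        @Override
        public void open() throws Exception {
            super.open();
            timers = getInternalTimerService("timeout-timers", VoidNamespaceSerializer.INSTANCE, this);
        }

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            // Buffer the element for pre-aggregation (buffering logic omitted)
            // and arm a 10s processing-time timeout.
            timers.registerProcessingTimeTimer(VoidNamespace.INSTANCE, timers.currentProcessingTime() + 10_000);
        }

        @Override
        public void onProcessingTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {
            // Timeout fired: emit the pre-aggregated result via output.collect(...) and clear the buffer.
        }

        @Override
        public void onEventTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {
            // No event-time timers registered in this sketch.
        }
    }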

Re: No FileSystem for scheme "file" for S3A in and state processor api in 1.9

2019-10-30 Thread Piotr Nowojski
Hi, Thanks for reporting the issue, I’ve created the jira ticket for that [1]. We will investigate it and try to address it somehow. Could you try out if the same issue happen when you use flink-s3-fs-presto [2]? Piotrek [1] https://issues.apache.org/jira/browse/FLINK-14574 [2]

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-30 Thread Paul Lam
Hi, Thanks for starting the discussion. WRT the per-job semantic, it looks natural to me that per-job means per-job-graph, because in my understanding JobGraph is the representation of a job. Could you share some use case in which a user program should contain multiple job graphs? WRT the

Re: low performance in running queries

2019-10-30 Thread Zhenghua Gao
The reason might be the parallelism of your task is only 1, that's too low. See [1] to specify proper parallelism for your job, and the execution time should be reduced significantly. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html *Best Regards,* *Zhenghua Gao*
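
For reference, a minimal sketch of setting the parallelism (the value 8 is only an example):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(8);                          // job-wide default
    // per operator:  stream.map(new MyMapper()).setParallelism(8);
    // at submission: flink run -p 8 myJob.jar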

Re: low performance in running queries

2019-10-30 Thread Habib Mostafaei
Thanks Gao for the reply. I used the parallelism parameter with different values like 6 and 8, but the execution time is still not comparable with a single-threaded Python script. What would be a reasonable value for the parallelism? Best, Habib On 10/30/2019 1:17 PM, Zhenghua Gao wrote:

Re: Sending custom statsd tags

2019-10-30 Thread Chesnay Schepler
Not possible, you'll have to extend the StatsDReporter yourself to add arbitrary tags. On 30/10/2019 12:52, Prakhar Mathur wrote: Hi, We are running Flink 1.6.2. We are using flink-metrics-statsd jar in order to send metrics to telegraf. In order to send custom metrics, we are using

Sending custom statsd tags

2019-10-30 Thread Prakhar Mathur
Hi, We are running Flink 1.6.2. We are using flink-metrics-statsd jar in order to send metrics to telegraf. In order to send custom metrics, we are using MetricGroups. Currently, we are trying to send a few custom tags but unable to find any examples or documentation regarding the same. Regards
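
For context, a hedged sketch of registering a custom metric via the MetricGroup (the group and metric names are assumptions); per Chesnay's reply above, arbitrary statsd tags still require extending the StatsDReporter itself:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;

    public class CountingMapper extends RichMapFunction<String, String> {
        private transient Counter processed;

        @Override
        public void open(Configuration parameters) {
            // addGroup only adds reporting scope; it does not produce statsd tags.
            processed = getRuntimeContext()
                    .getMetricGroup()
                    .addGroup("myApp")
                    .counter("events_processed");
        }

        @Override
        public String map(String value) {
            processed.inc();
            return value;
        }
    }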

clear method of Trigger not called after purge

2019-10-30 Thread Utopia
Hi guys, I set allowedLateness on a session window. I return FIRE_AND_PURGE in the onEventTime method of the Trigger, but the clear method is not called. However, clear is called if I return FIRE_AND_PURGE in the onElement method. Best regards. Utopia
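
For reference, a minimal sketch of a Trigger that fires and purges from the event-time callback, as described above (window and timer choices are illustrative):

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class PurgeOnTimeTrigger extends Trigger<Object, TimeWindow> {

        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            // The case in question: firing and purging from the event-time callback.
            return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) {
            // The cleanup hook whose invocation is being asked about; remove any timers this trigger registered.
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }
    }

Note that session windows merge, so a production trigger would also need canMerge()/onMerge(); this sketch omits that.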

How to filter abnormal timestamps?

2019-10-30 Thread 邢瑞斌
Hi: When collecting logs from clients, there are always some abnormal local timestamps, some of which are off from the correct date by many days. Such timestamps affect the watermark. How do you all handle logs like this? My current idea is to compare the log's time with Flink's time and filter it out if it exceeds a threshold, but this seems to make the processing result non-deterministic. My planned improvement is to compare the IngestionTime with the log's timestamp, but I'm not sure whether IngestionTime and EventTime can be mixed. Any advice is appreciated, thanks!

Re: flink run jar throw java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

2019-10-30 Thread Alex Wang
Hello Yang, Frank, thank you for your reply. Frank, I have created a fat jar called flink-sql-submit.jar; the file size is 8.2M. > You can create a fat jar (also called Uber jar) that includes all > dependencies in your application jar. > > I would avoid putting things in the Flink lib directory as

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-30 Thread SHI Xiaogang
Hi, Thanks for bringing this up. The design looks very nice to me in that: 1. In the new per-job mode, we don't need to compile user programs in the client and can directly run user programs with user jars. That way, it's easier for resource isolation in multi-tenant platforms and is much safer. 2.

Re: flink run jar throw java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

2019-10-30 Thread vino yang
Hi Franke, From the information provided by Alex: >> mvn build jar include com.mysql.jdbc.Driver. It seems he has packaged a fat jar? Best, Vino Jörn Franke wrote on Wednesday, Oct 30, 2019 at 2:47 PM: > > > You can create a fat jar (also called Uber jar) that includes all > dependencies in your application

[DISCUSS] Semantic and implementation of per-job mode

2019-10-30 Thread tison
(CC user list because I think users may have ideas on how per-job mode should look) Hi all, In the discussion about Flink on k8s [1] we encountered a problem: opinions diverge on how the so-called per-job mode works. This thread is aimed at starting a dedicated discussion about per-job semantic

Re: flink run jar throw java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

2019-10-30 Thread Jörn Franke
You can create a fat jar (also called Uber jar) that includes all dependencies in your application jar. I would avoid putting things in the Flink lib directory as it can make maintenance difficult, e.g. deployment is challenging, upgrading Flink, providing it on new nodes, etc. > Am

Re: flink1.9.1 on yarn sql deployment question

2019-10-30 Thread Dian Fu
Question 1: You only need Flink installed on the machine that submits the job. Question 2: Either way works. Question 3: Yes, the jars under the lib directory are submitted to YARN together with the user jar. > On Oct 30, 2019 at 10:30 AM, hb <343122...@163.com> wrote: > > hello: > > > Environment: flink1.9.1, on yarn hadoop2.6 > Flink is installed only on the one machine used for submission, > > > the lib directory contains: > flink-dist_2.11-1.9.1.jar > flink-json-1.9.0-sql-jar.jar >
