Hi,
it is highly recommended that we assign a uid to each operator for the
sake of savepoints. How do we do this for Flink SQL? According to
https://stackoverflow.com/questions/55464955/how-to-add-uid-to-operator-in-flink-table-api,
it is not possible.
Does that mean I can't use savepoints to
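For contrast, in the DataStream API a uid can be attached to each operator; this is exactly the knob the Table/SQL API did not expose at the time. A fragment of a job definition (the source and parser names are illustrative, not from the original thread):

```java
// Stable uids let a savepoint's operator state be matched back to
// operators even if the auto-generated ids change between job versions.
env.addSource(new MyKafkaSource())      // MyKafkaSource is a placeholder
   .uid("events-source")
   .map(new ParseEvent())               // ParseEvent is a placeholder
   .uid("parse-event")
   .print();
```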
Hi,
I have a source function with parallelism = 1, sending out records ordered
by event-time. These records are then re-balanced to the next operator which
has parallelism > 1. I observed that within each subtask of the 2nd
operator, the order of the messages is not maintained. Is this behaviour
Hi Yun Tang,
Thanks for the pointers, I'll give it a try. I actually don't quite understand the TimeCharacteristic set on the env; my previous understanding was that this setting applies globally. If it is set to IngestionTime, can subsequent operators still use EventTime?
Yun Tang wrote on Thu, Oct 31, 2019 at 2:26 AM:
> Hi Ruibin,
>
> If you use Flink's IngestionTime, there is no mixing with Flink's EventTime at all; the IngestionTime at the source is simply the source's system time. You can add a filter
>
Hi Harrison,
So did you check whether the file exists or not? And what's your question?
Best,
Vino
Harrison Xu wrote on Thu, Oct 31, 2019 at 5:24 AM:
> I'm seeing this exception with the S3 uploader - it claims a previously
> written part file was not found. Full jobmanager logs attached. (Flink 1.8)
>
>
https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA
On Thu, Oct 31, 2019 at 10:16 AM, wangl...@geekplus.com.cn wrote:
flink-1.7.2, using the following cleanup policy:
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(3))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor descriptor = new
The job is message-driven with very high QPS; every message updates the state value, and the downstream system cannot keep up if we write to external storage on every message.
Is there a way to periodically read the state and write it to external storage?
I'm currently on Flink 1.7.2.
wangl...@geekplus.com.cn
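One common answer to this question is to keep updating Flink state on every message but flush to the external store only on a periodic processing-time timer (e.g. registered in a KeyedProcessFunction). The batching logic itself can be sketched in plain Java without Flink dependencies; the class and method names below are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

/** Collects per-key updates and releases them only once per interval. */
class IntervalFlusher {
    private final long intervalMillis;
    private final Map<String, Long> pending = new HashMap<>();
    private long lastFlushMillis;

    IntervalFlusher(long intervalMillis, long startMillis) {
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = startMillis;
    }

    /** Record the latest value for a key; cheap, called on every message. */
    void onUpdate(String key, long value) {
        pending.put(key, value);
    }

    /** Returns the batch to write externally if the interval elapsed, else empty. */
    Map<String, Long> maybeFlush(long nowMillis) {
        if (nowMillis - lastFlushMillis < intervalMillis) {
            return new HashMap<>();
        }
        lastFlushMillis = nowMillis;
        Map<String, Long> batch = new HashMap<>(pending);
        pending.clear();
        return batch;
    }
}
```

Inside a KeyedProcessFunction the same idea maps to registering a processing-time timer per interval and emitting the batch in onTimer.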
I think more runtime information would help figure out where the problem is.
1) how many parallel subtasks are actually working
2) the metrics for each operator
3) the JVM profiling information, etc.
*Best Regards,*
*Zhenghua Gao*
On Wed, Oct 30, 2019 at 8:25 PM Habib Mostafaei
wrote:
> Thanks Gao for
Thanks for your attention!
@shixiaoga...@gmail.com
Yes, correct. We try to avoid jobs affecting one another. Also, a local
ClusterClient saves the overhead of retrying before a leader is elected and of
persisting the JobGraph before submission in RestClusterClient, as well as the
network cost.
@Paul Lam
1.
Hi Vinay,
You are correct when saying that the bulk formats only support
onCheckpointRollingPolicy.
The reason for this is that Flink currently
relies on the Hadoop writer for Parquet.
Bulk formats keep important details about how they write the actual
data (such as
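For reference, a Parquet bulk sink in the 1.8/1.9-era API looks roughly like the fragment below (the path and record type are illustrative); note there is no rolling-policy setter for bulk formats, since they can only roll on checkpoint:

```java
// Bulk writers (e.g. Parquet) cannot resume a half-written file after a
// failure, so the sink rolls a new part file on every checkpoint.
StreamingFileSink<MyRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3://bucket/output"),
                       ParquetAvroWriters.forReflectRecord(MyRecord.class))
        .build();
stream.addSink(sink);
```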
I'm seeing this exception with the S3 uploader - it claims a previously
written part file was not found. Full jobmanager logs attached. (Flink 1.8)
java.io.FileNotFoundException: No such file or directory:
"I think you would have to implement your own custom operator that would
output changes to it’s internal state as a side output"
Yes, I am looking for this, but I am not sure how to do it. Should I use
a ProcessFunction (like in the event-driven applications examples)?
On Wed, Oct 30, 2019 at 8:53 AM
But from the stack trace that you have posted it looks like you are using
Hadoop’s S3 implementation for the checkpointing? If so, can you try using
Presto and check whether you still encounter the same issue?
Also, could you explain how to reproduce the issue? What configuration are you
Hi Ruibin,
If you use Flink's IngestionTime, there is no mixing with Flink's EventTime at all; the IngestionTime at the source is simply the source's system time. You can add a filter operator after the source that compares the ingestion time with the event time in the message and drops records whose difference exceeds a threshold, rather than passing them downstream.
Best,
Yun Tang
From: 邢瑞斌
Sent: Wednesday, October 30, 2019 17:57
To:
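The threshold check described above is simple enough to sketch in plain Java (the class name and parameter are made up for illustration); in a job it would sit inside a FilterFunction right after the source:

```java
/** Keeps records whose event time is within a bounded drift of ingestion time. */
class DriftFilter {
    private final long maxDriftMillis;

    DriftFilter(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    /** True if the record should be forwarded downstream. */
    boolean keep(long eventTimeMillis, long ingestionTimeMillis) {
        return Math.abs(ingestionTimeMillis - eventTimeMillis) <= maxDriftMillis;
    }
}
```

Using the absolute difference also drops client timestamps that are far in the future, which is the failure mode described in the original question.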
Hi Regina, sorry for not getting back to you earlier. I've gone through the
logs and I couldn't find anything suspicious. What I can see, though, is the
following:
When you start the cluster, you submit a couple of jobs. This starts at
9:20. In total, 120 slots are required to run these jobs.
Actually, I forgot to mention that it happens when there's also a Presto
library in the plugins folder (we are using Presto for checkpoints and Hadoop
for file sinks in the job itself).
Hi Kant,
Checkpointing interval is configurable, but I wouldn't count on it working well
with even 10s intervals.
I think what you are asking for is not supported by Flink generically. Maybe
the queryable state I mentioned before? But I have never used it.
I think you would have to implement your own
Hi,
I can confirm that the performance drop is directly related to FLIP-6 changes.
Applying this modification to the code posted above restores the previous graph
processing speed under Flink 1.5.6:
---
Hi Piotr,
I am talking about the internal state. How often does this state get
checkpointed? If it is every few seconds, then it may not meet our real-time
requirement (sub-second).
The question really is: can I read this internal state in a streaming
fashion in an update mode? The state processor API
We execute multiple job graphs routinely because we cannot submit a single
graph without it blowing up. I believe Regina spoke to this in Berlin during
her talk. Instead, if we are processing a database ingestion with 200 tables
in it, we do a job graph per table rather than a single job
Hi,
I would also suggest attaching a code profiler to the process during those
2 hours and gathering some results. It might answer the question of what is
taking so long.
Piotrek
> On 30 Oct 2019, at 15:11, Chris Miller wrote:
>
> I haven't run any benchmarks with Flink or even used
Hi,
I’m not sure what you are trying to achieve. What do you mean by “state of full
outer join”? The result of it? Or its internal state? Also keep in mind that the
internal state of the operators in Flink is already snapshotted/written to
external storage by the checkpointing mechanism.
Hi,
In Flink 1.5 there were three big changes, that could affect performance.
1. FLIP-6 changes (As previously Yang and Fabian mentioned)
2. Credit-based flow control (especially if you are using SSL)
3. Low latency network changes
I would suspect them in that order. First and second you can
I haven't run any benchmarks with Flink or even used it enough to
directly help with your question, however I suspect that the following
article might be relevant:
http://dsrg.pdos.csail.mit.edu/2016/06/26/scalability-cost/
Given the computation you're performing is trivial, it's possible
Hi,
If you want to register a processing/event time trigger in your custom
operator, you can take a look how other operators are doing it, by calling
AbstractStreamOperator#getInternalTimerService [1]. You can look around the
Flink’s code base for usages of this method, there are at least
Hi,
Thanks for reporting the issue, I’ve created the jira ticket for that [1]. We
will investigate it and try to address it somehow.
Could you try out whether the same issue happens when you use flink-s3-fs-presto [2]?
Piotrek
[1] https://issues.apache.org/jira/browse/FLINK-14574
[2]
Hi,
Thanks for starting the discussion.
WRT the per-job semantic, it looks natural to me that per-job means
per-job-graph,
because in my understanding JobGraph is the representation of a job. Could you
share some use case in which a user program should contain multiple job graphs?
WRT the
The reason might be that the parallelism of your task is only 1, which is too low.
See [1] to specify proper parallelism for your job, and the execution time
should be reduced significantly.
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html
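For reference, the parallelism can be set either on the execution environment in code or with the -p flag at submission time, e.g.:

```java
// In the job itself: applies to all operators unless overridden per operator.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);
```

or equivalently `flink run -p 8 your-job.jar` on the command line.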
*Best Regards,*
*Zhenghua Gao*
Thanks Gao for the reply. I used the parallelism parameter with
different values like 6 and 8, but the execution time is still not
comparable with a single-threaded Python script. What would be a
reasonable value for the parallelism?
Best,
Habib
On 10/30/2019 1:17 PM, Zhenghua Gao wrote:
Not possible, you'll have to extend the StatsDReporter yourself to add
arbitrary tags.
On 30/10/2019 12:52, Prakhar Mathur wrote:
Hi,
We are running Flink 1.6.2. We are using flink-metrics-statsd jar in
order to send metrics to telegraf. In order to send custom metrics, we
are using
Hi,
We are running Flink 1.6.2. We are using flink-metrics-statsd jar in order
to send metrics to telegraf. In order to send custom metrics, we are
using MetricGroups.
Currently, we are trying to send a few custom tags but are unable to find
any examples or documentation regarding this.
Regards
Hi guys,
I set allowedLateness on a session window. I return FIRE_AND_PURGE in the
onEventTime method of the Trigger, but the clear method is not called. However,
clear is called if I return FIRE_AND_PURGE in the onElement method.
Best regards.
Utopia
Hi:
When collecting logs from clients, we always run into some abnormal local timestamps, some of which are off from the correct date by many days. Such timestamps affect the watermark. How do you all handle logs like this?
My current idea is to compare the log's timestamp with Flink's time and filter the record out if the difference exceeds a threshold. But this seems to make the processing result nondeterministic. My planned improvement is to compare the log's timestamp against IngestionTime instead. However, I'm not sure whether IngestionTime and EventTime can be mixed.
Any advice is appreciated, thanks!
Hello Yang, Frank
Thank you for your reply.
Frank, I have created a fat jar called flink-sql-submit.jar; the file size
is 8.2 MB.
You can create a fat jar (also called Uber jar) that includes all
> dependencies in your application jar.
>
> I would avoid to put things in the Flink lib directory as
Hi
Thanks for bringing this.
The design looks very nice to me in that
1. In the new per-job mode, we don't need to compile user programs in the
client and can directly run user programs with user jars. That way, it's
easier for resource isolation in multi-tenant platforms and is much safer.
2.
Hi Franke,
From the information provided by Alex:
>> mvn build jar include com.mysql.jdbc.Driver.
it seems he has packaged a fat jar?
Best,
Vino
Jörn Franke 于2019年10月30日周三 下午2:47写道:
>
>
> You can create a fat jar (also called Uber jar) that includes all
> dependencies in your application
(CC user list because I think users may have ideas on how per-job mode
should look like)
Hi all,
In the discussion about Flink on k8s [1] we encountered a problem: opinions
diverge on how the so-called per-job mode works. This thread is aimed at starting
a dedicated discussion about per-job semantics
You can create a fat jar (also called Uber jar) that includes all dependencies
in your application jar.
I would avoid putting things in the Flink lib directory, as it can make
maintenance difficult. E.g., deployment is challenging, as are upgrading Flink,
providing it on new nodes, etc.
> Am
Question 1: You only need Flink on the machine you submit the job from.
Question 2: Both ways work.
Question 3: Yes. The jars under the lib directory are submitted to YARN together with the user's jar.
> On Oct 30, 2019, at 10:30 AM, hb <343122...@163.com> wrote:
>
> hello:
>
>
> Environment: flink-1.9.1 on YARN, Hadoop 2.6
> Flink is installed only on the one machine used for submission.
>
>
> The lib directory contains:
> flink-dist_2.11-1.9.1.jar
> flink-json-1.9.0-sql-jar.jar
>