Hi Debasish,
In which case does the exception occur? Does it occur when you submit one job
at a time, or when multiple jobs are submitted at the same time? I'm asking
because I noticed that you use a Future to execute the job in a non-blocking
way. I guess ThreadLocal doesn't work well in this case.
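The ThreadLocal concern can be illustrated with plain JVM code: a value set on the submitting thread is not visible inside a Future that runs on another thread. This is a minimal sketch, independent of Flink; `context` is a hypothetical stand-in for any per-thread environment factory, not Flink's actual field.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ThreadLocalDemo {
  // Hypothetical per-thread "context", standing in for a ThreadLocal-held factory.
  val context: ThreadLocal[String] = new ThreadLocal[String]

  /** Returns (value seen by the caller thread, value seen inside the Future). */
  def observe(): (Option[String], Option[String]) = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      context.set("context-env")
      val seenByCaller = Option(context.get())
      // The Future body runs on the pool thread, whose ThreadLocal slot was never set.
      val seenInFuture = Await.result(Future(Option(context.get())), 5.seconds)
      (seenByCaller, seenInFuture)
    } finally {
      context.remove()
      pool.shutdown()
    }
  }
}
```

If the environment lookup happens inside the Future, it would therefore see whatever the pool thread has (here nothing), not what the submitting thread set up.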
Thanks Till,
Perfect. I'm going to use RestClusterClient with listJobs.
It should work perfectly for my needs.
Cheers
David
On 2019/09/23 12:36:46, Till Rohrmann wrote:
> Hi David,
>
> you could use Flink's RestClusterClient and call #listJobs to obtain the
> list of jobs being executed on the
Hi tison -
Please find my response below in >>.
regards.
On Mon, Sep 23, 2019 at 6:20 PM Zili Chen wrote:
> Hi Debasish,
>
> The OptimizerPlanEnvironment.ProgramAbortException should be caught at
> OptimizerPlanEnvironment#getOptimizedPlan
> in its catch (Throwable t) branch.
>
>> true but
We are using a 3rd party library that allocates some resources in one of
our topologies.
Is there a listener or something that gets notified when the topology
starts / stops running in the Task Manager's JVM?
The 3rd party library uses a singleton, so I need to initialize the
singleton when the
Hi Subbu,
The issue you encountered is very similar to the issue which has been fixed in
FLINK-10455 [1]. Could you check whether that fix solves your problem? The root
cause of that issue is that the close() method did not release all resources.
After the method "close()" is called, the
Hi,
According to the Flink documentation, it seems that a fat JAR is recommended
when submitting a Flink job. However, the Flink dependencies (as well as other
dependencies like Hadoop) are very large, producing a fat JAR that
exceeds 100MB. Is there some way to separate the
Hi Debasish,
As I said before, the exception is caught in [1]. It catches Throwable, so it
could also catch "OptimizerPlanEnvironment.ProgramAbortException".
Regarding the cause of this exception, I have the same feeling as Tison,
and I also think that the wrong
Hi there,
I have two Kafka source streams. Stream1 is continuous data and stream2 is
a periodic update. Stream2 contains only one column.
*Use case*: Every entry from stream1 should be checked for a match in
stream2.
The matched and unmatched records should be separated into new unique
Hi Guys,
The Flink version is 1.9.0. I use OrcTableSource to read ORC files in HDFS, and
the job executes successfully without any exception or error. But some
fields (such as tagIndustry) are always null, although these fields are actually
not null. I can read these fields when reading the file directly. Below is
Hi Qi Kang,
You don't need to, and should not, package Flink's dependencies into the job
jar. Only the application-specific dependencies are needed.
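With sbt, this advice usually means marking Flink's own modules as "provided", so they are on the compile classpath but left out of the assembled job jar. A sketch of a build.sbt fragment, assuming a Scala DataStream job on Flink 1.9.0; the module list, versions and the commented-out library coordinates are placeholders to adjust.

```scala
// build.sbt (sketch)
val flinkVersion = "1.9.0"

libraryDependencies ++= Seq(
  // Shipped with the Flink distribution on the cluster: compiled against, not bundled.
  "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)

// Application-specific dependencies keep the default (compile) scope and are
// bundled into the job jar, e.g. (hypothetical coordinates):
// libraryDependencies += "com.example" %% "my-library" % "1.0.0"
```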
Regards,
Dian
> On Sep 23, 2019, at 5:17 PM, Qi Kang wrote:
>
> Hi,
>
> According to the documentation of Flink, it seems that fat JAR is recommended
>
ah .. Ok .. I get the Throwable part. I am using
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
How can this lead to a wrong StreamExecutionEnvironment? Any suggestions?
regards.
On Mon, Sep 23, 2019 at 3:53 PM Dian Fu wrote:
> Hi
Hi,
I was able to reproduce the issue and understand the cause a little better.
The issue occurs when:
- One of the RichMapFunction transformations uses a third-party library
in the open() method that spawns a thread.
- The thread doesn’t get properly closed in the close()
Hi Felipe,
The Flink job graph is DAG-based. It seems that you set an "edge property"
(the partitioner) several times.
Flink does not support multiple partitioners on one edge. The last one
overrides the earlier ones. That means "keyBy" overrides "rebalance" and
"partitionByPartial".
You could insert
Hi Tison -
This is the code that builds the computation graph. readStream reads from
Kafka and writeStream writes to Kafka.
override def buildExecutionGraph = {
  val rides: DataStream[TaxiRide] =
    readStream(inTaxiRide)
      .filter { ride ⇒ ride.getIsStart().booleanValue }
Hi Dian -
We submit one job through the operator. We just use the following to
complete a promise when the job completes ..
Try {
  createLogic.executeStreamingQueries(ctx.env)
}.fold(
  th ⇒ completionPromise.tryFailure(th),
  _ ⇒ completionPromise.trySuccess(Dun)
)
Currently the best I can see is to make *everything* a Rich... and hook
into the open and close methods... but feels very ugly.
On Mon 23 Sep 2019 at 15:45, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:
> We are using a 3rd party library that allocates some resources in one of
>
I'm trying to get my standalone cluster to remove stale checkmarks.
The cluster is composed of a single job and task manager backed by rocksdb
with high availability.
The configuration on both the job and task manager are:
state.backend: rocksdb
state.checkpoints.dir:
I am trying to run a Flink application through Oozie on a kerberized Hadoop
cluster (Flink version 1.7.2 and the hadoop jar we run with is
hadoop-common-2.6.0-cdh5.14.0.jar). We are getting a GSS exception when a child
job is launched. We confirmed through shell actions that we have valid
When we set up an alert like "fullRestarts > 1" over a rolling window, we
want to use a counter. If it is a Gauge, "fullRestarts" will never go below 1
after the first full restart, so the alert condition will always be true after
the first job restart. If we can apply a derivative to the Gauge value, I guess
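What such an alert needs is the increase of fullRestarts within the rolling window, not its absolute value. A minimal sketch of that derivative-style computation over sampled gauge values; Sample and increaseInWindow are hypothetical names, and in practice this is usually done in the metrics backend rather than in Flink itself.

```scala
object GaugeIncrease {
  /** A (timestampMillis, value) sample of a monotonically increasing gauge. */
  final case class Sample(ts: Long, value: Long)

  /** Increase of the gauge over the window (now - windowMillis, now]. */
  def increaseInWindow(samples: Seq[Sample], now: Long, windowMillis: Long): Long = {
    val start = now - windowMillis
    val inWindow = samples.filter(s => s.ts > start && s.ts <= now).sortBy(_.ts)
    // Baseline: the last sample at or before the window start, if there is one;
    // otherwise the first sample inside the window.
    val before = samples.filter(_.ts <= start).sortBy(_.ts).lastOption
    (before.orElse(inWindow.headOption), inWindow.lastOption) match {
      // Clamp at 0 in case the gauge was reset (e.g. after a JobManager restart).
      case (Some(b), Some(e)) => math.max(0L, e.value - b.value)
      case _                  => 0L
    }
  }
}
```

With samples 0, 1, 1, 1, 2 taken one minute apart, the raw gauge reads 2 at the end, but the increase over the last two minutes is only 1, so "increase > 1" does not fire on an old restart.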
You could still handle late data. Just keep state around longer (within a
predefined lateness interval). Say your time window is a tumbling window of 5
mins and your events for a key are allowed to arrive 30 mins late; then keep events
around for 35 mins before evicting them from state.
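The retention rule above is simple arithmetic: state for a window has to live for the window size plus the allowed lateness. A minimal sketch in milliseconds, assuming tumbling event-time windows aligned to the epoch; the object and method names are illustrative only.

```scala
object LateDataRetention {
  /** Start of the tumbling window that contains `ts`. */
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  /** Event time after which (once the watermark passes it) the state for the
    * window containing `ts` can be evicted: window end plus allowed lateness. */
  def evictAfter(ts: Long, size: Long, allowedLateness: Long): Long =
    windowStart(ts, size) + size + allowedLateness

  /** Total lifetime of a window's state: window size plus allowed lateness. */
  def retention(size: Long, allowedLateness: Long): Long = size + allowedLateness
}
```

For the example in the message, `retention(5 min, 30 min)` gives 35 minutes, and an event at minute 7 belongs to the window [5, 10) whose state can be dropped once the watermark passes minute 40.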
It means
Hi Zili,
Thanks for pointing that out.
I didn't realize that it's a REST API based case. Debasish's case has been
discussed not only in this thread...
It's really hard to analyze the case without the full picture.
I think the reason why `ProgramAbortException` is not caught is that he
did
AFAIK, RichFunction is the only approach you can take for this purpose. It's
designed for lifecycle management of functions.
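For a JVM-wide singleton like the 3rd-party library case above, one way to build on open()/close() is reference counting: each function instance acquires the shared resource in open() and releases it in close(), and the last release disposes it. A minimal sketch, assuming one shared instance per TaskManager JVM; SharedResource and its create/dispose callbacks are hypothetical names, not Flink API.

```scala
final class SharedResource[R](create: () => R, dispose: R => Unit) {
  private var instance: Option[R] = None
  private var refCount = 0

  /** Call from RichFunction#open: creates the resource on first use. */
  def acquire(): R = synchronized {
    if (instance.isEmpty) instance = Some(create())
    refCount += 1
    instance.get
  }

  /** Call from RichFunction#close: disposes once no function instance uses it. */
  def release(): Unit = synchronized {
    if (refCount > 0) {
      refCount -= 1
      if (refCount == 0) {
        instance.foreach(dispose)
        instance = None
      }
    }
  }

  def activeUsers: Int = synchronized(refCount)
}
```

In a RichMapFunction, the SharedResource would typically live in a Scala object so all subtasks in the JVM share it (strictly, one per class loader that loads that object); open() calls acquire() and close() calls release().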
Regards,
Dian
> On Sep 24, 2019, at 2:13 AM, Stephen Connolly wrote:
>
> Currently the best I can see is to make *everything* a Rich... and hook into
> the open and close methods...
Hi Stephen,
I think disposing of static components in the closing stage of a task is
required.
This is because your code (operators/UDFs) is part of the task, which means
it can only be executed while the task has not been disposed.
Thanks,
Zhu Zhu
On Tue, Sep 24, 2019 at 2:13 AM Stephen Connolly wrote:
> Currently
Hi Biao,
The log below already indicates that the job was submitted via the REST API, and I
don't think it matters.
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$
JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at
> We submit the code through Kubernetes Flink Operator which uses the REST
API to submit the job to the Job Manager
So you are submitting the job through the REST API, not the Flink client? Could you
explain more about this?
Thanks,
Biao /'bɪ.aʊ/
On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh
wrote:
> Hi
Steven,
As I understand it, a Flink counter only stores its accumulated count and reports
that value. Are you using an external counter directly?
Maybe Flink Meter/MeterView is what you need? It stores the count and
calculates the rate. And it will report its "count" as well as "rate" to
external metric
Hi Clay,
Sorry, I don't get your point. I'm not sure what "stale checkmarks"
exactly means. The HA storage and checkpoint directory left after shutting
down the cluster?
Thanks,
Biao /'bɪ.aʊ/
On Tue, 24 Sep 2019 at 03:12, Clay Teeter wrote:
> I'm trying to get my standalone cluster to remove
Hi,
Thanks for your replies.
Yes, it could be useful to have a way to define the job ID. Then I would have been
able to define the job ID based on the name, for example. At the moment we do not
use the REST API but the CLI to submit our jobs on YARN.
Nevertheless, I can implement a little trick: at
Hello everyone,
Is there a way to specify the job name in the logging pattern (since the
logging configuration is global for the cluster)? We have two different
jobs running on our Flink cluster, and when there's a message it's not
obvious which of the two is logging. We can figure it out most of
Can it be the case that the threadLocal stuff in
https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
does
not behave deterministically when we submit a job through a Kubernetes Flink
I've implemented a combiner [1] in Flink by extending
OneInputStreamOperator. I call my operator using "transform".
It works well, and I guess it would be useful to add this operator to
DataStream.java. I just need to check whether I have to touch other parts
of the source code.
But
Thanks Biao,
I see. To achieve what I want, I need to work with KeyedStream. I
downloaded the Flink source code to learn and alter KeyedStream for my
needs. I am not sure, but it is a lot of work because, as far as I understand,
the key groups have to be predictable [1], and altering this
Regarding the code you pasted, personally I think nothing is wrong. The
problem is how it's executed. As you can see from the implementation of
StreamExecutionEnvironment.getExecutionEnvironment, it may create different
StreamExecutionEnvironment implementations under different
This is the complete stack trace we get from execution on Kubernetes
using the Flink Kubernetes operator. The boxed error comes from the fact
that we complete a Promise with Success when it returns a
JobExecutionResult and with Failure when we get an exception. And here we are
getting an
Wow, that's really cool! You have indeed done a lot of work. IMO
it's somewhat beyond the scope of the user mailing list.
Just one small concern: I'm not sure I have fully understood your way to
"tackle data skew by altering the way Flink partitions keys using
KeyedStream".
From my understanding,
Hi
There was a discussion about this issue [1]. As the previous discussion said,
at the moment this is not supported out of the box by Flink; I think you
can try a keyed process function, as Lasse said.
[1]
Hi Felipe,
If I understand correctly, you want to solve data skew caused by imbalanced
keys?
There is a common strategy for this kind of problem: pre-aggregation,
like the combiner in MapReduce.
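Pre-aggregation can be sketched without Flink: sum counts per key locally and emit partial sums periodically, so a hot key produces one record per flush instead of one per event. LocalCombiner, maxBuffered and emit are hypothetical names, not Flink API.

```scala
import scala.collection.mutable

final class LocalCombiner[K](maxBuffered: Int, emit: (K, Long) => Unit) {
  // Insertion-ordered partial sums per key, accumulated since the last flush.
  private val partial = mutable.LinkedHashMap.empty[K, Long]
  private var buffered = 0

  def add(key: K, count: Long = 1L): Unit = {
    partial.update(key, partial.getOrElse(key, 0L) + count)
    buffered += 1
    if (buffered >= maxBuffered) flush()
  }

  /** Emits one partial aggregate per key and clears the buffer. */
  def flush(): Unit = {
    partial.foreach { case (k, v) => emit(k, v) }
    partial.clear()
    buffered = 0
  }
}
```

Inside a custom OneInputStreamOperator, the buffer would also have to be flushed or snapshotted before each checkpoint (for example in prepareSnapshotPreBarrier), otherwise buffered partial sums could be lost on failure; a downstream keyed aggregation then adds up the partials.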
But sadly, AFAIK Flink does not support pre-aggregation currently. I'm
afraid you have to implement
Hi David,
you could use Flink's RestClusterClient and call #listJobs to obtain the
list of jobs being executed on the cluster (note that it will also report
finished jobs). By providing a properly configured Configuration (e.g.
loading flink-conf.yaml via GlobalConfiguration#loadConfiguration) it
Hi Debasish,
The OptimizerPlanEnvironment.ProgramAbortException should be caught at
OptimizerPlanEnvironment#getOptimizedPlan
in its catch (Throwable t) branch.
It should always throw a ProgramInvocationException instead of
OptimizerPlanEnvironment.ProgramAbortException if any
exception thrown
Hi Congxian,
Thanks, but by doing that we will lose some features, like the output of late
data.
Original Message
Sender: Congxian Qiu
Recipient: Lasse Nedergaard
Cc: 廖嘉逸; user@flink.apache.org;
d...@flink.apache.org
Date: Monday, Sep 23, 2019 19:56
Subject: Re: Per Key Grained Watermark
Hi,
I'm using Java code to read from Kafka, convert the stream to a table, and register
the table. I understand that I have started the StreamExecutionEnvironment
and the execution.
Is there a way I could access the registered table/temporal function
from the SQL client?
Thanks
Srikanth
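1. When the Kafka consumer parallelism is greater than the number of Kafka partitions, what happens to the extra parallel instances?
2. When the Kafka consumer parallelism equals the number of Kafka partitions, but some Kafka
partitions have no data, what happens to the consumer threads of those empty partitions? How does this affect barrier and watermark generation?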
gaofeilong198...@163.com
Could you start a local program, set breakpoints, and check whether something is wrong in the data-reading logic?
Best,
Terry Wang
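> On Sep 23, 2019, at 5:11 PM, ShuQi wrote:
>
> The Flink version is 1.9.0, and I read ORC files with OrcTableSource. I ran into a problem: the program finishes successfully without any exception, but some fields are always read as null, even though they actually have values; reading the file directly returns all fields.
>
>
> Does anyone have any good suggestions? Thanks!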
>
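Hello, you could consider starting a scheduled thread in the open method that fetches data from MySQL and refreshes a cache.
When new data flows into your system, you can check whether the scheduled thread has finished loading, and only process the data once the load is complete.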
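The suggestion above can be sketched as a small cache object created in open(): a single scheduled thread reloads the data and swaps in a new immutable snapshot, and a latch lets processing wait for the first load. `load` is a hypothetical stand-in for the MySQL query; the class name and refresh period are illustrative.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import scala.util.control.NonFatal

final class RefreshingCache[K, V](load: () => Map[K, V], periodMillis: Long) {
  private val current = new AtomicReference[Map[K, V]](Map.empty)
  private val firstLoad = new CountDownLatch(1)
  @volatile private var scheduler: ScheduledExecutorService = null

  /** Call from open(): loads immediately, then every periodMillis. */
  def start(): Unit = {
    scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit =
        try {
          current.set(load()) // swap in the new snapshot atomically
          firstLoad.countDown()
        } catch {
          // An exception escaping run() would cancel all future refreshes,
          // so keep the previous snapshot and retry on the next period.
          case NonFatal(_) => ()
        }
    }, 0L, periodMillis, TimeUnit.MILLISECONDS)
  }

  /** Blocks until the first load has completed or the timeout expires. */
  def awaitLoaded(timeoutMillis: Long): Boolean =
    firstLoad.await(timeoutMillis, TimeUnit.MILLISECONDS)

  def get(key: K): Option[V] = current.get().get(key)

  /** Call from close(). */
  def stop(): Unit = if (scheduler != null) scheduler.shutdownNow()
}
```

In a RichFunction, the cache would be created and started in open(), awaitLoaded(...) checked before processing records, and stop() called in close().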
Hope this helps!
Best,
Terry Wang
> On Sep 24, 2019, at 10:45 AM, haoxin...@163.com wrote:
>
>
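Hi,
1. When the Kafka consumer parallelism is greater than the number of Kafka partitions, what happens to the extra parallel instances? -> What this means is: when the number of Flink
slots is greater than the number of Kafka partitions, the extra slots (in fact, the Kafka consumers created in those slots) run idle.
As shown in the figure: there are 10 Kafka partitions but 15 Flink slots, so 5 nodes are idle.
2. When the Kafka consumer parallelism equals the number of Kafka partitions, but Kafka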
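The idle-instance arithmetic in item 1 can be checked with a simplified model that assigns one topic's partitions to consumer subtasks round-robin from a start index. As far as I know, Flink's Kafka connector assigns partitions in a similar way with a topic-dependent start index, but the exact formula here is an assumption for illustration.

```scala
object PartitionAssignment {
  /** Maps each subtask to the partitions it reads; partition p goes to subtask
    * (start + p) % parallelism (simplified model, not the connector's code). */
  def assign(numPartitions: Int, parallelism: Int, start: Int = 0): Map[Int, Seq[Int]] = {
    val owner = (0 until numPartitions).map(p => p -> ((start + p) % parallelism))
    (0 until parallelism).map { subtask =>
      subtask -> owner.collect { case (p, s) if s == subtask => p }
    }.toMap
  }

  /** Number of subtasks that receive no partition at all. */
  def idleSubtasks(numPartitions: Int, parallelism: Int): Int =
    assign(numPartitions, parallelism).count(_._2.isEmpty)
}
```

With 10 partitions and 15 subtasks, any consecutive assignment of this shape leaves exactly 5 subtasks without a partition, matching the example above.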
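1. First, the scenario you describe is not limited to the Async I/O operator; other operators have similar issues.
2. Flink's exactly-once guarantee applies within Flink, e.g. to state [1].
3. If you also want exactly-once semantics for external systems, the corresponding connector needs to support it [2].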
1.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/internals/stream_checkpointing.html
2.
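Hi,
As you describe, a single disk for a single task still has an I/O bottleneck; is this a single container? As others said earlier, you need to confirm that this much I/O
is expected. If it is, you can try increasing the block cache and memtable sizes to keep more data in memory.
Also, which state type are you using? If it's ValueState or ListState, could you switch to MapState? You can also look at the
RocksDB log to see whether there's anything to optimize.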
Best,
Congxian
On Mon, Sep 23, 2019 at 2:39 PM Biao Liu wrote:
> Hello,
>
>