Re: Recommended approach to debug this

2019-09-23 Thread Dian Fu
Hi Debasish, In which case does the exception occur? Does it occur when you submit one job at a time or when multiple jobs are submitted at the same time? I'm asking because I noticed that you use a Future to execute the job in a non-blocking way. I suspect ThreadLocal doesn't work well in this case.
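Here is Dian's suspicion illustrated outside Flink: a plain ThreadLocal set on the thread that submits work is not visible on the thread that later runs the Future. This is a minimal sketch. `CONTEXT` is a hypothetical stand-in for whatever per-thread state the framework installs, and the sketch assumes a regular (non-inheritable) ThreadLocal.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java illustration (not Flink code): a regular ThreadLocal set on the
// submitting thread is not visible on the thread that runs the async task.
class ThreadLocalVisibility {
    // Hypothetical stand-in for a per-thread context installed by a framework.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Returns {value seen by the caller, value seen by the worker thread}.
    static String[] observe() {
        CONTEXT.set("installed-by-caller");
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            String seenOnCaller = CONTEXT.get();
            // This lambda executes on a pool thread where CONTEXT was never set.
            String seenOnWorker = CompletableFuture
                    .supplyAsync(() -> CONTEXT.get(), pool)
                    .join();
            return new String[] {seenOnCaller, seenOnWorker};
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println("caller: " + seen[0]); // caller: installed-by-caller
        System.out.println("worker: " + seen[1]); // worker: null
    }
}
```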

Re: How to prevent from launching 2 jobs at the same time

2019-09-23 Thread David Morin
Thanks Till, perfect. I'm going to use RestClusterClient with listJobs. It should work perfectly for my needs. Cheers, David On 2019/09/23 12:36:46, Till Rohrmann wrote: > Hi David, > > you could use Flink's RestClusterClient and call #listJobs to obtain the > list of jobs being executed on the

Re: Recommended approach to debug this

2019-09-23 Thread Debasish Ghosh
Hi tison - Please find my response below in >>. regards. On Mon, Sep 23, 2019 at 6:20 PM Zili Chen wrote: > Hi Debasish, > > The OptimizerPlanEnvironment.ProgramAbortException should be caught at > OptimizerPlanEnvironment#getOptimizedPlan > in its catch (Throwable t) branch. > >> true but

Is there a lifecycle listener that gets notified when a topology starts/stops on a task manager

2019-09-23 Thread Stephen Connolly
We are using a 3rd party library that allocates some resources in one of our topologies. Is there a listener or something that gets notified when the topology starts / stops running in the Task Manager's JVM? The 3rd party library uses a singleton, so I need to initialize the singleton when the

Re: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-23 Thread Dian Fu
Hi Subbu, The issue you encountered is very similar to the issue fixed in FLINK-10455 [1]. Could you check whether that fix solves your problem? The root cause of that issue is that the close() method did not close everything. After the method "close()" is called, the

How to use thin JAR instead of fat JAR when submitting Flink job?

2019-09-23 Thread Qi Kang
Hi, According to the Flink documentation, it seems that a fat JAR is recommended when submitting a Flink job. However, the Flink dependencies (as well as other dependencies like Hadoop) are very large, producing a fat JAR that exceeds 100 MB. Is there some way to separate the

Re: Recommended approach to debug this

2019-09-23 Thread Dian Fu
Hi Debasish, As I said before, the exception is caught in [1]. It catches Throwable, so it could also catch "OptimizerPlanEnvironment.ProgramAbortException". Regarding the cause of this exception, I have the same feeling as Tison, and I also think that the wrong

Approach to match join streams to create unique streams.

2019-09-23 Thread srikanth flink
Hi there, I have two streams sourced from Kafka. Stream1 is continuous data and stream2 is a periodic update. Stream2 contains only one column. *Use case*: Every entry from stream1 should be checked for a match in stream2. The matched and unmatched records should be separated into new unique
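The core of this use case is a membership check followed by a two-way split. Below is a Flink-free sketch of just that logic, with a hypothetical `MatchSplitter` class. It assumes stream2 values accumulate into a set. In an actual job, broadcasting stream2 and emitting the two outputs as side outputs is one plausible wiring, but that part is an assumption and is not shown.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, Flink-free model of the matching logic only.
class MatchSplitter {
    private final Set<String> reference = new HashSet<>(); // values seen on stream2
    final List<String> matched = new ArrayList<>();
    final List<String> unmatched = new ArrayList<>();

    // Called for each periodic update from stream2 (a single column).
    void onReferenceUpdate(String value) {
        reference.add(value);
    }

    // Called for each record from stream1; routes it by membership.
    void onEvent(String key) {
        if (reference.contains(key)) {
            matched.add(key);
        } else {
            unmatched.add(key);
        }
    }
}
```

A stream1 record that arrives before its matching stream2 value is classified as unmatched here. Whether that is acceptable, or whether early records must be buffered, depends on the use case.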

Question about reading ORC file in Flink

2019-09-23 Thread ShuQi
Hi all, The Flink version is 1.9.0. I use OrcTableSource to read an ORC file in HDFS, and the job executes successfully without any exception or error. But some fields (such as tagIndustry) are always null, although these fields actually have values. I can read these fields by reading the file directly. Below is

Re: How to use thin JAR instead of fat JAR when submitting Flink job?

2019-09-23 Thread Dian Fu
Hi Qi Kang, You don't need to, and also should not, package Flink's dependencies into the job JAR. The application-specific dependencies are enough. Regards, Dian > On Sep 23, 2019, at 5:17 PM, Qi Kang wrote: > > Hi, > > According to the documentation of Flink, it seems that fat JAR is recommended >

Re: Recommended approach to debug this

2019-09-23 Thread Debasish Ghosh
Ah, OK, I get the Throwable part. I am using `import org.apache.flink.streaming.api.scala._` and `val env = StreamExecutionEnvironment.getExecutionEnvironment`. How can this lead to a wrong StreamExecutionEnvironment? Any suggestions? Regards. On Mon, Sep 23, 2019 at 3:53 PM Dian Fu wrote: > Hi

RE: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-23 Thread Subramanyam Ramanathan
Hi, I was able to reproduce the issue again and understand the cause a little better. The issue occurs when: - One of the RichMapFunction transformations uses a third-party library in the open() method that spawns a thread. - The thread doesn’t get properly closed in the close()
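The pattern Subbu describes (a thread started in open() that survives close()) can be avoided by having the function own the thread and stop it explicitly. A minimal plain-Java sketch follows. `ThreadOwningFunction` is a hypothetical class that only mirrors the open()/map()/close() lifecycle of a RichMapFunction, and the single-thread executor stands in for whatever thread the third-party library spawns.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical class mirroring a RichMapFunction's open()/map()/close()
// lifecycle without depending on Flink.
class ThreadOwningFunction {
    private ExecutorService background; // stands in for the third-party thread

    void open() {
        background = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "third-party-worker");
            t.setDaemon(true); // do not keep the JVM alive on its own
            return t;
        });
        background.submit(() -> { /* the library's background work */ });
    }

    String map(String value) {
        return value.toUpperCase();
    }

    // Stop the thread so it does not outlive the task that created it.
    void close() {
        if (background == null) {
            return;
        }
        background.shutdownNow();
        try {
            background.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isStopped() {
        return background == null || background.isTerminated();
    }
}
```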

Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-23 Thread Biao Liu
Hi Felipe, The Flink job graph is a DAG. It seems that you set an "edge property" (partitioner) several times. Flink does not support multiple partitioners on one edge; the later one overrides the earlier ones. That means the "keyBy" overrides the "rebalance" and "partitionByPartial". You could insert

Re: Recommended approach to debug this

2019-09-23 Thread Debasish Ghosh
Hi Tison - This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka. override def buildExecutionGraph = { val rides: DataStream[TaxiRide] = readStream(inTaxiRide) .filter { ride ⇒ ride.getIsStart().booleanValue }

Re: Recommended approach to debug this

2019-09-23 Thread Debasish Ghosh
Hi Dian - We submit one job through the operator. We just use the following to complete a promise when the job completes: Try { createLogic.executeStreamingQueries(ctx.env) }.fold( th ⇒ completionPromise.tryFailure(th), _ ⇒ completionPromise.trySuccess(Dun)

Re: Is there a lifecycle listener that gets notified when a topology starts/stops on a task manager

2019-09-23 Thread Stephen Connolly
Currently the best I can see is to make *everything* a Rich... and hook into the open and close methods... but it feels very ugly. On Mon 23 Sep 2019 at 15:45, Stephen Connolly <stephen.alan.conno...@gmail.com> wrote: > We are using a 3rd party library that allocates some resources in one of >

Flink job manager doesn't remove stale checkmarks

2019-09-23 Thread Clay Teeter
I'm trying to get my standalone cluster to remove stale checkmarks. The cluster is composed of a single job manager and a single task manager, backed by RocksDB with high availability. The configuration on both the job and task manager is: state.backend: rocksdb state.checkpoints.dir:

Flink child job running on a kerberized cluster

2019-09-23 Thread Imami,Taariq
I am trying to run a Flink application through Oozie on a kerberized Hadoop cluster (Flink version 1.7.2; the Hadoop jar we run with is hadoop-common-2.6.0-cdh5.14.0.jar). We are getting a GSS exception when a child job is launched. We confirmed through shell actions that we have valid

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-23 Thread Steven Wu
When we set up an alert like "fullRestarts > 1" for some rolling window, we want to use a counter. If it is a Gauge, "fullRestarts" will never go below 1 after the first full restart, so the alert condition will always be true after the first job restart. If we can apply a derivative to the Gauge value, I guess

Re: Per Key Grained Watermark Support

2019-09-23 Thread Sameer Wadkar
You could still handle late data. Just keep state around longer (within a predefined lateness interval). Say your time window is a tumbling window of 5 minutes and events for a key are allowed to arrive up to 30 minutes late: keep events around for 35 minutes before evicting them from state. It means
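Sameer's 5 + 30 = 35 minute figure can be checked with a little arithmetic. Below is a minimal sketch with a hypothetical `LatenessRetention` helper. It assumes non-negative epoch-millisecond timestamps and no window offset. Treating the eviction time as exclusive is also an assumption, because boundary conventions differ between systems.

```java
import java.time.Duration;

// Arithmetic sketch for tumbling windows with an allowed-lateness interval.
class LatenessRetention {
    // Start of the tumbling window that contains the timestamp.
    static long windowStart(long ts, Duration size) {
        return ts - (ts % size.toMillis());
    }

    // Event time at which the window's state can be evicted: end + lateness.
    static long evictionTime(long ts, Duration size, Duration lateness) {
        return windowStart(ts, size) + size.toMillis() + lateness.toMillis();
    }

    // An event is still usable while the watermark is before the eviction time.
    static boolean accepted(long ts, long watermark, Duration size, Duration lateness) {
        return watermark < evictionTime(ts, size, lateness);
    }
}
```

For an event at minute 7, the window is [5, 10). Its state must therefore be kept until the watermark reaches minute 40, which is 35 minutes after the window opened.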

Re: Recommended approach to debug this

2019-09-23 Thread Biao Liu
Hi Zili, Thanks for pointing that out. I didn't realize that it's a REST API based case. Debasish's case has been discussed not only in this thread... It's really hard to analyze the case without the full picture. I think the reason why `ProgramAbortException` is not caught is that he did

Re: Is there a lifecycle listener that gets notified when a topology starts/stops on a task manager

2019-09-23 Thread Dian Fu
AFAIK, RichFunction is the only way to do this. It's designed for lifecycle management of functions. Regards, Dian > On Sep 24, 2019, at 2:13 AM, Stephen Connolly wrote: > > Currently the best I can see is to make *everything* a Rich... and hook into > the open and close methods...

Re: Is there a lifecycle listener that gets notified when a topology starts/stops on a task manager

2019-09-23 Thread Zhu Zhu
Hi Stephen, I think disposing of static components in the closing stage of a task is required. This is because your code (operators/UDFs) is part of the task, that is, it can only be executed while the task has not been disposed. Thanks, Zhu Zhu Stephen Connolly wrote on Tue, Sep 24, 2019 at 2:13 AM: > Currently
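The "create in open(), dispose in close()" advice can be made less ugly for a JVM-wide singleton by using reference counting. The first function instance to open sets the resource up, and the last one to close tears it down. Below is a minimal sketch. It assumes the library's singleton can be wrapped as an AutoCloseable. `SharedResource` is a hypothetical helper, not a Flink API.

```java
import java.util.function.Supplier;

// Hypothetical reference-counted holder for a third-party singleton. Each
// parallel function instance would call acquire() from open() and release()
// from close(); the first acquire creates it, the last release disposes it.
final class SharedResource<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private T instance;
    private int refCount;

    SharedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    synchronized T acquire() {
        if (refCount == 0) {
            instance = factory.get(); // first user initializes the singleton
        }
        refCount++;
        return instance;
    }

    synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("release() without matching acquire()");
        }
        refCount--;
        if (refCount == 0) {
            T toClose = instance;
            instance = null;
            try {
                toClose.close(); // last user disposes the singleton
            } catch (Exception e) {
                throw new RuntimeException("failed to dispose shared resource", e);
            }
        }
    }

    synchronized int users() {
        return refCount;
    }
}
```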

Re: Recommended approach to debug this

2019-09-23 Thread Zili Chen
Hi Biao, The log below already indicates that the job was submitted via the REST API, and I don't think it matters. at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126) at

Re: Recommended approach to debug this

2019-09-23 Thread Biao Liu
> We submit the code through Kubernetes Flink Operator which uses the REST API to submit the job to the Job Manager. So you are submitting the job through the REST API, not the Flink client? Could you explain more about this? Thanks, Biao /'bɪ.aʊ/ On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh wrote: > Hi

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-23 Thread Zhu Zhu
Steven, As I understand it, a Flink counter only stores its accumulated count and reports that value. Are you using an external counter directly? Maybe Flink's Meter/MeterView is what you need? It stores the count and calculates the rate, and it reports its "count" as well as its "rate" to the external metric
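The count-plus-rate idea can be sketched in plain Java. This is a hypothetical class modeled loosely on what Zhu Zhu describes for Meter/MeterView (a cumulative count plus a rate over a time span). It is not Flink's implementation, and the window and tick interval are assumptions. An alert on `rate > 0` then fires only while restarts occurred within the window, which a cumulative value alone cannot express.

```java
// Hypothetical meter: a cumulative count that never resets (like a counter)
// plus a ring of periodic snapshots used to compute a rate over a window.
// Not thread-safe; a real meter would need synchronization or atomics.
final class WindowedRateMeter {
    private final long[] snapshots;   // cumulative count at each of the last ticks
    private final int intervalSeconds;
    private int next;                 // ring slot to overwrite on the next tick
    private long count;

    WindowedRateMeter(int windowSeconds, int intervalSeconds) {
        this.intervalSeconds = intervalSeconds;
        // One more slot than intervals, so oldest and newest span the window.
        this.snapshots = new long[windowSeconds / intervalSeconds + 1];
    }

    void markEvent() {
        count++;
    }

    long getCount() {
        return count;
    }

    // Expected to be called once per interval, e.g. by a reporter thread.
    void tick() {
        snapshots[next] = count;
        next = (next + 1) % snapshots.length;
    }

    // Events per second between the oldest and newest snapshots in the ring.
    double getRate() {
        long newest = snapshots[(next - 1 + snapshots.length) % snapshots.length];
        long oldest = snapshots[next];
        return (newest - oldest) / (double) (intervalSeconds * (snapshots.length - 1));
    }
}
```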

Re: Flink job manager doesn't remove stale checkmarks

2019-09-23 Thread Biao Liu
Hi Clay, Sorry, I don't get your point. I'm not sure what "stale checkmarks" means exactly. Do you mean the HA storage and checkpoint directories left over after shutting down the cluster? Thanks, Biao /'bɪ.aʊ/ On Tue, 24 Sep 2019 at 03:12, Clay Teeter wrote: > I'm trying to get my standalone cluster to remove

Re: How to prevent from launching 2 jobs at the same time

2019-09-23 Thread David Morin
Hi, Thanks for your replies. Yes, it would be useful to have a way to define the job ID. That way, I could have defined the job ID based on the name, for example. At the moment we do not use the REST API but the CLI to submit our jobs on YARN. Nevertheless, I can implement a little trick: at

Job name in logs

2019-09-23 Thread Gaël Renoux
Hello everyone, Is there a way to specify the job name in the logging pattern (since the logging configuration is global for the cluster)? We have two different jobs running on our Flink cluster, and when there's a message it's not obvious which of the two is logging. We can figure it out most of

Re: Recommended approach to debug this

2019-09-23 Thread Debasish Ghosh
Could it be that the ThreadLocal logic in https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609 does not behave deterministically when we submit the job through a Kubernetes Flink

Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-23 Thread Felipe Gutierrez
I've implemented a combiner [1] in Flink by extending OneInputStreamOperator. I call my operator using "transform". It works well, and I guess it would be useful to add this operator to DataStream.java. I just need to check whether I need to touch other parts of the source code. But

Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-23 Thread Felipe Gutierrez
Thanks Biao, I see. To achieve what I want, I need to work with KeyedStream. I downloaded the Flink source code to learn from it and alter KeyedStream to my needs. I am not sure, but it seems like a lot of work because, as far as I understand, the key groups have to be predictable [1], and altering this
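The predictability Felipe mentions comes from a deterministic two-step mapping: key to key group, then key group to subtask. Below is a simplified model based on my understanding of Flink's key-group assignment, not copied from it. In particular, Flink scrambles `hashCode()` with a murmur hash before taking the modulo, and that step is replaced by a plain `floorMod` here. `KeyGroupRouting` is a hypothetical name.

```java
// Simplified model of key-group routing: key -> key group -> subtask index.
final class KeyGroupRouting {
    // Key group in [0, maxParallelism). Flink applies a murmur hash to
    // hashCode() first; floorMod alone is a simplification for illustration.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each subtask owns a contiguous range of key groups.
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }
}
```

The subtask is a pure function of the key, the maximum parallelism, and the parallelism, so the same key always lands on the same subtask. Keyed state and restores rely on this, and changing how keys map to groups would break it.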

Re: Recommended approach to debug this

2019-09-23 Thread Dian Fu
Regarding the code you pasted, personally I think nothing is wrong with it. The problem is how it's executed. As you can see from the implementation of StreamExecutionEnvironment.getExecutionEnvironment, it may create different StreamExecutionEnvironment implementations under different
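The point that one `getExecutionEnvironment` call can return different implementations can be modeled as a lookup with precedence: a per-thread factory first, then a JVM-wide factory, then a local default. This is a hypothetical sketch of that pattern, not Flink's code. The precedence order and the returned labels are assumptions. The sketch also shows why a call made from another thread (for example, inside a Future) can miss the per-thread factory.

```java
import java.util.function.Supplier;

// Hypothetical model of context-dependent environment creation.
final class EnvironmentResolver {
    private static volatile Supplier<String> globalFactory;
    private static final ThreadLocal<Supplier<String>> THREAD_FACTORY = new ThreadLocal<>();

    static void setGlobalFactory(Supplier<String> factory) {
        globalFactory = factory;
    }

    static void setThreadFactory(Supplier<String> factory) {
        THREAD_FACTORY.set(factory); // visible only on the calling thread
    }

    static void reset() {
        globalFactory = null;
        THREAD_FACTORY.remove();
    }

    static String getEnvironment() {
        Supplier<String> factory = THREAD_FACTORY.get(); // 1. per-thread factory
        if (factory == null) {
            factory = globalFactory;                     // 2. JVM-wide factory
        }
        return factory != null ? factory.get() : "local-environment"; // 3. default
    }
}
```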

Re: Recommended approach to debug this

2019-09-23 Thread Debasish Ghosh
This is the complete stack trace we get from execution on Kubernetes using the Flink Kubernetes operator. The boxed error comes from the fact that we complete a Promise with Success when it returns a JobExecutionResult and with Failure when we get an exception. And here we are getting an
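The "Boxed Error" wrapper comes from Scala's `Promise`. When a Promise is failed with a `java.lang.Error`, it stores an `ExecutionException("Boxed Error", e)` instead, and Flink's `ProgramAbortException` is an `Error`. Below is a minimal Java sketch of that wrapping and of unwrapping the cause chain to find the real exception. The nested `ProgramAbortException` is a local stand-in class, not Flink's.

```java
import java.util.concurrent.ExecutionException;

final class BoxedErrors {
    // Local stand-in; Flink's ProgramAbortException likewise extends Error.
    static final class ProgramAbortException extends Error {
        private static final long serialVersionUID = 1L;
    }

    // Mimics how Scala's Promise stores a failure caused by a java.lang.Error.
    static Throwable box(Throwable t) {
        return (t instanceof Error) ? new ExecutionException("Boxed Error", t) : t;
    }

    // Follows getCause() to the innermost throwable.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }
}
```

Logging the root cause in the failure branch, rather than the wrapper, should show the original exception.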

Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-23 Thread Biao Liu
Wow, that's really cool! You have indeed done a lot of work. IMO it's somewhat beyond the scope of the user group. Just one small concern: I'm not sure I have fully understood your way of "tackle data skew by altering the way Flink partition keys using KeyedStream". From my understanding,

Re: Per Key Grained Watermark Support

2019-09-23 Thread Congxian Qiu
Hi, There was a discussion about this issue [1]. As that discussion said, at the moment this is not supported out of the box by Flink; I think you can try a keyed process function, as Lasse said. [1]

Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-23 Thread Biao Liu
Hi Felipe, If I understand correctly, you want to solve data skew caused by imbalanced keys? There is a common strategy for this kind of problem: pre-aggregation, like the combiner in MapReduce. But sadly, AFAIK Flink does not support pre-aggregation currently. I'm afraid you have to implement
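Here is a minimal plain-Java sketch of the pre-aggregation idea. Each upstream instance sums values per key locally and forwards partial sums only when its buffer reaches a size limit. A hot key therefore sends one record per flush instead of one per event. `LocalCombiner` and the count-based flush policy are hypothetical. A real operator would also need to flush or snapshot the buffer when a checkpoint is taken, which is not shown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical local combiner: partial sums per key, flushed by buffer size.
final class LocalCombiner {
    private final Map<String, Long> buffer = new HashMap<>();
    private final int maxBufferedKeys;
    private final BiConsumer<String, Long> downstream; // e.g. the keyed aggregation

    LocalCombiner(int maxBufferedKeys, BiConsumer<String, Long> downstream) {
        this.maxBufferedKeys = maxBufferedKeys;
        this.downstream = downstream;
    }

    // Sums locally; a hot key occupies one buffer slot however many events arrive.
    void add(String key, long value) {
        buffer.merge(key, value, Long::sum);
        if (buffer.size() >= maxBufferedKeys) {
            flush();
        }
    }

    // Emits one partial sum per buffered key and clears the buffer.
    void flush() {
        buffer.forEach(downstream);
        buffer.clear();
    }
}
```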

Re: How to prevent from launching 2 jobs at the same time

2019-09-23 Thread Till Rohrmann
Hi David, you could use Flink's RestClusterClient and call #listJobs to obtain the list of jobs being executed on the cluster (note that it will also report finished jobs). By providing a properly configured Configuration (e.g. loading flink-conf.yaml via GlobalConfiguration#loadConfiguration) it
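Till's suggestion boils down to a check before submitting. Below is a sketch of the filtering step, assuming the job list has already been fetched (for example, via RestClusterClient#listJobs). `JobSummary` and `State` are hypothetical local stand-ins rather than Flink's types. Because the listing also contains finished jobs, terminal states are ignored.

```java
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;

final class DuplicateJobGuard {
    // Hypothetical subset of job states, for illustration only.
    enum State { RUNNING, RESTARTING, FINISHED, FAILED, CANCELED }

    // Hypothetical stand-in for one entry of the job listing.
    static final class JobSummary {
        final String name;
        final State state;

        JobSummary(String name, State state) {
            this.name = name;
            this.state = state;
        }
    }

    // The listing also reports finished jobs, so terminal states are ignored.
    private static final Set<State> TERMINAL =
            EnumSet.of(State.FINISHED, State.FAILED, State.CANCELED);

    // True if a non-terminal job with the given name already exists.
    static boolean isAlreadyActive(Collection<JobSummary> jobs, String jobName) {
        return jobs.stream()
                .anyMatch(j -> j.name.equals(jobName) && !TERMINAL.contains(j.state));
    }
}
```

The check-then-submit sequence is not atomic, so two launchers racing each other could still both pass it.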

Re: Recommended approach to debug this

2019-09-23 Thread Zili Chen
Hi Debasish, The OptimizerPlanEnvironment.ProgramAbortException should be caught at OptimizerPlanEnvironment#getOptimizedPlan in its catch (Throwable t) branch. It should always throw a ProgramInvocationException instead of OptimizerPlanEnvironment.ProgramAbortException if any exception thrown

Re: Per Key Grained Watermark Support

2019-09-23 Thread bupt_ljy
Hi Congxian, Thanks, but by doing that we will lose some features, like output of the late data. Original Message Sender: Congxian Qiu Recipient: Lasse Nedergaard Cc: 廖嘉逸; user@flink.apache.org; d...@flink.apache.org Date: Monday, Sep 23, 2019 19:56 Subject: Re: Per Key Grained Watermark

Can I cross talk between environments

2019-09-23 Thread srikanth flink
Hi, I'm using Java code to source from Kafka, convert the stream to a table, and register the table. I understand that I have started the StreamExecutionEnvironment and its execution. Is there a way I could access the registered table/temporal function from the SQL client? Thanks, Srikanth

When consumer parallelism differs from Kafka parallelism

2019-09-23 Thread gaofeilong198...@163.com
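1. When the Kafka consumer parallelism is greater than the number of Kafka partitions, what happens to the extra parallel instances? 2. When the Kafka consumer parallelism equals the number of Kafka partitions but some partitions have no data, what happens to the consumer threads for those empty partitions? How does this affect barrier and watermark generation? gaofeilong198...@163.com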

Re: Flink ORC reading problem

2019-09-23 Thread Terry Wang
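Could you start a local program, set breakpoints, and check whether the data-reading logic has a problem? Best, Terry Wang > On Sep 23, 2019, at 5:11 PM, ShuQi wrote: > > The Flink version is 1.9.0. I read ORC files with OrcTableSource and ran into a problem: the program runs to completion without any exception, but some fields are always read as null even though they actually have values; reading the file directly returns all fields. > > > Does anyone have any good suggestions? Thanks! >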

Re: Question about initializing a system cache

2019-09-23 Thread Terry Wang
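Hi, you could start a scheduled thread in the open method that fetches data from MySQL and updates the cache. When new data flows into your system, check whether the scheduled thread has finished loading; once loading is complete, process the data. Hope this helps! Best, Terry Wang > On Sep 24, 2019, at 10:45 AM, haoxin...@163.com wrote: > >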
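Terry's suggestion translates roughly into the following plain-Java sketch. open() starts a scheduled reload, each reload atomically swaps in a new snapshot, and record processing can wait on a latch until the first load has finished. `RefreshingCache` and the `loader` supplier (which would query MySQL) are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class RefreshingCache {
    private final AtomicReference<Map<String, String>> snapshot =
            new AtomicReference<>(Collections.<String, String>emptyMap());
    private final CountDownLatch firstLoad = new CountDownLatch(1);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "cache-refresh");
                t.setDaemon(true);
                return t;
            });

    // Would be called from the function's open() method.
    void open(Supplier<Map<String, String>> loader, long periodSeconds) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // Copy, then publish as a read-only snapshot in one atomic swap.
                snapshot.set(Collections.unmodifiableMap(new HashMap<>(loader.get())));
                firstLoad.countDown();
            } catch (RuntimeException e) {
                // Keep the previous snapshot; an uncaught exception would
                // cancel all future runs of this scheduled task.
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
    }

    // Blocks until the first load completes or the timeout expires.
    boolean awaitReady(long timeout, TimeUnit unit) {
        try {
            return firstLoad.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    String lookup(String key) {
        return snapshot.get().get(key);
    }

    // Would be called from the function's close() method.
    void close() {
        scheduler.shutdownNow();
    }
}
```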

Re: When consumer parallelism differs from Kafka parallelism

2019-09-23 Thread 陈赋赟
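Hi, 1. When the Kafka consumer parallelism is greater than the number of Kafka partitions, what happens to the extra parallel instances? -> I take this question to mean: when the number of Flink slots is greater than the number of Kafka partitions, the extra slots (really, the Kafka consumers created in those slots) run idle. As shown in the figure: with 10 Kafka partitions but 15 Flink slots, 5 nodes are idle. 2. When the Kafka consumer parallelism equals the number of Kafka partitions, but the Kafka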
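The 10-partitions / 15-slots example can be reproduced with a simple assignment model. This is a simplified sketch. As far as I know, Flink's Kafka consumer also derives a per-topic starting subtask from the topic name, whereas plain modulo is used here. The conclusion does not depend on that detail: with more subtasks than partitions, some subtasks have nothing to read. `PartitionAssignment` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of spreading Kafka partitions over source subtasks.
final class PartitionAssignment {
    // Returns, for each subtask index, the partitions it would read.
    static List<List<Integer>> assign(int partitions, int parallelism) {
        List<List<Integer>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            perSubtask.get(p % parallelism).add(p); // round-robin by partition id
        }
        return perSubtask;
    }

    // Number of subtasks that received no partition at all.
    static long idleSubtasks(int partitions, int parallelism) {
        return assign(partitions, parallelism).stream().filter(List::isEmpty).count();
    }
}
```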

Re: About exactly-once with Async I/O

2019-09-23 Thread Biao Liu
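1. First, the scenario you describe is not limited to the Async IO operator; other operators have similar issues. 2. Flink's exactly-once guarantee applies within Flink, e.g. to state [1]. 3. If you also want exactly-once semantics with respect to external systems, the corresponding connector needs to support it [2]. 1. https://ci.apache.org/projects/flink/flink-docs-release-1.9/internals/stream_checkpointing.html 2.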
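For point 3, idempotent writes are one common way to get effectively-once results in an external system that lacks transactional support. If every record carries a deterministic ID, records replayed after a restore overwrite existing rows rather than duplicating them. Below is a minimal sketch with an in-memory map standing in for the external store. `IdempotentSink` is hypothetical, and transactional (two-phase commit) sinks are the other usual approach.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sink that upserts by a deterministic record ID.
final class IdempotentSink {
    private final Map<String, String> store = new HashMap<>(); // stands in for the external system
    private int writes;

    void write(String recordId, String payload) {
        store.put(recordId, payload); // upsert: the same ID always maps to the same row
        writes++;
    }

    int rowCount() {
        return store.size();
    }

    int writeCount() {
        return writes;
    }
}
```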

Re: Discussion: reading large Flink state from disk saturates disk IO and jobs interfere with each other

2019-09-23 Thread Congxian Qiu
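Hi, As you describe, even with one disk per job there is still an IO bottleneck. Is this a single container? As others said earlier, you need to confirm that this much IO is expected. If it is, you can try increasing the block cache and memtable sizes so that more data is kept in memory. Also, which state type are you using? If it's ValueState or ListState, could you switch to MapState instead? You can also look at the RocksDB log to see whether there is anything to optimize. Best, Congxian Biao Liu wrote on Mon, Sep 23, 2019 at 2:39 PM: > Hello, > >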