Re: Re: Implementing KafkaUpsertTableSink

2020-03-30 Thread wangl...@geekplus.com.cn
I kept only one factory, KafkaRetractTableSourceSinkFactory. KafkaRetractTableSinkBase implements the RetractStreamTableSink interface, and consumeDataStream only emits records whose flag is True. It finally works. @Override public DataStreamSink consumeDataStream(DataStream> dataStream) { DataStream dtNeed = dataStream.filter(x -> x.f0 == Boolean.TRUE).map(x
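
A stdlib-only sketch of the filtering idea above, assuming each retract message is a (flag, payload) pair where true marks an add and false a retraction. Map.Entry stands in for Flink's Tuple2<Boolean, T>, and RetractFilterSketch is a hypothetical name. Dropping the flag after filtering is also an assumption, since the original map(...) call is cut off. The sketch uses Boolean.TRUE.equals(...), a null-safe value comparison that does not rely on boxed Boolean instances being canonical the way == does.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Stdlib-only model of a retract stream: each message pairs a flag
// (true = add/accumulate, false = retract) with a payload. Map.Entry stands
// in for Flink's Tuple2<Boolean, T>; this is an illustration, not Flink code.
final class RetractFilterSketch {

    private RetractFilterSketch() {}

    // Keep only the messages flagged true and return their payloads.
    // Dropping the flag is an assumption: the original map(...) call is cut off.
    static <T> List<T> keepAccumulates(List<Map.Entry<Boolean, T>> messages) {
        return messages.stream()
                // Value comparison: null-safe and independent of boxed-instance identity.
                .filter(m -> Boolean.TRUE.equals(m.getKey()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }
}
```

For example, the messages (true, "a"), (false, "a"), (true, "b") reduce to the payloads "a" and "b".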

Re: Handling out-of-order data after keyby

2020-03-30 Thread jun su
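hi, after keyby the watermark should be the minimum of the watermarks from the multiple upstream threads, so although the data may be out of order, the watermark will not be, and it will not affect the triggering of downstream windows. tingli ke wrote on Tue, Mar 31, 2020 at 9:54 AM: > Hello, > Regarding your reply, the current scenario is as follows: > 1. Kafka has multiple partitions, and the Flink watermark assigner emits a watermark for each partition; > 2. Given the first premise, with the watermark already set, can a watermark be set again after keyby? >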
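
A simplified, stdlib-only model of the rule described above: a downstream operator remembers the latest watermark from each input channel and forwards the minimum across channels, so its watermark never moves backwards even when records arrive out of order. The class name MinWatermarkTracker is hypothetical, and the real Flink runtime also handles idle inputs, which this sketch ignores.

```java
import java.util.Arrays;

// Simplified, stdlib-only model of how an operator downstream of keyBy
// derives its watermark: it tracks the latest watermark seen on each
// input channel and forwards the minimum across all channels.
// Hypothetical illustration only; not Flink's runtime code.
final class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No channel has reported yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one upstream channel and return the combined
    // watermark (the minimum over all channels), which never moves backwards.
    long onWatermark(int channel, long watermark) {
        // Per-channel watermarks only advance.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }
}
```

For example, with two channels reporting 200 and 50, the forwarded watermark stays at 50 until the slower channel advances.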

Re: Re: Implementing KafkaUpsertTableSink

2020-03-30 Thread Benchao Li
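As I understand it, you can make some parameters of KafkaRetractTableSourceSinkFactory differ from those of KafkaTableSourceSinkFactory, and then use that parameter to distinguish the two factories. For example, add a parameter indicating whether the sink is of the retract or the append type? wangl...@geekplus.com.cn wrote on Tue, Mar 31, 2020 at 11:17 AM: > This presumably means two tableFactories were found. I simply rewrote the whole KafkaTableSourceSinkFactory stack in parallel as >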
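
A hypothetical, simplified model of why two factories can be found and how the suggestion above helps: if lookup matches factories on required properties, two factories declaring the same required context both match, while a distinguishing key lets only one match. The key sink.mode and its values are invented for illustration, and this is not Flink's TableFactoryService.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, simplified model of property-based factory discovery: a factory
// matches when every entry of its required context is present in the table's
// properties. Not Flink's TableFactoryService; it only illustrates why two
// factories with identical required contexts end up ambiguous.
final class FactoryMatchSketch {

    static final class Factory {
        final String name;
        final Map<String, String> requiredContext;

        Factory(String name, Map<String, String> requiredContext) {
            this.name = name;
            this.requiredContext = requiredContext;
        }

        // True if every required key/value pair appears in the properties.
        boolean matches(Map<String, String> properties) {
            return requiredContext.entrySet().stream()
                    .allMatch(e -> e.getValue().equals(properties.get(e.getKey())));
        }
    }

    // Names of all matching factories; more than one means the lookup is ambiguous.
    static List<String> matching(List<Factory> factories, Map<String, String> properties) {
        return factories.stream()
                .filter(f -> f.matches(properties))
                .map(f -> f.name)
                .collect(Collectors.toList());
    }
}
```

With both factories requiring only connector.type=kafka, a table declared with sink.mode=retract matches both; once each factory also requires its own sink.mode value, only the retract factory matches.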

Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Jingsong Li
Thanks very much, Jeff, that is very impressive. Zeppelin is a very convenient development platform. Best, Jingsong Lee On Tue, Mar 31, 2020 at 11:58 AM Zhijiang wrote: > > Thanks for your continuous efforts in engaging with the Flink ecosystem, Jeff! > Glad to see the steady progress. I hope

Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Zhijiang
Thanks for your continuous efforts in engaging with the Flink ecosystem, Jeff! Glad to see the steady progress. I hope more users try it out in practice. Best, Zhijiang -- From:Dian Fu Send Time:2020 Mar. 31 (Tue.) 10:15 To:Jeff

Re: Issue with single job yarn flink cluster HA

2020-03-30 Thread Dinesh J
Hi Yang, I am attaching a full jobmanager log for a job which I reran today. This is a job that tries to read from a savepoint. The same error message "leader election ongoing" is displayed, and it stays the same even after 30 minutes. If I leave the job without a yarn kill, it stays the same forever.

Re: Re: Implementing KafkaUpsertTableSink

2020-03-30 Thread wangl...@geekplus.com.cn
This presumably means two tableFactories were found. I simply rewrote the whole KafkaTableSourceSinkFactory stack in parallel as KafkaRetractTableSourceSinkFactory, but how should it be changed properly? private static T findSingleInternal( Class factoryClass, Map properties,

Re: RE: Implementing KafkaUpsertTableSink

2020-03-30 Thread wangl...@geekplus.com.cn
I implemented a KafkaRetractTableSink in the same way, packaged it as a jar, put it in the lib directory, and started sql-client: org.apache.flink.table.planner.delegation.BlinkExecutorFactory at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146) at

Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Dian Fu
Hi Jeff, Thanks for the great work and for sharing it with the community! Very impressive; I will try it out. Regards, Dian > On Mar 30, 2020, at 9:16 PM, Till Rohrmann wrote: > > This is great news Jeff! Thanks a lot for sharing it with the community. > Looking forward to trying Flink on Zeppelin out :-) >

Re: Issue with single job yarn flink cluster HA

2020-03-30 Thread Yang Wang
I don't think your problem is about the akka timeout. Increasing the timeout could help in a heavily loaded cluster, especially when the network is not very good. However, that is not your case now. I am not sure about the "never recovery". Do you mean the logs "Connection refused" keep going and do not have

Re: Handling out-of-order data after keyby

2020-03-30 Thread tingli ke
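Hello, regarding your reply, the current scenario is as follows: 1. Kafka has multiple partitions, and the Flink watermark assigner emits a watermark for each partition; 2. Given the first premise, with the watermark already set, can a watermark be set again after keyby? 3. Is there an approach that skips the first premise and sets the watermark directly after keyby? Jimmy Wong wrote on Mon, Mar 30, 2020 at 9:13 PM: > Hi, > a watermark can be assigned after keyBy, but it is best placed right after the SourceFunction. After going through keyBy >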

Re: Handling out-of-order data after keyby

2020-03-30 Thread tingli ke
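Hello, thank you very much for your reply! By "keyed" (key化) I mean after keyby, which is the same topic as your answer that "a watermark can be assigned after keyBy". Jimmy Wong wrote on Mon, Mar 30, 2020 at 9:13 PM: > Hi, > a watermark can be assigned after keyBy, but it is best placed right after the SourceFunction. Going through keyBy > or another partitioning strategy may cause greater data latency (in EventTime). > > > I didn't quite understand the phrase "want to do keyed out-of-order handling"; could you explain? > > > Jimmy Wong >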

some subtask taking too long

2020-03-30 Thread Fanbin Bu
Hi, I'm running Flink 1.9 on EMR using Flink SQL with the Blink planner, reading from and writing to JDBC input/output. My SQL is just a listagg over a window covering the last 7 days. However, I notice that one or two subtasks take too long to finish. In this thread

Complex graph-based sessionization (potential use for stateful functions)

2020-03-30 Thread Krzysztof Zarzycki
Hi! Interesting problem to solve ahead :) I need to implement a streaming sessionization algorithm (split a stream of events into groups of correlated events). It's pretty non-standard, as we DON'T have a key like user id that separates the stream into substreams which we just need to chunk based

Re: flink-shaded-hadoop2 for flink 1.10

2020-03-30 Thread Chesnay Schepler
flink-shaded-hadoop2 was released as part of Flink until 1.8 (hence why it followed the Flink version scheme), after which it was renamed to flink-shaded-hadoop-2 and is now being released separately from Flink as part of flink-shaded (a project that bundles various dependencies to be used by

Re: How to enforce ACLs on Flink JobManager/ApplicationMaster URL on Yarn

2020-03-30 Thread Ethan Li
Thanks for sharing, Aaron! Your comment is very helpful. Our end goal is to support multi-tenancy and also to share the YARN cluster with MapReduce, Spark and other jobs. We probably need something else. --- I wonder if there is any built-in functionality in Flink or YARN that already supports

Re: flink-shaded-hadoop2 for flink 1.10

2020-03-30 Thread Vitaliy Semochkin
Thank you very much, Sivaprasanna! It worked! PS Does anyone know what the difference is between flink-shaded-hadoop2 and flink-shaded-hadoop-2? Regards, Vitaliy On Mon, Mar 30, 2020 at 8:21 PM Sivaprasanna wrote: > Hi Vitaliy, > > Check for "flink-shaded-hadoop-2". It has dependencies with

Re: How to enforce ACLs on Flink JobManager/ApplicationMaster URL on Yarn

2020-03-30 Thread Aaron Langford
I'd be curious to see how others have done this, but our setup restricts network access to machines in the YARN cluster to a jump box. Access to Flink job manager is limited to whoever can ssh to that box, and that is controlled with an Ansible playbook. Additionally, we have a list of users

How to enforce ACLs on Flink JobManager/ApplicationMaster URL on Yarn

2020-03-30 Thread Ethan Li
Hi Team, I am evaluating Flink on YARN. I can submit a Flink job to a secured YARN cluster and the job runs correctly. But the Flink JobManager UI seems to be accessible by everyone. Is there any way in Flink or YARN to secure it with ACLs? Thanks, Ethan

Re: Issue with single job yarn flink cluster HA

2020-03-30 Thread Dinesh J
Hi Yang, Thanks for the clarification and suggestion. But my problem was that recovery never happens and "leader election ongoing" is the message displayed forever. Do you think increasing akka.ask.timeout and akka.tcp.timeout will help in the case of a heavily loaded cluster, as this

Re: flink-shaded-hadoop2 for flink 1.10

2020-03-30 Thread Sivaprasanna
Hi Vitaliy, Check for "flink-shaded-hadoop-2". It has dependencies with various hadoop versions. https://search.maven.org/artifact/org.apache.flink/flink-shaded-hadoop-2 On Mon, Mar 30, 2020 at 10:13 PM Vitaliy Semochkin wrote: > Hi, > > I cannot find flink-shaded-hadoop2 for flink 1.10 in

Re: Lack of KeyedBroadcastStateBootstrapFunction

2020-03-30 Thread Tzu-Li (Gordon) Tai
Thanks! Looking forward to that. On Tue, Mar 31, 2020 at 1:02 AM Mark Niehe wrote: > Hi Gordon and Seth, > > Thanks for the explanation and for opening up the ticket. I'll add some details in > the ticket to explain what we're trying to do which will hopefully add some > context. > > -- >

Re: Lack of KeyedBroadcastStateBootstrapFunction

2020-03-30 Thread Mark Niehe
Hi Gordon and Seth, Thanks for the explanation and for opening up the ticket. I'll add some details in the ticket to explain what we're trying to do which will hopefully add some context. -- Mark Niehe · Software Engineer Integrations

flink-shaded-hadoop2 for flink 1.10

2020-03-30 Thread Vitaliy Semochkin
Hi, I cannot find flink-shaded-hadoop2 for flink 1.10 in maven repositories. According to maven central https://search.maven.org/artifact/org.apache.flink/flink-shaded-hadoop the latest released version was 1.8.3. Is it going to be released soon, should one build it oneself, or am I

Re: Log file environment variable 'log.file' is not set.

2020-03-30 Thread Vitaliy Semochkin
Hello Robert, Thank you for the quick response! Indeed, the logs say the Hadoop version is 2.4.1; this is probably because of https://github.com/apache/flink/blob/b17a597dec80e590db2beedda446aa3cae9920dd/pom.xml#L96 How can I make 1.10 work with my current Hadoop version? Regarding flink reporting in

Re: Run several jobs in parallel in same EMR cluster?

2020-03-30 Thread Gary Yao
Can you try setting the config option taskmanager.numberOfTaskSlots to 2? By default, TMs offer only one slot [1], independent of the number of CPU cores. Best, Gary [1]

Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Till Rohrmann
This is great news Jeff! Thanks a lot for sharing it with the community. Looking forward to trying Flink on Zeppelin out :-) Cheers, Till On Mon, Mar 30, 2020 at 2:47 PM Jeff Zhang wrote: > Hi Folks, > > I am very excited to announce the integration work of flink on apache > zeppelin notebook is

Re: Handling out-of-order data after keyby

2020-03-30 Thread Jimmy Wong
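Hi, a watermark can be assigned after keyBy, but it is best placed right after the SourceFunction. Going through keyBy or another partitioning strategy may cause greater data latency (in EventTime). I didn't quite understand the phrase "want to do keyed out-of-order handling"; could you explain? Jimmy Wong (wangzmk...@163.com) On 2020-03-30 20:58, tingli ke wrote: A question: can the kafka-per-partition watermark be assigned after keyby? Is keyed out-of-order handling supported?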

Handling out-of-order data after keyby

2020-03-30 Thread tingli ke
A question: can the kafka-per-partition watermark be assigned after keyby? Is keyed out-of-order handling supported?

[ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Jeff Zhang
Hi Folks, I am very excited to announce that the integration of Flink with the Apache Zeppelin notebook is complete. You can now run Flink jobs via the DataStream API, Table API, SQL, and PyFlink in the Apache Zeppelin notebook. Download it here: http://zeppelin.apache.org/download.html. Here are some

Flink(≥1.9) Table/SQL Trigger

2020-03-30 Thread Jimmy Wong
Hi all: As I recall, Flink (≥1.9) SQL/Table does not support custom Triggers such as CountTrigger.of(1), right? For Flink (≥1.9) SQL/Table, how can a custom Trigger be implemented, e.g. CountTrigger (a per-record trigger), ContinuousEventTimeTrigger (a specific-time trigger), and so on? Jimmy Wong (wangzmk...@163.com)

Re: flink 1.10 support LONG as watermark?

2020-03-30 Thread jingjing bai
Hi Jark! Is there a FLIP for this? I'd be very glad to learn from the idea. Best, jing Jark Wu wrote on Mon, Mar 30, 2020 at 6:52 PM: > Hi Jingjing, > > Event time field must be a TIMESTAMP(3) type. You can convert your Long > type value into TIMESTAMP(3) using user-defined function. > I'm sorry that Flink

Run several jobs in parallel in same EMR cluster?

2020-03-30 Thread Antonio Martínez Carratalá
Hello I'm running Flink over Amazon EMR and I'm trying to send several different batch jobs to the cluster after creating it. This is my cluster creation code: StepConfig copyJarStep = new StepConfig()

Re: flink 1.10 support LONG as watermark?

2020-03-30 Thread Jark Wu
Hi Jingjing, Event time field must be a TIMESTAMP(3) type. You can convert your Long type value into TIMESTAMP(3) using a user-defined function. I'm sorry that Flink doesn't provide a built-in function for this purpose, but it will have one soon. For example: CREATE TABLE myTable ( log_ts bigint,
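
A stdlib-only sketch of the conversion such a user-defined function might wrap, assuming log_ts holds milliseconds since the epoch and should be read in UTC (both assumptions, not stated in the thread). The class name EpochMillisToTimestamp is hypothetical, and registering it as a Flink ScalarFunction is left out. Millisecond precision survives the conversion, which lines up with TIMESTAMP(3).

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Stdlib-only sketch of turning a BIGINT epoch-millisecond value into a
// timestamp with millisecond precision. Assumptions: the input is epoch
// milliseconds and should be interpreted in UTC. Hypothetical helper that a
// user-defined function could delegate to; not a Flink API.
final class EpochMillisToTimestamp {

    private EpochMillisToTimestamp() {}

    static LocalDateTime toTimestamp(long epochMillis) {
        // Instant.ofEpochMilli keeps millisecond precision, matching TIMESTAMP(3).
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }
}
```

For example, 1585566000123 becomes 2020-03-30T11:00:00.123 under the UTC assumption.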

Re: End to End Latency Tracking in flink

2020-03-30 Thread Guanghui Zhang
Hi. At the Flink source connector, you can emit a $source_current_time - $event_time metric. Meanwhile, at the Flink sink connector, you can emit a $sink_current_time - $event_time metric. Then you use $sink_current_time - $event_time - ($source_current_time - $event_time) = $sink_current_time -
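
Written out with t_sink = $sink_current_time, t_source = $source_current_time and t_event = $event_time, the subtraction cancels the event-time term, leaving the time a record spends between the source and sink connectors. This assumes the two current-time readings come from comparable clocks (for example, synchronized machines), which the message does not state.

```latex
(t_{\mathrm{sink}} - t_{\mathrm{event}}) - (t_{\mathrm{source}} - t_{\mathrm{event}}) = t_{\mathrm{sink}} - t_{\mathrm{source}}
```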

flink 1.10 support LONG as watermark?

2020-03-30 Thread jingjing bai
Hi flinkers! I am trying to upgrade our production from 1.9, our current production version, to 1.10. In our case, event_time is a Long, and in our internal version we had implemented support for a long type as the watermark, which differs from the official 1.10 version. On 1.10

Re: Flink YARN app terminated before the client receives the result

2020-03-30 Thread Aljoscha Krettek
I think we have to take a step back here. For per-job (YARN) mode, the general problem is that there are two systems that can do shutdown (and other things) and two clients. There is YARN and there is Flink, and Flink is YARN inside YARN, in a way. The solution, I think, is that cancellation

Re: End to End Latency Tracking in flink

2020-03-30 Thread Oscar Westra van Holthe - Kind
On Mon, 30 Mar 2020 at 05:08, Lu Niu wrote: > $current_processing - $event_time works for event time. How about > processing time? Is there a good way to measure the latency? > To measure latency you'll need some way to determine the time spent between the start and end of your pipeline. To

Re: Log file environment variable 'log.file' is not set.

2020-03-30 Thread Robert Metzger
Hey, which Flink version are you using? Where exactly are you seeing the "Log file environment variable 'log.file' is not set." message? Can you post some context around it? (is this shown from the command line? what are the arguments? is it shown in a file? Usually, the "log.file" property is

Re: Issue with single job yarn flink cluster HA

2020-03-30 Thread Yang Wang
Hi Dinesh, First, I think the error message you provided is not a problem. It just indicates that the leader election is still ongoing. When it finishes, the new leader will start a new dispatcher to provide the web UI and REST service. From your jobmanager logs "Connection refused:

Re: Re:Re: flink savepoint issue

2020-03-30 Thread Yun Tang
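Hi, First, if this problem is easy to reproduce, we need to pin down what caused the OOMKilled. 1. Enable block-cache usage [1] and watch the block cache usage in the metrics. 2. Please answer the following questions, which will help narrow it down: * How many slots does a single TM have? * How much managed memory is configured for a single TM? * How many keyed states are declared in total (a window also counts as a state), how many of them are map states, and is that map state iterated often? *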

Re: Windows on SinkFunctions

2020-03-30 Thread Robert Metzger
Hey, In your original email, you wrote: Because if I have multiple sinks that that only for one of them I need a > Window, the second solution might be problematic. You can also send the data of an operator to multiple sinks Source --> MyComputationProcessFunction --> DataBatcher -->

Fwd: Lack of KeyedBroadcastStateBootstrapFunction

2020-03-30 Thread Tzu-Li (Gordon) Tai
It seems like Seth's reply didn't make it to the mailing lists somehow. Forwarding his reply below: -- Forwarded message - From: Seth Wiesman Date: Thu, Mar 26, 2020 at 5:16 AM Subject: Re: Lack of KeyedBroadcastStateBootstrapFunction To: Dawid Wysakowicz Cc: , Tzu-Li (Gordon)

Re: Testing RichAsyncFunction with TestHarness

2020-03-30 Thread Gary Yao
> > Additionally even though I add all necessary dependencies defined in [1] I > cannot see ProcessFunctionTestHarnesses class. > That class was added in Flink 1.10 [1]. [1]

Two questions about the kafka sink

2020-03-30 Thread whirly
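Hi everyone, I ran into a few problems/questions while using the flink kafka sink and would like to ask for advice. 1. Unlike the elasticsearch sink, the kafka sink does not provide an ActionRequestFailureHandler, so what should one do when an exception occurs? I'm also not sure exactly which exceptions can occur. In FlinkKafkaProducer's open, the callback looks like this: onCompletion only receives RecordMetadata and Exception, not the Record, and the callback is private, so it cannot be overridden via inheritance if (logFailuresOnly) {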

flink cannot submit jars

2020-03-30 Thread Jerome
Jars cannot be submitted even in local mode; an error occurs as soon as I submit. What could be the cause? This is on Ubuntu 18.04. Thanks for any help!