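Re: sql-client bug after building and deploying Flink 1.11-release

2020-06-01 Thread 夏帅
OK, thanks.


--
From: godfrey he
Sent: Tuesday, June 2, 2020, 12:32
To: user-zh
Cc: 夏帅
Subject: Re: sql-client bug after building and deploying Flink 1.11-release

Hi, 夏帅

Thanks for reporting the problem. I created an issue, https://issues.apache.org/jira/browse/FLINK-18055, and it should be fixed today.

Best,
Godfrey

Leonard Xu wrote on Tue, Jun 2, 2020 at 12:13 PM:
Hi, 夏帅

 Thanks for the feedback. This looks like a bug; I was able to reproduce it locally as well. Let me take a look first.

 Best regards,
 Leonard Xu

 > On Jun 2, 2020, at 11:57, 夏帅 wrote:
 >
 > Is it a problem with my build? I built it on Windows.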



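Re: sql-client bug after building and deploying Flink 1.11-release

2020-06-01 Thread godfrey he
Hi, 夏帅

Thanks for reporting the problem. I created an issue, https://issues.apache.org/jira/browse/FLINK-18055,
and it should be fixed today.

Best,
Godfrey

Leonard Xu wrote on Tue, Jun 2, 2020 at 12:13 PM:

> Hi, 夏帅
>
> Thanks for the feedback. This looks like a bug; I was able to reproduce it locally as well. Let me take a look first.
>
> Best regards,
> Leonard Xu
>
> > On Jun 2, 2020, at 11:57, 夏帅 wrote:
> >
> > Is it a problem with my build? I built it on Windows.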
>
>


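Re: sql-client bug after building and deploying Flink 1.11-release

2020-06-01 Thread Leonard Xu
Hi, 夏帅

Thanks for the feedback. This looks like a bug; I was able to reproduce it locally as well. Let me take a look first.

Best regards,
Leonard Xu

> On Jun 2, 2020, at 11:57, 夏帅 wrote:
>
> Is it a problem with my build? I built it on Windows.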



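sql-client bug after building and deploying Flink 1.11-release

2020-06-01 Thread 夏帅

Hi all, has anyone built and deployed flink-1.11-release? When I use sql-client, the catalog I set
does not take effect, and auto-completion does not work well either.
Is it a problem with my build? I built it on Windows.
The build steps are at this link:
https://jxeditor.github.io/2020/06/01/Flink1.11.0%E7%BC%96%E8%AF%91/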
$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 
2018-06-18T02:33:14+08:00)

Flink SQL> show catalogs;
default_catalog
hive

Flink SQL> use  catalog hive;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name 
[`hive`] does not exist.




Re: Native kubernetes integration for standalone job clusters.

2020-06-01 Thread Yang Wang
Hi Akshay,

Currently, the ZooKeeper HA service can be used for both session clusters and
job clusters when deploying Flink on K8s. If you mean using the native K8s HA
(i.e., native leader election and a ConfigMap to store metadata) [1], I think
it is not supported yet.


[1]. https://issues.apache.org/jira/browse/FLINK-12884

Best,
Yang

Akshay Iyangar wrote on Tue, Jun 2, 2020 at 3:46 AM:

> Hi
>
> So we actually run Flink in Kubernetes as a standalone Flink job for each
> Flink pipeline currently.
>
> We wanted to take advantage of Flink HA using K8 but looks like it only
> supports Flink session clusters currently for version 1.10.
>
> Any ideas when will it have support for standalone job clusters?
>
> Thanks
>
> Akshay I
>


Re: kerberos integration with flink

2020-06-01 Thread Yangze Guo
It sounds good to me. If your job keeps running (longer than the
expiration time), I think it implies that Krb5LoginModule will use
your newly generated cache. It's my pleasure to help you.

Best,
Yangze Guo

On Mon, Jun 1, 2020 at 10:47 PM Nick Bendtner  wrote:
>
> Hi Guo,
> The auto renewal happens fine. However, I want to generate a new ticket with a
> new "renew until" period so that the job can run longer than 7 days. I am
> talking about the second paragraph of your email. I have set a custom cache by
> setting KRB5CCNAME. I just want to make sure that Krb5LoginModule does a
> re-login like you said. I think it does, because I generated a new ticket while
> the flink job was running and the job continues to auto renew the new ticket.
> Let me know if you can think of any pitfalls. Once again, I really want to
> thank you for your help and your time.
>
> Best,
> Nick.

Re: Flink 1.9 yarn session mode: frequent "Ask timed out"

2020-06-01 Thread Benchao Li
When we run into this, it is usually caused by a Full GC on the JobManager. You can check the memory usage of your JM.

star <3149768...@qq.com> wrote on Tue, Jun 2, 2020 at 10:03 AM:

> Hello,
>
> In Flink 1.9 yarn session mode, I frequently get "Ask timed out".
>
> Running the flink list shell command fails to connect and reports:
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#180439734]] after [1 ms]. Message
> of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A
> typical reason for `AskTimeoutException` is that the recipient actor didn't
> send a reply.
>
> Submitting a job fails in the same way:
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Job cancellation
> timed out.]
>
> While this is happening, the web UI keeps showing: Server Response Message: Internal server error.
> The job list is also empty.
>
> The situation recovers by itself after a while, but nothing can be done in the meantime.
>
> Has anyone run into this? Thanks.



-- 

Best,
Benchao Li


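Flink 1.9 yarn session mode: frequent "Ask timed out"

2020-06-01 Thread star
Hello,

In Flink 1.9 yarn session mode, I frequently get "Ask timed out".

Running the flink list shell command fails to connect and reports:
akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/dispatcher#180439734]] after [1 ms]. Message of 
type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical 
reason for `AskTimeoutException` is that the recipient actor didn't send a 
reply.

Submitting a job fails in the same way:
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: [Job cancellation timed 
out.]

While this is happening, the web UI keeps showing: Server Response Message: Internal server error.
The job list is also empty.

The situation recovers by itself after a while, but nothing can be done in the meantime.

Has anyone run into this? Thanks.
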

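Re: Re: Window too small when using IngestionTime

2020-06-01 Thread chaojianok
Changing Time.milliseconds(10L) to Time.seconds(10L) changes the size of the time window. Records that were previously split across two windows now fall into the same window, and since aggregation is done per window, the result changes.

At 2020-06-01 22:41:14, "xue...@outlook.com" wrote:
>If the ValueState of the 10 ms window was evicted on timeout, that is understandable. But does an aggregation without a window mean that the
>ValueState of every key stays in the StateBackend? Will it grow without bound, and what happens when it exceeds cluster limits such as memory or slots?
>Also, even if the ValueState is evicted in the windowed case, the code does not explicitly specify a TimerService or an evictor.
>
>Suppose records with the same key are processed far enough apart in time: would the output with and without windows still differ?
>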


RE: History Server Not Showing Any Jobs - File Not Found?

2020-06-01 Thread Hailu, Andreas
So I created a new HDFS directory with just 1 archive and pointed the server to 
monitor that directory, et voila - I'm able to see the applications in the UI. 
So it must have been really churning trying to fetch all of those initial 
archives :)

I have a couple of follow up questions if you please:

1.  What is the upper limit of the number of archives the history server 
can support? Does it attempt to download every archive and load them all into 
memory?

2.  Retention: we have on the order of 100K applications per day in our 
production environment. Is there any native retention policy, e.g. only keeping 
the latest X archives in the dir, or is this something we need to manage 
ourselves?
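
If retention does turn out to be something to manage outside Flink, the "keep only the latest X archives" policy from question 2 could be sketched as below. This is a hypothetical helper, not a Flink feature: the function name, the `(name, modification_time)` input shape and the sample listing are all assumptions, and listing or deleting files on HDFS is deliberately left out.

```python
def archives_to_delete(archives, keep_latest):
    """Return names of archives to remove, keeping the `keep_latest` newest.

    `archives` is an iterable of (name, modification_time) pairs
    (hypothetical input shape, e.g. built from a directory listing).
    """
    if keep_latest < 0:
        raise ValueError("keep_latest must be non-negative")
    # Sort newest first, then everything past the first `keep_latest` is stale.
    newest_first = sorted(archives, key=lambda item: item[1], reverse=True)
    return [name for name, _ in newest_first[keep_latest:]]


# Made-up listing: archive name and modification time.
listing = [("job-a", 100), ("job-b", 300), ("job-c", 200), ("job-d", 50)]
stale = archives_to_delete(listing, keep_latest=2)
print(stale)
```

Keeping the selection logic separate from the actual deletion makes it easy to dry-run the policy against a listing before removing anything.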

Thanks.

// ah

From: Hailu, Andreas [Engineering]
Sent: Friday, May 29, 2020 8:46 AM
To: 'Chesnay Schepler' ; user@flink.apache.org
Subject: RE: History Server Not Showing Any Jobs - File Not Found?

Yes, these are all in the same directory, and we're at 67G right now. I'll try 
with incrementally smaller directories and let you know what I find.

// ah

From: Chesnay Schepler <ches...@apache.org>
Sent: Friday, May 29, 2020 3:11 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

oh I'm not using the HistoryServer; I just wrote it ;)
Are these archives all in the same location? So we're roughly looking at 5 GB 
of archives then?

That could indeed "just" be a resource problem. The HistoryServer eagerly 
downloads all archives, and not on-demand.
The next step would be to move some of the archives into a separate HDFS 
directory and try again.

(Note that by configuring "historyserver.web.tmpdir" to some permanent 
directory, subsequent (re)starts of the HistoryServer can re-use this 
directory, so you only have to download things once.)

On 29/05/2020 00:43, Hailu, Andreas wrote:
May I also ask what version of flink-hadoop you're using and the number of jobs 
you're storing the history for? As of writing we have roughly 101,000 
application history files. I'm curious to know if we're encountering some kind 
of resource problem.

// ah

From: Hailu, Andreas [Engineering]
Sent: Thursday, May 28, 2020 12:18 PM
To: 'Chesnay Schepler' ; 
user@flink.apache.org
Subject: RE: History Server Not Showing Any Jobs - File Not Found?

Okay, I will look further to see if we're mistakenly using a version that's 
pre-2.6.0. However, I don't see flink-shaded-hadoop in my /lib directory for 
flink-1.9.1.

flink-dist_2.11-1.9.1.jar
flink-table-blink_2.11-1.9.1.jar
flink-table_2.11-1.9.1.jar
log4j-1.2.17.jar
slf4j-log4j12-1.7.15.jar

Are the files within /lib.

// ah

From: Chesnay Schepler <ches...@apache.org>
Sent: Thursday, May 28, 2020 11:00 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

Looks like it is indeed stuck on downloading the archive.

I searched a bit in the Hadoop JIRA and found several similar instances:
https://issues.apache.org/jira/browse/HDFS-6999
https://issues.apache.org/jira/browse/HDFS-7005
https://issues.apache.org/jira/browse/HDFS-7145

It is supposed to be fixed in 2.6.0 though :/

If hadoop is available from the HADOOP_CLASSPATH and flink-shaded-hadoop in 
/lib then you basically don't know what Hadoop version is actually being used,
which could lead to incompatibilities and dependency clashes.
If flink-shaded-hadoop 2.4/2.5 is on the classpath, maybe that is being used 
and runs into HDFS-7005.

On 28/05/2020 16:27, Hailu, Andreas wrote:
Just created a dump, here's what I see:

"Flink-HistoryServer-ArchiveFetcher-thread-1" #19 daemon prio=5 os_prio=0 
tid=0x7f93a5a2c000 nid=0x5692 runnable [0x7f934a0d3000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at 

Native kubernetes integration for standalone job clusters.

2020-06-01 Thread Akshay Iyangar

Hi

So we actually run Flink in Kubernetes as a standalone Flink job for each Flink 
pipeline currently.

We wanted to take advantage of Flink HA using K8 but looks like it only 
supports Flink session  clusters currently for version 1.10.

Any ideas when will it have support for standalone job clusters?

Thanks
Akshay I



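Re: Window too small when using IngestionTime

2020-06-01 Thread 1048262223
Hi
sum is implemented as an AggregateFunction, and each emission only contains the aggregate of the current window. The results probably differ because the upstream timestamps are assigned by ingestion time, so 1, 10 and 11 were assigned to two windows: 1 in one window, and 10 and 11 in another.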
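
The window assignment described above can be sketched in plain Python (this is not Flink code). The timestamps are made-up ingestion times in milliseconds, chosen so that the first k1 record lands just before a 10 ms boundary. The window start is computed as the timestamp minus the timestamp modulo the window size, which is how tumbling windows without an offset are aligned.

```python
from collections import defaultdict


def tumbling_window_sums(records, window_size_ms):
    """Sum values per (key, window start) for epoch-aligned tumbling windows."""
    sums = defaultdict(int)
    for timestamp_ms, key, value in records:
        window_start = timestamp_ms - (timestamp_ms % window_size_ms)
        sums[(key, window_start)] += value
    return dict(sums)


# Hypothetical ingestion timestamps (ms); only their spacing matters here.
records = [
    (1008, "k1", 1),
    (1009, "k3", 10),
    (1011, "k1", 10),
    (1012, "k2", 2),
    (1013, "k1", 11),
    (1014, "k2", 20),
]

small = tumbling_window_sums(records, 10)      # 10 ms windows
large = tumbling_window_sums(records, 10_000)  # 10 s windows
print(sorted(small.items()))
print(sorted(large.items()))
```

With these timestamps, the 10 ms run puts k1 into two windows with sums 1 and 21, while the 10 s run puts every record into one window and yields 22 for k1, which matches the two outputs reported in the thread.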


Best
Yichao Yang




Re: kerberos integration with flink

2020-06-01 Thread Nick Bendtner
Hi Guo,
The auto renewal happens fine. However, I want to generate a new ticket with
a new "renew until" period so that the job can run longer than 7 days. I am
talking about the second paragraph of your email. I have set a custom cache by
setting KRB5CCNAME. I just want to make sure that Krb5LoginModule does a
re-login like you said. I think it does, because I generated a new ticket
while the flink job was running and the job continues to auto renew the new
ticket. Let me know if you can think of any pitfalls. Once again, I really
want to thank you for your help and your time.

Best,
Nick.

On Mon, Jun 1, 2020 at 12:29 AM Yangze Guo  wrote:

> Hi, Nick.
>
> Do you mean that you manually execute "kinit -R" to renew the ticket cache?
> If that is the case, Flink already sets "renewTGT" to true. If
> everything is OK, you do not need to do it yourself. However, it seems
> this mechanism has a bug that is not fixed in all JDK
> versions. Please refer to [1].
>
> If you mean that you generate a new ticket cache in the same place (by
> default /tmp/krb5cc_uid), I'm not sure whether Krb5LoginModule will re-login
> with your new ticket cache. I'll try to do a deeper investigation.
>
> [1] https://bugs.openjdk.java.net/browse/JDK-8058290.
>
> Best,
> Yangze Guo
>
> On Sat, May 30, 2020 at 3:07 AM Nick Bendtner  wrote:
> >
> > Hi Guo,
> > Thanks again for your inputs. If I periodically renew the kerberos cache
> > using an external process (kinit) on all flink nodes in standalone mode,
> > will the cluster still be short lived or will the new ticket in the cache
> > be used and the cluster can live till the end of the new expiry?
> >
> > Best,
> > Nick.
> >
> > On Sun, May 24, 2020 at 9:15 PM Yangze Guo  wrote:
> >>
> >> Yes, you can use kinit. But AFAIK, if you deploy Flink on Kubernetes
> >> or Mesos, Flink will not ship the ticket cache. If you deploy Flink on
> >> Yarn, Flink will acquire delegation tokens with your ticket cache and
> >> set tokens for job manager and task executor. As the document said,
> >> the main drawback is that the cluster is necessarily short-lived since
> >> the generated delegation tokens will expire (typically within a week).
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Sat, May 23, 2020 at 1:23 AM Nick Bendtner wrote:
> >> >
> >> > Hi Guo,
> >> > Even for HDFS I don't really need to set
> >> > "security.kerberos.login.contexts". As long as there is the right ticket
> >> > in the ticket cache before starting the flink cluster it seems to work
> >> > fine. I think even [4] from your reference seems to do the same thing. I
> >> > have defined own ticket cache specifically for flink cluster by setting
> >> > this environment variable. Before starting the cluster I create a ticket by
> >> > using kinit.
> >> > This is how I make flink read this cache.
> >> > export KRB5CCNAME=/home/was/Jaas/krb5cc . I think even flink tries to
> >> > find the location of ticket cache using this variable [1].
> >> > Do you see any problems in setting up hadoop security module this way?
> >> > And thanks a lot for your help.
> >> >
> >> > [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
> >> >
> >> > Best,
> >> > Nick
> >> >
> >> >
> >> >
> >> > On Thu, May 21, 2020 at 9:54 PM Yangze Guo 
> wrote:
> >> >>
> >> >> Hi, Nick,
> >> >>
> >> >> From my understanding, if you configure the
> >> >> "security.kerberos.login.keytab", Flink will add the
> >> >> AppConfigurationEntry of this keytab to all the apps defined in
> >> >> "security.kerberos.login.contexts". If you define
> >> >> "java.security.auth.login.config" at the same time, Flink will also
> >> >> keep the configuration in it. For more details, see [1][2].
> >> >>
> >> >> If you want to use this keytab to interact with HDFS, HBase and Yarn,
> >> >> you need to set "security.kerberos.login.contexts". See [3][4].
> >> >>
> >> >> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#jaas-security-module
> >> >> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
> >> >> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#hadoop-security-module
> >> >> [4]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
> >> >>
> >> >> Best,
> >> >> Yangze Guo
> >> >>
> >> >> On Thu, May 21, 2020 at 11:06 PM Nick Bendtner 
> wrote:
> >> >> >
> >> >> > Hi guys,
> >> >> > Is there any difference in providing kerberos config to the flink
> jvm using this method in the flink configuration?
> >> >> >
> >> >> > env.java.opts:  -Dconfig.resource=qa.conf
> -Djava.library.path=/usr/mware/flink-1.7.2/simpleapi/lib/
> -Djava.security.auth.login.config=/usr/mware/flink-1.7.2/Jaas/kafka-jaas.conf
> -Djava.security.krb5.conf=/usr/mware/flink-1.7.2/Jaas/krb5.conf
> >> >> >
> >> >> > Is there any difference in doing 

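Re: Window too small when using IngestionTime

2020-06-01 Thread xue...@outlook.com
If the ValueState of the 10 ms window was evicted on timeout, that is understandable. But does an aggregation without a window mean that the
ValueState of every key stays in the StateBackend? Will it grow without bound, and what happens when it exceeds cluster limits such as memory or slots?
Also, even if the ValueState is evicted in the windowed case, the code does not explicitly specify a TimerService or an evictor.

Suppose records with the same key are processed far enough apart in time: would the output with and without windows still differ?
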




Re: Window too small when using IngestionTime

2020-06-01 Thread xue...@outlook.com
Without a window, each output is aggregated against the ValueState already stored for that key.
Output:
```
+++:2> (k1,1)
+++:1> (k3,10)
+++:2> (k1,11)
+++:8> (k2,2)
+++:2> (k1,22)
+++:8> (k2,22)
```
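
The output above is what a per-key running sum produces: every incoming record updates the stored total for its key, and the updated total is emitted. A minimal plain-Python sketch of that behaviour follows. It is not Flink code, and the dictionary merely stands in for keyed state.

```python
def keyed_running_sum(records):
    """Emit (key, running total) after each record, like a non-windowed keyed sum."""
    state = {}    # stand-in for per-key state
    emitted = []
    for key, value in records:
        state[key] = state.get(key, 0) + value
        emitted.append((key, state[key]))
    return emitted


# Same input records as in the job from this thread.
records = [("k1", 1), ("k3", 10), ("k1", 10), ("k2", 2), ("k1", 11), ("k2", 20)]
result = keyed_running_sum(records)
for item in result:
    print(item)
```

Because nothing ever clears `state` in this sketch, the stored totals live as long as the process does, which is the growth concern raised elsewhere in the thread.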




Re: pyflink: format problem when using a UDF to produce a JSON string and send it to Kafka

2020-06-01 Thread Xingbo Huang
You're welcome. We all learn from each other.

Best,
Xingbo

jack wrote on Mon, Jun 1, 2020 at 9:07 PM:

> Thank you very much for clearing this up. I have only just started using pyflink and am not very familiar with it yet, so I appreciate the guidance.
>
> At 2020-06-01 20:50:53, "Xingbo Huang" wrote:
>
> Hi,
> This is actually the effect of the CSV connector's optional
> quote_character parameter. Its default value is ", so every field gets wrapped in an extra layer of quotes. Set this parameter to \0 and you will get the result you want.
> st_env.connect(
>     Kafka()
>     .version("0.11")
>     .topic("logSink")
>     .start_from_earliest()
>     .property("zookeeper.connect", "localhost:2181")
>     .property("bootstrap.servers", "localhost:9092")) \
>     .with_format(  # declare a format for this system
>         Csv()
>         .schema(DataTypes.ROW([DataTypes.FIELD("log",
>                                                DataTypes.STRING())]))
>         .quote_character("\0")
>     ) \
>     .with_schema(  # declare the schema of the table
>         Schema()
>         .field("log", DataTypes.STRING())) \
>     .in_append_mode() \
>     .register_table_sink("sink")
>
> Best,
> Xingbo
>
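
The effect of the quote character can be illustrated with a simplified model of CSV field quoting in plain Python. This is an assumption about how typical CSV writers behave, not Flink's CSV format implementation, and `write_csv_field` is a hypothetical helper written only for this illustration.

```python
def write_csv_field(value, quote_char='"', delimiter=","):
    """Quote a field only if it contains the quote character, delimiter or a newline."""
    needs_quoting = any(ch in value for ch in (quote_char, delimiter, "\n"))
    if not needs_quoting:
        return value
    # Embedded quote characters are doubled, then the field is wrapped.
    escaped = value.replace(quote_char, quote_char * 2)
    return quote_char + escaped + quote_char


json_payload = '{"log": "hello"}'
with_default_quote = write_csv_field(json_payload)
with_nul_quote = write_csv_field(json_payload, quote_char="\0")
print(with_default_quote)
print(with_nul_quote)
```

A JSON string always contains double quotes, so with the default quote character it gets wrapped and its inner quotes doubled. With a quote character that never occurs in the payload, such as NUL, the string passes through unchanged.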




Window too small when using IngestionTime

2020-06-01 Thread xue...@outlook.com
Flink 1.10, testing the Flink API on Windows 10.

The code is as follows:
```

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.ArrayList;
import java.util.List;

public class KeyedStreamJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        //env.setParallelism(3);

        Tuple2<String, Integer> item = null;
        List<Tuple2<String, Integer>> items = new ArrayList<>();
        item = new Tuple2<>("k1", 1);
        items.add(item);
        item = new Tuple2<>("k3", 10);
        items.add(item);
        item = new Tuple2<>("k1", 10);
        items.add(item);
        item = new Tuple2<>("k2", 2);
        items.add(item);
        item = new Tuple2<>("k1", 11);
        items.add(item);
        item = new Tuple2<>("k2", 20);
        items.add(item);
        DataStreamSource<Tuple2<String, Integer>> streamSource =
                env.fromCollection(items);
        streamSource
                //by 1
                //.assignTimestampsAndWatermarks(new IngestionTimeExtractor())
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws
                            Exception {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                .sum(1)
                .print("+++");

        env.execute("keyedSteamJob");
    }
}

```
Output:
```
+++:1> (k3,10)
+++:2> (k1,1)
+++:8> (k2,22)
+++:2> (k1,21)
```
If I change

.window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
to

.window(TumblingEventTimeWindows.of(Time.seconds(10L)))
the output is:
```
+++:8> (k2,22)
+++:1> (k3,10)
+++:2> (k1,22)
```
With the two different window sizes, the first run does not aggregate all records for key 'k1', while the second run does.

Why does this happen, and how can I verify which processing flow leads to this difference in results?

If k1=1 is already in ValueState (2> (k1,1)),
then when the next output is produced with currentKey=k1, the value in ValueState is 1, so the output should be 10+11+1 rather than 10+11.


If the window is changed to 1 second, the output is also the expected result.



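Re: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does that affect offset commits of the Kafka source table in the view?

2020-06-01 Thread godfrey he
Since 1.11, StreamTableEnvironment and TableEnvironment behave the same with respect to DAG optimization. I recommend using StatementSet for multiple INSERTs in both cases. TableEnvironment will be further simplified and consolidated in the future.

Best,
Godfrey

wind.fly@outlook.com wrote on Thu, May 28, 2020 at 5:45 PM:

> Hi,
>
> Is there a release plan for giving StreamTableEnvironment the same DAG optimization as TableEnvironment? Will TableEnvironment's DAG optimization and StreamTableEnvironment's richer API be unified in the future?
>
> Best,
> Junbao Zhang
> 
> From: Benchao Li
> Sent: May 28, 2020 17:35
> To: user-zh
> Subject: Re: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does that affect offset commits of the Kafka source table in the view?
>
> Hi,
>
> A 13-hour time difference is rather strange; as I understand it, this should not happen. Do you mean the problem does not occur when you are not using TableEnvironment?
>
> On the second question: as far as I know, checkpointing cannot currently be configured through TableEnvironment; it can only be configured on StreamExecutionEnvironment.
>
> wind.fly@outlook.com wrote on Thu, May 28, 2020 at 5:27 PM:
>
> > Hi, Benchao:
> >
> > The DAG does show the same thing. However, after switching to TableEnvironment, all times are off by 13 hours, for example when calling the LOCALTIMESTAMP function. One more question: how do I configure checkpointing when using TableEnvironment?
> >
> > Best,
> > Junbao Zhang
> > 
> > From: Benchao Li
> > Sent: May 28, 2020 17:05
> > To: user-zh
> > Subject: Re: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does that affect offset commits of the Kafka source table in the view?
> >
> > Yes, that's right. This is an optimization specific to the blink planner, which can optimize a DAG with multiple sinks. You can also look at the job's DAG in the Flink UI, where it is fairly clear that there is only one source followed by two sinks.
> >
> > wind.fly@outlook.com wrote on Thu, May 28, 2020 at 5:02 PM:
> >
> > > Hi, Benchao:
> > > Thanks for the reply. Testing confirms this. So within the same TableEnvironment, if several INSERT statements read the same table, the table's data is consumed only once?
> > >
> > > Best,
> > > Junbao Zhang
> > > 
> > > From: Benchao Li
> > > Sent: May 28, 2020 15:59
> > > To: user-zh
> > > Subject: Re: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does that affect offset commits of the Kafka source table in the view?
> > >
> > > If your tEnv is a TableEnvironment rather than a StreamTableEnvironment, the two INSERTs share the upstream source: table a is read only once, and its records are then passed to both downstream sinks c and d.
> > >
> > > wind.fly@outlook.com wrote on Thu, May 28, 2020 at 3:14 PM:
> > >
> > > > Hi, all:
> > > > I am using Flink 1.10.0 with the blink planner. Suppose I have the following code:
> > > >
> > > > tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> > > > tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
> > > > tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
> > > >
> > > > where a is a Kafka table with these connector properties:
> > > > 'connector.properties.group.id' = 'testGroup',
> > > > 'connector.startup-mode' = 'group-offsets'
> > > >
> > > > My question: when this application runs and both c and d consume table a, will offset commits for group 'testGroup' on table a be affected, or does each record in table a simply flow to the two different branches?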
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: pyflink: format problem when sending a JSON string produced by a UDF to Kafka

2020-06-01 Thread jack
Thank you very much for clearing that up. I have only just started using pyflink and am not yet familiar with it, so any further guidance is appreciated.

At 2020-06-01 20:50:53, "Xingbo Huang" wrote:

Hi,
This is actually the effect of quote_character, an optional parameter of the CSV connector. Its default value is ", so every field gets wrapped in an extra layer like this. Set this parameter to \0 and you will get the result you want.
st_env.connect(
        Kafka()
        .version("0.11")
        .topic("logSink")
        .start_from_earliest()
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")) \
    .with_format(  # declare a format for this system
        Csv()
        .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
        .quote_character("\0")
    ) \
    .with_schema(  # declare the schema of the table
        Schema()
        .field("log", DataTypes.STRING())) \
    .in_append_mode() \
    .register_table_sink("sink")



Best,
Xingbo




Re: pyflink: format problem when sending a JSON string produced by a UDF to Kafka

2020-06-01 Thread Xingbo Huang
Hi,
This is actually the effect of quote_character, an optional parameter of the CSV connector. Its default value is ", so every field gets wrapped in an extra layer like this. Set this parameter to \0 and you will get the result you want.
st_env.connect(
        Kafka()
        .version("0.11")
        .topic("logSink")
        .start_from_earliest()
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")) \
    .with_format(  # declare a format for this system
        Csv()
        .schema(DataTypes.ROW([DataTypes.FIELD("log",
                                               DataTypes.STRING())]))
        .quote_character("\0")
    ) \
    .with_schema(  # declare the schema of the table
        Schema()
        .field("log", DataTypes.STRING())) \
    .in_append_mode() \
    .register_table_sink("sink")
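
The quoting effect described here can be reproduced outside Flink. The sketch below uses Python's standard csv module as a stand-in for the CSV format; that substitution is an assumption made for illustration, and this is not the Flink serializer. The helper name write_single_field is hypothetical. With the default quote character, a field that itself contains double quotes is wrapped in quotes and its inner quotes are doubled, which matches the output reported in this thread. With quoting switched off, the JSON string passes through unchanged.

```python
import csv
import io
import json


def write_single_field(value, **dialect_options):
    """Serialize one value as a one-column CSV row and return the line."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n", **dialect_options)
    writer.writerow([value])
    return buf.getvalue().rstrip("\n")


payload = json.dumps({"topic": "logSource", "message": "x=1,y=1,z=1"})

# Default dialect: quote character is ", so the JSON text gets wrapped.
quoted = write_single_field(payload)

# No quote character at all; a tab delimiter avoids clashing with the commas.
unquoted = write_single_field(
    payload, delimiter="\t", quoting=csv.QUOTE_NONE, quotechar=None
)

print(quoted)
print(unquoted)
```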

Best,
Xingbo

jack wrote on Mon, Jun 1, 2020 at 5:31 PM:

> Hi all, I am using pyflink to consume JSON strings from Kafka and send them back to Kafka, treating each Kafka message as a single field.
>
> Input data:
> {"topic": "logSource", "message": "x=1,y=1,z=1"}
>
> The data that ends up in Kafka looks like this:
> "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"
>
> It is wrapped in an extra layer of double quotes. My UDF uses the json module for processing. Could someone help me see how to get a normal JSON string?
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(),
>                   DataTypes.STRING()], result_type=DataTypes.STRING())
> def kv(log, pair_sep=',', kv_sep='='):
>     import json
>     import re
>     log = json.loads(log)
>     ret = {}
>     items = re.split(pair_sep, log.get("message"))
>     for item in items:
>         ret[re.split(kv_sep, item)[0]] = re.split(kv_sep, item)[1]
>     log.update(ret)
>     log = json.dumps(log)
>     return log
>
>
> def register_source(st_env):
>     st_env \
>         .connect(  # declare the external system to connect to
>             Kafka()
>             .version("0.10")
>             .topic("logSource")
>             .start_from_latest()
>             .property("zookeeper.connect", "localhost:2181")
>             .property("bootstrap.servers", "localhost:9092")) \
>         .with_format(  # declare a format for this system
>             Csv()
>             .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
>             .field_delimiter("\n")) \
>         .with_schema(  # declare the schema of the table
>             Schema()
>             .field("log", DataTypes.STRING())) \
>         .in_append_mode() \
>         .register_table_source("source")
>
> def register_sink(st_env):
>     st_env.connect(
>             Kafka()
>             .version("0.10")
>             .topic("logSink")
>             .start_from_earliest()
>             .property("zookeeper.connect", "localhost:2181")
>             .property("bootstrap.servers", "localhost:9092")) \
>         .with_format(  # declare a format for this system
>             Csv()
>             .schema(DataTypes.ROW([DataTypes.FIELD("log",
>                                                    DataTypes.STRING())]))) \
>         .with_schema(  # declare the schema of the table
>             Schema()
>             .field("log", DataTypes.STRING())) \
>         .in_append_mode() \
>         .register_table_sink("sink")
>
> if __name__ == '__main__':
>
>     s_env = StreamExecutionEnvironment.get_execution_environment()
>     s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>     s_env.set_parallelism(1)
>     st_env = StreamTableEnvironment \
>         .create(s_env, environment_settings=EnvironmentSettings
>                 .new_instance()
>                 .in_streaming_mode()
>                 .use_blink_planner().build())
>     # Register the UDF under the name used in the select() expression.
>     st_env.register_function('kv', kv)
>     register_source(st_env)
>     register_sink(st_env)
>     st_env \
>         .from_path("source") \
>         .select("kv(log,',', '=') as log") \
>         .insert_into("sink")
>     st_env.execute("test")
>
>
>


Re: Creating Kafka Topic dynamically in Flink

2020-06-01 Thread Leonard Xu
I think @brat is right. I didn't know about the Kafka property
'auto.create.topics.enable'. You can pass the property to the Kafka producer;
that should work.
Best,
Leonard Xu

> On Jun 1, 2020, at 18:33, satya brat wrote:
> 
> Prasanna,
> You might want to check the Kafka broker configs, where
> 'auto.create.topics.enable' helps with creating a new topic whenever a
> message is published to a nonexistent topic.
> https://kafka.apache.org/documentation/#brokerconfigs
> 
> I am not too sure about pitfalls, if any.
> 
> On Mon, Jun 1, 2020 at 3:20 PM Leonard Xu wrote:
> Hi, kumar
> 
> Sorry for missing the original question. I think we cannot currently create
> topics dynamically. Creating a topic should belong to the control flow rather
> than the data flow, and as I understand it users may have custom
> configurations for the topic. You may need to implement the logic to
> check/create/manage topics in your custom SinkFunction so that topics can be
> created dynamically at runtime.
> 
> Best,
> Leonard Xu
> 
>> On Jun 1, 2020, at 17:02, Prasanna kumar wrote:
>> 
>> Leonard,
>> 
>> Thanks for the reply; I will look into those options.
>> But as for the original question, could we create a topic dynamically when
>> required?
>> 
>> Prasanna.
>> 
>> On Mon, Jun 1, 2020 at 2:18 PM Leonard Xu wrote:
>> Hi, kumar
>> 
>> Flink supports consuming from and producing to multiple Kafka topics [1]. In
>> your case you can implement KeyedSerializationSchema (legacy interface) or
>> KafkaSerializationSchema [2] so that one producer instance can send data to
>> multiple topics. There is an ITCase you can use as a reference [3].
>> 
>> 
>> Best,
>> Leonard Xu
>> 
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-producer
>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
>> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java#L126
>> 
>>> On Jun 1, 2020, at 15:35, Prasanna kumar wrote:
>>> 
>>> Hi,
>>> 
>>> I have Use Case where i read events from a Single kafka Stream comprising 
>>> of JSON messages.
>>> 
>>> Requirement is to split the stream into multiple output streams based on 
>>> some criteria say based on Type of Event or Based on Type and Customer 
>>> associated with the event. 
>>> 
>>> We could achieve the splitting of stream using Side outputs as i have seen 
>>> in the documentation.
>>> 
>>> Our business environment is such that there could be new event types 
>>> flowing in and would the Flink Kafka producer create the topics dynamically 
>>> based on the inflowing events. I did not see any documentation saying that 
>>> it could create.  
>>> 
>>> Or should it be always pre created by running a script separately. (Not a 
>>> good scalable practice in our case)
>>> 
>>> Thanks,
>>> Prasanna.
>> 
> 



Re: Creating Kafka Topic dynamically in Flink

2020-06-01 Thread satya brat
Prasanna,
You might want to check the Kafka broker configs, where
'auto.create.topics.enable' helps with creating a new topic whenever a
message is published to a nonexistent topic.
https://kafka.apache.org/documentation/#brokerconfigs

I am not too sure about pitfalls, if any.

On Mon, Jun 1, 2020 at 3:20 PM Leonard Xu wrote:

> Hi, kumar
>
> Sorry for missing the original question. I think we cannot currently create
> topics dynamically. Creating a topic should belong to the control flow
> rather than the data flow, and as I understand it users may have custom
> configurations for the topic. You may need to implement the logic to
> check/create/manage topics in your custom SinkFunction so that topics can
> be created dynamically at runtime.
>
> Best,
> Leonard Xu
>
> On Jun 1, 2020, at 17:02, Prasanna kumar wrote:
>
> Leonard,
>
> Thanks for the reply; I will look into those options.
> But as for the original question, could we create a topic dynamically when
> required?
>
> Prasanna.
>
> On Mon, Jun 1, 2020 at 2:18 PM Leonard Xu wrote:
>
>> Hi, kumar
>>
>> Flink supports consuming from and producing to multiple Kafka topics [1].
>> In your case you can implement KeyedSerializationSchema (legacy interface)
>> or KafkaSerializationSchema [2] so that one producer instance can send
>> data to multiple topics. There is an ITCase you can use as a reference [3].
>>
>>
>> Best,
>> Leonard Xu
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-producer
>> [2]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
>>
>> [3]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java#L126
>>
>>
>> On Jun 1, 2020, at 15:35, Prasanna kumar wrote:
>>
>> Hi,
>>
>> I have Use Case where i read events from a Single kafka Stream comprising
>> of JSON messages.
>>
>> Requirement is to split the stream into multiple output streams based on
>> some criteria say based on Type of Event or Based on Type and Customer
>> associated with the event.
>>
>> We could achieve the splitting of stream using Side outputs as i have
>> seen in the documentation.
>>
>> Our business environment is such that there could be new event types
>> flowing in and would the Flink Kafka producer create the topics dynamically
>> based on the inflowing events. I did not see any documentation saying
>> that it could create.
>>
>> Or should it be always pre created by running a script separately. (Not a
>> good scalable practice in our case)
>>
>> Thanks,
>> Prasanna.
>>
>>
>>
>


[ANNOUNCE] Weekly Community Update 2020/22

2020-06-01 Thread Konstantin Knauf
Dear community,

happy to share this week's community update with an update on the upcoming
Apache Flink releases: Flink 1.11 and Flink Stateful Functions 2.1. With
the community focused on release testing, the dev@ mailing list remains
relatively quiet.

Flink Development
=================

* [releases] Release Testing for Flink 1.11 is progressing. To follow the
testing efforts check out the Flink 1.11 burndown board [1] in the Flink
Jira. Stephan proposes to backport FLIP-126 to Flink 1.11 (after the
feature freeze) as it is an isolated change and to avoid breaking the newly
added source interface again in the next release. [2]

* [releases] Apache Flink-shaded 11.0 was released. Flink 1.11 will depend
on it. [3]

* [releases] Gordon just published the first release candidate for Apache
Flink Stateful Functions 2.1. [4]

* [savepoints] I started a small discussion about documenting (breaking)
backwards compatibility of Apache Flink's savepoint format. [5]

[1]
https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=364=FLINK

[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Backpoint-FLIP-126-watermarks-integration-with-FLIP-27-tp41897p41999.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-shaded-11-0-released-tp42056.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-2-1-0-release-candidate-1-tp42061.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Document-Backwards-Compatibility-of-Savepoints-tp41903.html

Notable Bugs
============

* Flink 1.10.1 seemed to have reintroduced an issue with Kerberos-secured
MapR environments. [6]

[6] https://issues.apache.org/jira/browse/FLINK-18045

Events, Blog Posts, Misc
========================

* Ververica has added a second training day to Flink Forward Global (Oct 20),
shifting the two conference days back by one day. The new dates are 19th/20th
Oct for training, and 21st/22nd Oct for conference/talks. [7]
Pre-registration is already open. [8]

[7] https://twitter.com/FlinkForward/status/1265281578676166658
[8] https://www.flink-forward.org/global-2020

Cheers,

Konstantin

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Creating Kafka Topic dynamically in Flink

2020-06-01 Thread Leonard Xu
Hi, kumar

Sorry for missing the original question. I think we cannot currently create
topics dynamically. Creating a topic should belong to the control flow rather
than the data flow, and as I understand it users may have custom
configurations for the topic. You may need to implement the logic to
check/create/manage topics in your custom SinkFunction so that topics can be
created dynamically at runtime.
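
The check/create logic suggested here can be sketched independently of any Kafka client. In the sketch below, TopicEnsurer, topic_exists and create_topic are hypothetical names: the two callables stand in for whatever admin client a custom sink would use, and an in-memory set plays the role of the broker. It is a minimal sketch of the idea under those assumptions, not Flink or Kafka API.

```python
class TopicEnsurer:
    """Create a topic the first time it is seen, then remember it."""

    def __init__(self, topic_exists, create_topic):
        self._topic_exists = topic_exists  # callable: topic name -> bool
        self._create_topic = create_topic  # callable: topic name -> None
        self._known = set()                # topics already checked by this sink

    def ensure(self, topic):
        """Return True if the topic had to be created, False otherwise."""
        if topic in self._known:
            return False
        created = False
        if not self._topic_exists(topic):
            self._create_topic(topic)
            created = True
        self._known.add(topic)
        return created


# In-memory stand-in for the broker's topic list.
broker_topics = {"events.order"}
ensurer = TopicEnsurer(broker_topics.__contains__, broker_topics.add)

results = [
    ensurer.ensure("events.order"),    # already on the broker
    ensurer.ensure("events.refund"),   # new event type, gets created
    ensurer.ensure("events.refund"),   # cached, no second admin call
]
print(results, sorted(broker_topics))
```

Caching the known topics keeps the admin round trip off the per-record path, which matters if the check runs inside a sink's invoke method.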

Best,
Leonard Xu

> On Jun 1, 2020, at 17:02, Prasanna kumar wrote:
> 
> Leonard,
> 
> Thanks for the reply; I will look into those options.
> But as for the original question, could we create a topic dynamically when
> required?
> 
> Prasanna.
> 
> On Mon, Jun 1, 2020 at 2:18 PM Leonard Xu wrote:
> Hi, kumar
> 
> Flink supports consuming from and producing to multiple Kafka topics [1]. In
> your case you can implement KeyedSerializationSchema (legacy interface) or
> KafkaSerializationSchema [2] so that one producer instance can send data to
> multiple topics. There is an ITCase you can use as a reference [3].
> 
> 
> Best,
> Leonard Xu
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-producer
> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java#L126
> 
>> On Jun 1, 2020, at 15:35, Prasanna kumar wrote:
>> 
>> Hi,
>> 
>> I have Use Case where i read events from a Single kafka Stream comprising of 
>> JSON messages.
>> 
>> Requirement is to split the stream into multiple output streams based on 
>> some criteria say based on Type of Event or Based on Type and Customer 
>> associated with the event. 
>> 
>> We could achieve the splitting of stream using Side outputs as i have seen 
>> in the documentation.
>> 
>> Our business environment is such that there could be new event types flowing 
>> in and would the Flink Kafka producer create the topics dynamically based on 
>> the inflowing events. I did not see any documentation saying that it could 
>> create.  
>> 
>> Or should it be always pre created by running a script separately. (Not a 
>> good scalable practice in our case)
>> 
>> Thanks,
>> Prasanna.
> 



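Re: Starting a job from the command line

2020-06-01 Thread chaojianok
The image you uploaded did not come through in the email.
Your configuration file does not set any checkpoint-related options. You can configure them by following the Flink documentation, or set them manually in your code.

At 2020-06-01 16:46:11, "sun" <1392427...@qq.com> wrote:

I start the job with bin/flink run -n  -c com.toonyoo.bi.flink.Application 
jar/ty-bi-flink-1.0-SNAPSHOT.jar --topic  member
Why are the checkpoint counts all 0? My configuration file is below.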


# high-availability.zookeeper.client.acl: open


#==
# Fault tolerance and checkpointing
#==


# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# .
#
#state.backend: filesystem


# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
#state.checkpoints.dir: file:///opt/flink/flink-1.7.2/checkpoints


# Default target directory for savepoints, optional.
#
#state.savepoints.dir: file:///opt/flink/flink-1.7.2/savepoints


# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend). 
#
#state.backend.incremental: false






pyflink: format problem when sending a JSON string produced by a UDF to Kafka

2020-06-01 Thread jack
Hi all, I am using pyflink to consume JSON strings from Kafka and send them back to Kafka, treating each Kafka message as a single field.

Input data:
{"topic": "logSource", "message": "x=1,y=1,z=1"}


The data that ends up in Kafka looks like this:
"{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"


It is wrapped in an extra layer of double quotes. My UDF uses the json module for processing. Could someone help me see how to get a normal JSON string?


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.descriptors import Csv, Kafka, Schema
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()],
     result_type=DataTypes.STRING())
def kv(log, pair_sep=',', kv_sep='='):
    import json
    import re
    log = json.loads(log)
    ret = {}
    # Split "x=1,y=1,z=1" into pairs, then each pair into key and value.
    items = re.split(pair_sep, log.get("message"))
    for item in items:
        ret[re.split(kv_sep, item)[0]] = re.split(kv_sep, item)[1]
    log.update(ret)
    log = json.dumps(log)
    return log


def register_source(st_env):
    st_env \
        .connect(  # declare the external system to connect to
            Kafka()
            .version("0.10")
            .topic("logSource")
            .start_from_latest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")) \
        .with_format(  # declare a format for this system
            Csv()
            .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
            .field_delimiter("\n")) \
        .with_schema(  # declare the schema of the table
            Schema()
            .field("log", DataTypes.STRING())) \
        .in_append_mode() \
        .register_table_source("source")


def register_sink(st_env):
    st_env.connect(
            Kafka()
            .version("0.10")
            .topic("logSink")
            .start_from_earliest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")) \
        .with_format(  # declare a format for this system
            Csv()
            .schema(DataTypes.ROW([DataTypes.FIELD("log",
                                                   DataTypes.STRING())]))) \
        .with_schema(  # declare the schema of the table
            Schema()
            .field("log", DataTypes.STRING())) \
        .in_append_mode() \
        .register_table_sink("sink")


if __name__ == '__main__':
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)
    st_env = StreamTableEnvironment \
        .create(s_env, environment_settings=EnvironmentSettings
                .new_instance()
                .in_streaming_mode()
                .use_blink_planner().build())
    # Register the UDF under the name used in the select() expression.
    st_env.register_function('kv', kv)
    register_source(st_env)
    register_sink(st_env)
    st_env \
        .from_path("source") \
        .select("kv(log,',', '=') as log") \
        .insert_into("sink")
    st_env.execute("test")
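
To narrow down where the extra quotes come from, the UDF body can be exercised as a plain Python function outside Flink. The sketch below leaves out the @udf decorator so that it runs without pyflink installed (an assumption made for illustration), and it splits each pair only once so that values containing the separator stay intact. If the function returns ordinary JSON, the quoting must be added later, by the sink format.

```python
import json
import re


def kv(log, pair_sep=",", kv_sep="="):
    """Parse 'k=v' pairs from the message field and merge them into the record."""
    record = json.loads(log)
    for item in re.split(pair_sep, record["message"]):
        key, value = re.split(kv_sep, item, maxsplit=1)
        record[key] = value
    return json.dumps(record)


raw = '{"topic": "logSource", "message": "x=1,y=1,z=1"}'
result = kv(raw)
print(result)
```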





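Re: pyflink Table API: UDF call fails with "Given parameters do not match any signature."

2020-06-01 Thread jack



Your understanding is correct. I tested it, and it seems that pyflink UDFs do not really support Python variable-length arguments (*args).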
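
As a workaround sketch for the missing varargs support, the field names can be passed as a single list argument, which lines up with the ARRAY(STRING) input type suggested in the quoted replies. The version below is a plain Python function without the @udf decorator, so it runs without pyflink; wrapping it as a UDF with input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())] is assumed to work as described in this thread and is not verified here.

```python
import json


def drop_fields(message, fields):
    """Remove the named fields from a JSON object given as a string."""
    record = json.loads(message)
    for field in fields:
        # pop with a default so a missing field does not raise KeyError
        record.pop(field, None)
    return json.dumps(record)


cleaned = drop_fields('{"a": "1", "x": "2"}', ["x"])
print(cleaned)
```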














At 2020-06-01 14:47:21, "Dian Fu" wrote:
>The second argument you are passing is a string. Could you try this instead?
>select("drop_fields(message, array('x'))")
>
>I'm not sure I understood your question correctly (if the above does not work, please post the exception).
>
>> On Jun 1, 2020, at 1:59 PM, jack wrote:
>> 
>> Yes, the corresponding parameter was not filled in correctly. Thanks.
>> One more question: do UDFs currently not support Python variable-length arguments? With varargs I still get the parameter mismatch error.
>> 
>> At 2020-06-01 11:01:34, "Dian Fu" wrote:
>>> The input types should be as follows:
>>> 
>>> input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]
>>> 
>>> Regards,
>>> Dian
>>> 
>>>> On Jun 1, 2020, at 10:49 AM, 刘亚坤 wrote:
>>>> 
>>>> I am learning the pyflink Table API and have a question about a pyflink UDF error:
>>>> 
>>>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>>>> def drop_fields(message, *fields):
>>>>     import json
>>>>     message = json.loads(message)
>>>>     for field in fields:
>>>>         message.pop(field)
>>>>     return json.dumps(message)
>>>> 
>>>> 
>>>> st_env \
>>>>     .from_path("source") \
>>>>     .select("drop_fields(message,'x')") \
>>>>     .insert_into("sink")
>>>> 
>>>> Message format:
>>>> {"a":"1","x":"2"}
>>>> 
>>>> The error reports mismatched parameter types:
>>>> Actual:(java.lang.String, java.lang.String)
>>>> Expected:(org.apache.flink.table.dataformat.BinaryString)
>>>> 
>>>> I am a beginner, so any guidance is appreciated. Thanks.
>>> 


Re: Creating Kafka Topic dynamically in Flink

2020-06-01 Thread Prasanna kumar
Leonard,

Thanks for the reply; I will look into those options.
But as for the original question, could we create a topic dynamically when
required?

Prasanna.

On Mon, Jun 1, 2020 at 2:18 PM Leonard Xu wrote:

> Hi, kumar
>
> Flink supports consuming from and producing to multiple Kafka topics [1].
> In your case you can implement KeyedSerializationSchema (legacy interface)
> or KafkaSerializationSchema [2] so that one producer instance can send data
> to multiple topics. There is an ITCase you can use as a reference [3].
>
>
> Best,
> Leonard Xu
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-producer
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
>
> [3]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java#L126
>
>
> On Jun 1, 2020, at 15:35, Prasanna kumar wrote:
>
> Hi,
>
> I have Use Case where i read events from a Single kafka Stream comprising
> of JSON messages.
>
> Requirement is to split the stream into multiple output streams based on
> some criteria say based on Type of Event or Based on Type and Customer
> associated with the event.
>
> We could achieve the splitting of stream using Side outputs as i have seen
> in the documentation.
>
> Our business environment is such that there could be new event types
> flowing in and would the Flink Kafka producer create the topics dynamically
> based on the inflowing events. I did not see any documentation saying
> that it could create.
>
> Or should it be always pre created by running a script separately. (Not a
> good scalable practice in our case)
>
> Thanks,
> Prasanna.
>
>
>


Re: State expiration in Flink

2020-06-01 Thread Yun Tang
Hi Vasily

As far as I know, state TTL currently lacks this kind of trigger. Using
onTimer, or processing a specific event as the trigger, could help in your scenario.
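
The timer-based approach mentioned here can be illustrated with a small simulation. The sketch below is plain Python, not Flink API: FlushOnExpiry, update and advance_to are hypothetical names standing in for keyed state, a registered timer per key, and the onTimer callback. The flush callable represents the write to external storage that should happen before the state is cleared.

```python
class FlushOnExpiry:
    """Keep a value per key; flush and clear it once its timer fires."""

    def __init__(self, ttl_ms, flush):
        self.ttl_ms = ttl_ms
        self.flush = flush   # callable(key, value), e.g. a write to HBase
        self.state = {}      # key -> current value
        self.timers = {}     # key -> time at which the entry expires

    def update(self, key, value, now_ms):
        """Store the value and (re)register the expiry timer for the key."""
        self.state[key] = value
        self.timers[key] = now_ms + self.ttl_ms

    def advance_to(self, now_ms):
        """Fire all timers that are due: flush first, then clear the state."""
        due = [k for k, fire_at in self.timers.items() if fire_at <= now_ms]
        for key in due:
            self.flush(key, self.state.pop(key))
            del self.timers[key]


flushed = []
store = FlushOnExpiry(ttl_ms=1000, flush=lambda k, v: flushed.append((k, v)))
store.update("a", 1, now_ms=0)
store.update("b", 2, now_ms=500)
store.advance_to(1000)   # only "a" is due
store.advance_to(1500)   # now "b" is due as well
print(flushed, store.state)
```

Unlike the built-in TTL cleanup, this pattern gives an explicit hook for each key, at the cost of one timer per key.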

Best
Yun Tang.

From: Vasily Melnik 
Sent: Monday, June 1, 2020 14:13
To: Yun Tang 
Cc: user 
Subject: Re: State expiration in Flink

Thanks, Yun!

One more question: is it possible to register some kind of handler that runs
when state is cleaned up? For example, I want to flush state to external
storage (e.g. HBase) before cleanup. Currently we do this manually in the
onTimer method, but is there another way?


On Mon, 1 Jun 2020 at 05:28, Yun Tang wrote:
Hi Vasily

Since Flink 1.10, state is cleaned up periodically because CleanupInBackground
is enabled by default. Thus, even if you never access a specific state entry
again, that entry can still be cleaned up.

Best
Yun Tang

From: Vasily Melnik
Sent: Saturday, May 30, 2020 23:29
To: user
Subject: State expiration in Flink

Hi.
I'm a bit confused by this point in the State TTL documentation:
"By default, expired values are explicitly removed on read, such as 
ValueState#value, and periodically garbage collected in the background if 
supported by the configured state backend."
Does it mean that if I have only one event with a specific key, its state will
never be cleaned up on TTL expiration, because I will never call the value
method for this key again?




Re: Creating Kafka Topic dynamically in Flink

2020-06-01 Thread Leonard Xu
Hi, kumar

Flink supports consuming from and producing to multiple Kafka topics [1]. In
your case you can implement KeyedSerializationSchema (legacy interface) or
KafkaSerializationSchema [2] so that one producer instance can send data to
multiple topics. There is an ITCase you can use as a reference [3].
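
The core of such a serialization schema is a per-record decision about the target topic. The sketch below shows only that decision in plain Python; it is not the Flink interface. The field names type and customer, the events prefix, and the helpers topic_for and to_record are all hypothetical, chosen to mirror the "split by event type, or by type and customer" requirement from the original question.

```python
import json


def topic_for(event, prefix="events"):
    """Derive the target topic from the event type and, if present, the customer."""
    parts = [prefix, event["type"]]
    if "customer" in event:
        parts.append(event["customer"])
    return ".".join(str(part).lower() for part in parts)


def to_record(raw_json):
    """Return (topic, payload bytes) for one incoming JSON message."""
    event = json.loads(raw_json)
    return topic_for(event), raw_json.encode("utf-8")


topic_a, _ = to_record('{"type": "Order", "customer": "ACME", "amount": 3}')
topic_b, payload_b = to_record('{"type": "Refund"}')
print(topic_a, topic_b)
```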


Best,
Leonard Xu

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-producer
[2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
[3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java#L126

> On Jun 1, 2020, at 15:35, Prasanna kumar wrote:
> 
> Hi,
> 
> I have Use Case where i read events from a Single kafka Stream comprising of 
> JSON messages.
> 
> Requirement is to split the stream into multiple output streams based on some 
> criteria say based on Type of Event or Based on Type and Customer associated 
> with the event. 
> 
> We could achieve the splitting of stream using Side outputs as i have seen in 
> the documentation.
> 
> Our business environment is such that there could be new event types flowing 
> in and would the Flink Kafka producer create the topics dynamically based on 
> the inflowing events. I did not see any documentation saying that it could 
> create.  
> 
> Or should it be always pre created by running a script separately. (Not a 
> good scalable practice in our case)
> 
> Thanks,
> Prasanna.



Re: Discussion of checkpoint failures

2020-06-01 Thread Yun Tang
Hi

The error "could only be replicated to 0 nodes instead of minReplication 
(=1)" is caused by HDFS instability: the data could not be replicated. It is 
unrelated to Flink itself.

Best regards,
Yun Tang


From: yanggang_it_job 
Sent: Monday, June 1, 2020 15:30
To: user-zh@flink.apache.org 
Subject: Discussion of checkpoint failures

Recently, several jobs that use RocksDB as the state backend and HDFS as the 
remote file system have been failing frequently. The failures share these 
characteristics:
1. The jobs had been running smoothly and then suddenly started failing one day.
2. When this kind of error appears, multiple jobs fail their checkpoints with the same error.


The error message is as follows:
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/user/stream/flink-checkpoints/19523bf083346eb80b409167e9b91b53/chk-43396/cef72b90-8492-4b09-8d1b-384b0ebe5768
 could only be replicated to 0 nodes instead of minReplication (=1). There are 
8 datanode(s) running and no node(s) are excluded in this operation.
at 
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1723)


Please take a look.
Thanks


??????????job

2020-06-01 Thread sun
??bin/flink run -n -c com.toonyoo.bi.flink.Application 
jar/ty-bi-flink-1.0-SNAPSHOT.jar --topic membertopic
??checkpoint0



# high-availability.zookeeper.client.acl: open


#==
# Fault tolerance and checkpointing
#==


# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# 

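How to trigger a savepoint and stop a job via the REST API in yarn-session mode

2020-06-01 Thread wind.fly....@outlook.com
Hi, all:
I am currently using Flink 1.10 and submit jobs through yarn-session. Stopping a 
job through the jobs/job/stop API fails with "Unable to load 
requested file". Is this API not available in yarn-session mode?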

Best,
Junbao Zhang


Re: Flink s3 streaming performance

2020-06-01 Thread Jörn Franke
I think S3 is the wrong storage backend for this volume of small messages. 
Try to use a NoSQL database, or write multiple messages into one file in S3 
(1 or 10).

If you still want to go with your scenario, then try a network-optimized 
instance, use s3a in Flink, and configure S3 entropy.
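
A back-of-the-envelope calculation shows why the number of files matters so much here. The sketch below uses the figures from the thread (120 million messages, about 300 S3 writes per minute). The batch sizes are hypothetical, and it assumes the number of files written per minute stays the same when files get larger, which will not hold exactly in practice.

```java
/** Rough estimate only; batch sizes are hypothetical and the write rate is assumed constant. */
class S3WriteEstimate {
    /** Minutes needed to write all messages when each file holds messagesPerFile messages. */
    static double minutesToWrite(long totalMessages, long messagesPerFile, double filesPerMinute) {
        long files = (totalMessages + messagesPerFile - 1) / messagesPerFile; // ceiling division
        return files / filesPerMinute;
    }

    public static void main(String[] args) {
        long total = 120_000_000L; // messages mentioned in the thread
        double rate = 300.0;       // observed S3 writes per minute
        for (long batch : new long[] {1, 1_000, 10_000}) {
            double days = minutesToWrite(total, batch, rate) / (60 * 24);
            System.out.printf("batch=%d -> %.2f days%n", batch, days);
        }
    }
}
```

With one message per file this comes to 400,000 minutes, so packing many messages into each file reduces the number of requests far more than tuning the per-request latency could.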

> On 31.05.2020 at 15:30, venkata sateesh` kolluru wrote:
> 
> 
> Hi David,
> 
> The average size of each file is around 30 KB and I have a checkpoint interval of 5 
> minutes. Some files are even 1 KB; because of checkpointing, some files are 
> merged into one big file of around 300 MB.
> 
> With 120 million files and 4 TB, if the rate of transfer is 300 per minute, it 
> is taking weeks to write to S3.
> 
> I have tried to increase the parallelism of the sink but I don't see any improvement. 
> 
> The sink record is a Tuple3; the actual content of the file 
> is f2. This content is written to /f0/f1/part*-* 
> 
> I guess the prefix determination in CustomBucketAssigner won't be causing this 
> delay?
> 
> Could you please shed some light on writing a custom S3 sink?
> 
> Thanks
> 
> 
>> On Sun, May 31, 2020, 6:34 AM David Magalhães  wrote:
>> Hi Venkata. 
>> 
>> 300 requests per minute works out to about 200 ms per request, which should be a 
>> normal response time for sending a file if there isn't any speed limitation (how 
>> big are the files?).
>> 
>> Have you changed the parallelism to be higher than 1? I also recommend 
>> limiting the source parallelism, because it can consume pretty fast from 
>> Kafka and create some kind of backpressure.
>> 
>> I don't have much experience with StreamingFileSink, because I ended up 
>> using a custom S3Sink, but I did have some issues writing to S3 because the 
>> request wasn't parallelised. Check this thread: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>> 
>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru 
>>>  wrote:
>>> Hello,
>>> 
>>> I have posted the same question on Stack Overflow but didn't get any response, so 
>>> I am posting it here for help.
>>> 
>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>> 
>>> Details:
>>> 
>>> I am working on a Flink application on Kubernetes (EKS) which consumes data 
>>> from Kafka and writes it to S3.
>>> 
>>> We have around 120 million XML messages, 4 TB in total, in Kafka. Consuming 
>>> from Kafka is super fast.
>>> 
>>> These are just string messages from Kafka. 
>>> 
>>> There is high back pressure while writing to S3. We are not even hitting 
>>> the S3 PUT request limit, which is around 3500 requests/sec. I am seeing 
>>> only 300 writes per minute to S3, which is very slow.
>>> 
>>> I am using StreamingFileSink to write to S3 with OnCheckpointRollingPolicy 
>>> as the rolling policy.
>>> 
>>> Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or s3p)
>>> 
>>> Other than this I don't have any config related to S3.
>>> 
>>> // The generic parameters were lost in the archive;
>>> // Tuple3<String, String, String> is an assumption.
>>> StreamingFileSink<Tuple3<String, String, String>> sink =
>>>     StreamingFileSink
>>>         .forRowFormat(new Path("s3://BUCKET"),
>>>             (Tuple3<String, String, String> element, OutputStream stream) -> {
>>>                 PrintStream out = new PrintStream(stream);
>>>                 out.println(element.f2);
>>>             })
>>>         // Determine component type for each record
>>>         .withBucketAssigner(new CustomBucketAssigner())
>>>         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>         .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
>>>         .build();
>>> Is there anything that we can optimize for S3 in StreamingFileSink or in 
>>> flink-conf.yaml?
>>> 
>>> Like using a bulk format or any config params like fs.s3.maxThreads etc.
>>> 
>>> For checkpointing too I am using s3:// instead of s3p or s3a
>>> 
>>> env.setStateBackend((StateBackend) new 
>>> RocksDBStateBackend("s3://checkpoint_bucket", true));
>>> env.enableCheckpointing(30);
>>> 


Creating Kafka Topic dynamically in Flink

2020-06-01 Thread Prasanna kumar
Hi,

I have a use case where I read events from a single Kafka stream consisting
of JSON messages.

The requirement is to split the stream into multiple output streams based on
some criteria, say the type of event, or the type and the customer
associated with the event.

We could achieve the splitting of the stream using side outputs, as I have seen
in the documentation.

Our business environment is such that new event types could start
flowing in. Would the Flink Kafka producer create the topics dynamically
based on the incoming events? I did not see any documentation saying
that it could.

Or should they always be pre-created by running a script separately? (Not a
good scalable practice in our case.)

Thanks,
Prasanna.


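Re: Error when using CREATE VIEW in SQL

2020-06-01 Thread Benchao Li
Hi Kyle,

This feature has already been merged for 1.11. 1.11 will be released soon, so you can try it then.

On Mon, Jun 1, 2020 at 3:25 PM, Kyle Zhang wrote:

> Hi,
> Recently, calling tEnv.sqlUpdate("create view …") fails with "Unsupported query: create
> view". I took a quick look: on master, SqlToOperationConverter.convert has a check for
> "validated instanceof SqlCreateView", but the 1.10 branch does not have it yet. This seems
> like a fairly common feature, and the Flink SQL CLI supports create view as well.
> Is there a particular reason it has not been merged yet?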



-- 

Best,
Benchao Li


Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-06-01 Thread Yu Yang
Thanks for the suggestion, Yun!

On Sun, May 31, 2020 at 11:15 PM Yun Gao  wrote:

> Hi Yu,
>
> I think when the serializer returns *null*, the following operator should
> still receive a null record. One possible approach is for the following
> operator to count the number of null records received and publish the value
> through a metric to a monitoring system such as Prometheus; the monitoring
> system should then be able to configure alert conditions.
>
> If *null* causes problems, a special marker object such as NULL_TBASE could be
> created, and the operator should be able to count the number of NULL_TBASE
> instances received.
>
> Best,
>  Yun
>
>
> --Original Mail --
> *Sender:*Yu Yang 
> *Send Date:*Mon Jun 1 06:37:35 2020
> *Recipients:*user 
> *Subject:*best practice for handling corrupted records / exceptions in
> custom DefaultKryoSerializer?
>
>> Hi all,
>>
>> To deal with corrupted messages that can leak into the data source once
>> in a while, we implemented a custom DefaultKryoSerializer class, shown below, that
>> catches exceptions. The custom serializer returns null in the read(...) method
>> when it encounters an exception while reading. With this implementation, the
>> serializer may silently drop records. One concern is that it may drop too
>> many records before we notice and take action. What is the best practice
>> to handle this?
>>
>> The serializer processes one record at a time. Will reading a corrupted
>> record make the serializer fail to process the next valid record?
>>
>> public class CustomTBaseSerializer extends TBaseSerializer {
>>   private static final Logger LOG =
>>       LoggerFactory.getLogger(CustomTBaseSerializer.class);
>>
>>   @Override
>>   public void write(Kryo kryo, Output output, TBase tBase) {
>>     try {
>>       super.write(kryo, output, tBase);
>>     } catch (Throwable t) {
>>       LOG.error("Failed to write due to unexpected Throwable", t);
>>     }
>>   }
>>
>>   @Override
>>   public TBase read(Kryo kryo, Input input, Class tBaseClass) {
>>     try {
>>       return super.read(kryo, input, tBaseClass);
>>     } catch (Throwable t) {
>>       LOG.error("Failed to read from input due to unexpected Throwable", t);
>>       return null;
>>     }
>>   }
>> }
>>
>> Thank you!
>>
>> Regards,
>> -Yu
>>
>


Discussion of checkpoint failures

2020-06-01 Thread yanggang_it_job
Recently, several jobs that use RocksDB as the state backend and HDFS as the 
remote file system have been failing frequently. The failures share these 
characteristics:
1. The jobs had been running smoothly and then suddenly started failing one day.
2. When this kind of error appears, multiple jobs fail their checkpoints with the same error.


The error message is as follows:
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/user/stream/flink-checkpoints/19523bf083346eb80b409167e9b91b53/chk-43396/cef72b90-8492-4b09-8d1b-384b0ebe5768
 could only be replicated to 0 nodes instead of minReplication (=1). There are 
8 datanode(s) running and no node(s) are excluded in this operation.
at 
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1723)


Please take a look.
Thanks

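Error when using CREATE VIEW in SQL

2020-06-01 Thread Kyle Zhang
Hi,
Recently, calling tEnv.sqlUpdate("create view …") fails with "Unsupported query: create 
view". I took a quick look: on master, SqlToOperationConverter.convert has a check for 
"validated instanceof SqlCreateView", but the 1.10 branch does not have it yet. This seems 
like a fairly common feature, and the Flink SQL CLI supports create view as well. 
Is there a particular reason it has not been merged yet?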

?????? Kafka Consumer??????????????

2020-06-01 Thread Even
??pom??provided??jar??




----
??:"tison"https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

Best,
tison.


tison https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#user-jars--classpath

 Best,
 tison.


 Even <452232...@qq.com ??2020??5??29?? 6:48??

 




 ----
 ??:nbsp;"zz zhang"

Re: How to create schema for flexible json data in Flink SQL

2020-06-01 Thread Guodong Wang
Hi Jark,

You totally got my point. Actually, the perfect solution in my opinion is
to support schema evolution in one query.
Although classic SQL needs to know the schema before doing any computation, when
integrating a NoSQL data source into a Flink DataStream, schema evolution, if
possible, would save users a great deal of time.
For example, when I have some JSON docs in MongoDB, I want to expose the
collections as tables in Flink SQL. But keeping the schema aligned in the Flink
catalog service is not very friendly: I need to remember to update the
catalog whenever I add a new field in my database.

However, it is not easy to validate SQL correctly if there is no schema
information about the table. For example, with "select sum(amount) from my_table
group by category", if the amount field is not a number, a runtime error will
be thrown.
I think this is another challenge in supporting schema evolution.
Anyway, I think deferring the errors to runtime is fair when the user wants
schema flexibility.


Guodong


On Mon, Jun 1, 2020 at 12:29 PM Jark Wu  wrote:

> Hi all,
>
> This is an interesting topic. Schema inference will be the next big
> feature planned in the next release.
> I added this thread link into FLINK-16420.
>
> I think Guodong's case is schema evolution, which I think has
> something to do with schema inference.
> I don't have a clear idea for this yet, but some initial thoughts are:
>
> 1) Schema inference can happen for each query, instead of when creating the
> table.
> That way, once the data schema has evolved, the catalog table can have the
> new schema.
> However, this may break existing queries on this catalog table (e.g.
> SELECT * FROM T).
> 2) Manually create a new table with schema inference. We can use the LIKE
> grammar or SHOW CREATE TABLE to
> help create a table based on existing ones. The new table has the
> new schema because we infer the schema again.
> 3) Automatically create a new table with schema inference. This can be
> done with some catalogs, for example, SchemaRegistryCatalog:
> once a new Avro schema (say schema id = 100) is added to the registry,
> users can use this new schema with table "mytopic-100".
>
>
> Best,
> Jark
>
>
> On Fri, 29 May 2020 at 22:05, Guodong Wang  wrote:
>
>> Benchao,
>>
>> Thank you for your detailed explanation.
>>
>> Schema inference can solve my problem partially. For example, starting
>> from some point in time, all subsequent JSON will contain a new field. I think
>> for this case, schema inference will help.
>> But if I need to handle all the JSON events with different schemas in one
>> table (this is case 2), I agree with you: schema inference does not
>> help either.
>>
>>
>> Guodong
>>
>>
>> On Fri, May 29, 2020 at 11:02 AM Benchao Li  wrote:
>>
>>> Hi Guodong,
>>>
>>> After an offline discussion with Leonard, I think you have the right
>>> understanding of schema inference.
>>> But there are two cases here:
>>> 1. The schema of the data is fixed: schema inference can save you the effort of
>>> writing the schema explicitly.
>>> 2. The schema of the data is dynamic: in this case schema inference
>>> cannot help, because SQL is a somewhat static language, which must know all
>>> the data types at compile time.
>>>
>>> Maybe I misunderstood your question at the very beginning. I thought
>>> your case was #2. If your case is #1, then schema inference is a good
>>> choice.
>>>
>>> On Thu, May 28, 2020 at 11:39 PM, Guodong Wang wrote:
>>>
>>>> Yes. Setting the value type as raw is one possible approach. And I
>>>> would like to vote for schema inference as well.
>>>>
>>>> Correct me if I am wrong: IMO schema inference means I can provide a
>>>> method in the table source to infer the data schema based on the runtime
>>>> computation, just like some Calcite adapters do. Right?
>>>> For SQL table registration, I think that requiring the table source to
>>>> provide a static schema might be too strict. Letting the planner infer the table
>>>> schema would be more flexible.
>>>>
>>>> Thank you for your suggestions.
>>>>
>>>> Guodong
>>>>
>>>>
>>>> On Thu, May 28, 2020 at 11:11 PM Benchao Li
>>>> wrote:
>>>>
>>>>> Hi Guodong,
>>>>>
>>>>> Does the RAW type meet your requirements? For example, you can specify
>>>>> map type, and the value for the map is the raw JsonNode
>>>>> parsed from Jackson.
>>>>> This is not supported yet, however IMO this could be supported.
>>>>>
>>>>> On Thu, May 28, 2020 at 9:43 PM, Guodong Wang wrote:
>>>>>
>>>>>> Benchao,
>>>>>>
>>>>>> Thank you for your quick reply.
>>>>>>
>>>>>> As you mentioned, for the current scenario, approach 2 should work for
>>>>>> me. But it is a little bit annoying that I have to modify the schema to add
>>>>>> new field types when the upstream app changes the JSON format or adds new fields.
>>>>>> Otherwise, my users cannot refer to the field in their SQL.
>>>>>>
>>>>>> Per the description in the Jira, I think after implementing this, all the
>>>>>> JSON values will be converted to strings.
>>>>>> I am wondering if Flink SQL can/will support the 

Problem using FileInputFormat

2020-06-01 Thread 阿华田
I use FileInputFormat to recursively read HDFS files, with a filter added. The 
program runs without errors but finishes very quickly without reading any data. 
In a local test the filter works and data is read; the behavior described above 
happens when running on the YARN cluster.
Code:
// Initialize the job environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
FileInputFormat fileInputFormat = new TextInputFormat(new 
Path("hdfs://arc/success_fid_flow"));
fileInputFormat.setNestedFileEnumeration(true);
// Paths for which the filter returns true are excluded
fileInputFormat.setFilesFilter(new 
RegexExcludePathAndTimeFilter("2020-05-24","2020-05-24"));
DataSet source = env.createInput(fileInputFormat);
source.output(new HdfsTrainSinktest());
Log output:


2020-06-01 14:43:41,848 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2020-06-01 14:43:41,848 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2020-06-01 14:43:41,848 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2020-06-01 14:43:41,848 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.web.address, 0.0.0.0
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.web.port, 8082
2020-06-01 14:43:41,890 INFO  org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
2020-06-01 14:43:41,993 INFO  org.apache.flink.optimizer.Optimizer - Compiler could not determine the size of input 'TextInputFormat ([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates.
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.archive.fs.dir, hdfs://dap/tmp/completed-jobs/
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.web.address, 0.0.0.0
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.web.port, 8082
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.archive.fs.dir, hdfs://dap/tmp/completed-jobs/
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.archive.fs.refresh-interval, 1
2020-06-01 14:43:42,069 INFO  org.apache.flink.client.program.rest.RestClusterClient - Submitting job 410508f08b0775c0529e84b221dd909d (detached: false).
2020-06-01 14:43:52,134 INFO  org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
2020-06-01 14:43:52,167 INFO  org.apache.flink.optimizer.Optimizer - Compiler could not determine the size of input 'TextInputFormat ([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates.
2020-06-01 14:43:52,171 INFO  org.apache.flink.client.program.rest.RestClusterClient - 

Re: pyflink Table API UDF error: Given parameters do not match any signature.

2020-06-01 Thread Dian Fu
The second argument you pass is a string. Could you try it like this?
select("drop_fields(message, array('x'))")

I'm not sure I have understood your problem correctly (if the above does not work, please post the exception).

> On Jun 1, 2020, at 1:59 PM, jack wrote:
> 
> Yes, the corresponding parameter was not filled in correctly, thanks.
> One more question: do UDFs currently not support Python variable-length arguments? When I use variable-length arguments I still get the parameter mismatch error.
> 
> At 2020-06-01 11:01:34, "Dian Fu" wrote:
>> The input types should be as follows:
>> 
>> input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]
>> 
>> Regards,
>> Dian
>> 
>>> On Jun 1, 2020, at 10:49 AM, 刘亚坤 wrote:
>>> 
>>> I am currently learning the pyflink Table API and have a question about a pyflink UDF error:
>>> 
>>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>>> def drop_fields(message, *fields):
>>>  import json
>>>  message = json.loads(message)
>>>  for field in fields:
>>>    message.pop(field)
>>>  return  json.dumps(message)
>>> 
>>> 
>>> st_env \
>>>  .from_path("source") \
>>>  .select("drop_fields(message,'x')") \
>>>  .insert_into("sink")
>>> 
>>> Message format:
>>> {"a":"1","x":"2"}
>>> 
>>> The error reports a parameter type mismatch:
>>> Actual:(java.lang.String, java.lang.String)
>>> Expected:(org.apache.flink.table.dataformat.BinaryString)
>>> 
>>> I'm a beginner, any guidance is appreciated. Thanks.
>> 



??????????jenkins????flink

2020-06-01 Thread sun
??jenkinsflink

??????????jenkens????flink

2020-06-01 Thread sun
??jenkensflink

Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-06-01 Thread Yun Gao
Hi Yu,

I think when the serializer returns null, the following operator should still 
receive a null record. One possible approach is for the following operator to 
count the number of null records received and publish the value through a 
metric to a monitoring system such as Prometheus; the monitoring system should 
then be able to configure alert conditions.

If null causes problems, a special marker object such as NULL_TBASE could be 
created, and the operator should be able to count the number of NULL_TBASE 
instances received.
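
As a rough illustration of the counting idea, here is a plain-Java sketch of what such a downstream step could do. It is a toy model, not Flink code: CorruptRecordCounter and the CORRUPT sentinel are hypothetical names, with CORRUPT standing in for the NULL_TBASE marker mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Toy model of the downstream operator; not Flink code. Names are hypothetical. */
class CorruptRecordCounter {
    /** Sentinel standing in for the NULL_TBASE idea from the thread. */
    static final Object CORRUPT = new Object();

    private final AtomicLong corruptCount = new AtomicLong();

    /** Forwards valid records and counts null or sentinel records instead of forwarding them. */
    List<Object> filter(List<Object> records) {
        List<Object> valid = new ArrayList<>();
        for (Object r : records) {
            if (r == null || r == CORRUPT) {
                corruptCount.incrementAndGet();
            } else {
                valid.add(r);
            }
        }
        return valid;
    }

    /** Value that would be published as a metric and alerted on. */
    long corruptCount() {
        return corruptCount.get();
    }
}
```

In a real job the count would more likely live in a metric registered through the operator's runtime context rather than in an AtomicLong, so that the monitoring system can scrape it and alert when it grows too fast.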

Best,
 Yun



 --Original Mail --
Sender:Yu Yang 
Send Date:Mon Jun 1 06:37:35 2020
Recipients:user 
Subject:best practice for handling corrupted records / exceptions in custom 
DefaultKryoSerializer?

Hi all, 

To deal with corrupted messages that can leak into the data source once in a 
while, we implemented a custom DefaultKryoSerializer class, shown below, that 
catches exceptions. The custom serializer returns null in the read(...) method 
when it encounters an exception while reading. With this implementation, the 
serializer may silently drop records. One concern is that it may drop too many 
records before we notice and take action. What is the best practice to handle this?

The serializer processes one record at a time. Will reading a corrupted record 
make the serializer fail to process the next valid record?

public class CustomTBaseSerializer extends TBaseSerializer {
  private static final Logger LOG =
      LoggerFactory.getLogger(CustomTBaseSerializer.class);

  @Override
  public void write(Kryo kryo, Output output, TBase tBase) {
    try {
      super.write(kryo, output, tBase);
    } catch (Throwable t) {
      LOG.error("Failed to write due to unexpected Throwable", t);
    }
  }

  @Override
  public TBase read(Kryo kryo, Input input, Class tBaseClass) {
    try {
      return super.read(kryo, input, tBaseClass);
    } catch (Throwable t) {
      LOG.error("Failed to read from input due to unexpected Throwable", t);
      return null;
    }
  }
}

Thank you!

Regards, 
-Yu

Re: State expiration in Flink

2020-06-01 Thread Vasily Melnik
Thanks, Yun!

One more question: is it possible to register some kind of handler that runs
when state is cleaned up? For example, I want to flush state to external
storage (e.g. HBase) before cleanup. Currently we do this manually with the
onTimer method, but is there another way?


On Mon, 1 Jun 2020 at 05:28, Yun Tang  wrote:

> Hi Vasily
>
> Since Flink 1.10, state is cleaned up periodically because
> CleanupInBackground is enabled by default. So even if you never access
> a specific state entry again, that entry can still be cleaned up once it has expired.
>
> Best
> Yun Tang
> --
> *From:* Vasily Melnik 
> *Sent:* Saturday, May 30, 2020 23:29
> *To:* user 
> *Subject:* State expiration in Flink
>
> Hi .
> I'm a bit confused with this point in State TTL documentation:
> " By default, expired values are explicitly removed on read, such as
> ValueState#value, and periodically garbage collected in the background if
> supported by the configured state backend. "
> Does it mean that if I have only one event with a specific key, its state
> will never be cleaned up on TTL expiration, because I will never call the value
> method for this key again?
>
>
>


Re: Tumbling windows - increasing checkpoint size over time

2020-06-01 Thread Guowei Ma
Hi,
1. I am not an expert on RocksDB. However, I think state garbage
collection depends on RocksDB compaction, especially if the checkpoint
interval is 2s. This is because the window elements are still in the SST
files even after the window has been triggered.
2. Have you tried a checkpoint interval of 15s? I guess it might reduce the
state size.
3. Would you like to share your RocksDB configuration? I think this could
help others who know the state backend determine whether it is related to RocksDB or not.
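
For the upper bound that Till asks about further down in this thread (number of keys, event size, and events per key per second), the arithmetic can be sketched as below. The 15-second window comes from the thread; every other number used with it is a hypothetical input, and the result covers only raw event payload, not RocksDB or serialization overhead.

```java
/** Rough upper bound for buffered window contents; all inputs are estimates supplied by the caller. */
class WindowStateEstimate {
    /** keys x events per key per second x window length x bytes per event. */
    static long upperBoundBytes(long keys, double eventsPerKeyPerSecond,
                                long windowSeconds, long eventBytes) {
        return (long) Math.ceil(keys * eventsPerKeyPerSecond * windowSeconds * eventBytes);
    }

    public static void main(String[] args) {
        // Hypothetical workload: 1000 keys, 2 events per key per second, 100-byte events.
        long bytes = upperBoundBytes(1_000L, 2.0, 15L, 100L);
        System.out.println("upper bound in bytes: " + bytes);
    }
}
```

If the observed checkpoint size is far above a bound computed this way, that would point at data that is no longer live (for example not yet compacted) rather than at the window contents themselves.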
Best,
Guowei


On Fri, May 29, 2020 at 10:30 PM, Wissman, Matt wrote:

> Till,
>
>
>
> I’ll have to calculate the theoretical upper bound for our window state.
> Our data distribution and rate has a predictable pattern but the data rate
> pattern didn’t match the checkpoint size growth.
>
>
> [image: image.png]
>
>
>
> Here is a screenshot of the checkpoint size for the pipeline. The yellow
> section is when we had the checkpoint interval at 2 secs – the size seems
> to grow linearly and indefinitely. The blue, red and orange lines are in
> line with what I’d expect in terms of checkpoint size (100KB-2 MB).
>
>
>
> The incoming stream data for the whole time period is consistent (follows
> the same pattern).
>
>
>
> Changing the checkpoint interval seemed to fix the problem of the large
> and growing checkpoint size but I’m not sure why.
>
>
>
> Thanks!
>
>
>
> -Matt
>
>
>
> *From: *Till Rohrmann 
> *Date: *Thursday, May 28, 2020 at 10:48 AM
> *To: *"Wissman, Matt" 
> *Cc: *Guowei Ma , "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Tumbling windows - increasing checkpoint size over time
>
>
>
> Hi Matt,
>
>
>
> when using tumbling windows, then the checkpoint size is not only
> dependent on the number of keys (which is equivalent to the number of open
> windows) but also on how many events arrive for each open window because
> the windows store every window event in its state. Hence, it can be the
> case that you see different checkpoint sizes depending on the actual data
> distribution which can change over time. Have you checked whether the data
> distribution and rate is constant over time?
>
>
>
> What is the expected number of keys, size of events and number of events
> per key per second? Based on this information one could try to estimate an
> upper state size bound.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, May 27, 2020 at 8:19 PM Wissman, Matt 
> wrote:
>
> Hello Till & Guowei,
>
>
>
> Thanks for the replies! Here is a snippet of the window function:
>
>
>
>   SingleOutputStreamOperator aggregatedStream = dataStream
>       .keyBy(idKeySelector())
>       .window(TumblingProcessingTimeWindows.of(seconds(15)))
>       .apply(new Aggregator())
>       .name("Aggregator")
>       .setParallelism(3);
>
>
>
> Checkpoint interval: 2 secs when the checkpoint size grew from 100KB to
> 100MB (we’ve since changed it to 5 minutes, which has slowed the checkpoint
> size growth)
>
> Lateness allowed: 0
>
> Watermarks: nothing is set in terms of watermarks – do they apply for
> Process Time?
>
> The set of keys processed in the stream is stable over time
>
>
>
> The checkpoint size actually looks pretty stable now that the interval was
> increased. Is it possible that the short checkpoint interval prevented
> compaction?
>
>
>
> Thanks!
>
>
>
> -Matt
>
>
>
>
>
> *From: *Till Rohrmann 
> *Date: *Wednesday, May 27, 2020 at 9:00 AM
> *To: *Guowei Ma 
> *Cc: *"Wissman, Matt" , "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Tumbling windows - increasing checkpoint size over time
>
>
>
>
>
>
> Hi Matt,
>
>
>
> could you give us a bit more information about the windows you are using?
> They are tumbling windows. What's the size of the windows? Do you allow
> lateness of events? What's your checkpoint interval?
>
>
>
> Are you using event time? If yes, how is the watermark generated?
>
>
>
> You said that the number of events per window is more or less constant.
> Does this also apply to the size of the individual events?
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, May 27, 2020 at 1:21 AM Guowei Ma  wrote:
>
> Hi, Matt
> The total size of the state of the window operator is related to the
> number of windows. For example, if you use keyBy + tumbling window there
> would be as many windows as there are keys.
> Hope this helps.
> Best,
> Guowei
>
> On Wed, May 27, 2020 at 3:35 AM, Wissman, Matt wrote:
> >
> > Hello Flink Community,
> >
> >
> >
> > I’m running a Flink pipeline that uses a tumbling window and incremental
> checkpoints with RocksDB backed by S3. The number of objects in the window
> is stable, but over time the checkpoint size grows seemingly without bound.
> Within the first few hours after bringing the Flink pipeline up, the
> checkpoint size is around 100K but after a week of operation it grows to
> 

Rest Api body size

2020-06-01 Thread snack white
Hi,
When I submit a job through the REST API, my POST body is 25.83 KB (my jar 
has already been uploaded to the job manager). The body gets cut off in the 
Flink job manager. Can someone tell me how to raise the POST body length limit?
Thanks,
White