Re: Flink sql 实现全局row_number()分组排序

2021-03-17 文章 Kurt Young
直接 SQL Top-N 即可:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n

Best,
Kurt


On Tue, Mar 16, 2021 at 3:40 PM Tian Hengyu  wrote:

> 咋么有人啊~~~
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: [DISCUSS] Deprecation and removal of the legacy SQL planner

2021-02-25 文章 Kurt Young


Hi Timo,

First of all I want to thank you for introducing this planner design back
in 1.9, this is a great work
that allows lots of blink features to be merged to Flink in a reasonably
short time. It greatly
accelerates the evolution speed of Table & SQL.

Everything comes with a cost, as you said, right now we are facing the
overhead of maintaining
two planners and it causes bugs and also increases imbalance between these
two planners. As
a developer and also for the good of all Table & SQL users, I also think
it's better for us to be more
focused on a single planner.

Your proposed roadmap looks good to me, +1 from my side and thanks
again for all your efforts!

Best,
Kurt


On Thu, Feb 25, 2021 at 5:01 PM Timo Walther  wrote:

> Hi everyone,
>
> since Flink 1.9 we have supported two SQL planners. Most of the original
> plan of FLIP-32 [1] has been implemented. The Blink code merge has been
> completed and many additional features have been added exclusively to
> the new planner. The new planner is now in a much better shape than the
> legacy one.
>
> In order to avoid user confusion, reduce duplicate code, and improve
> maintainability and testing times of the Flink project as a whole we
> would like to propose the following steps to complete FLIP-32:
>
> In Flink 1.13:
> - Deprecate the `flink-table-planner` module
> - Deprecate `BatchTableEnvironment` for both Java, Scala, and Python
>
> In Flink 1.14:
> - Drop `flink-table-planner` early
> - Drop many deprecated interfaces and API on demand
> - Rename `flink-table-planner-blink` to `flink-table-planner`
> - Rename `flink-table-runtime-blink` to `flink-table-runtime`
> - Remove references of "Blink" in the code base
>
> This will have an impact on users that still use DataSet API together
> with Table API. With this change we will not support converting between
> DataSet API and Table API anymore. We hope to compensate the missing
> functionality in the new unified TableEnvironment and/or the batch mode
> in DataStream API during 1.14 and 1.15. For this, we are looking for
> further feedback which features are required in Table API/DataStream API
> to have a smooth migration path.
>
> Looking forward to your feedback.
>
> Regards,
> Timo
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions
>


Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-20 文章 Kurt Young
cc this to user & user-zh mailing list because this will affect lots of
users, and also quite a lot of users
were asking questions around this topic.

Let me try to understand this from user's perspective.

Your proposal will affect five functions, which are:

   - PROCTIME()
   - NOW()
   - CURRENT_DATE
   - CURRENT_TIME
   - CURRENT_TIMESTAMP

Before the changes, as I am writing this reply, the local time here is
*2021-01-21
12:03:35 (Beijing time, UTC+8)*.
And I tried these 5 functions in sql client, and got:

*Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE,
CURRENT_TIME;*

*+-+-+-+--+--+*

*|  EXPR$0 |  EXPR$1 |
CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |*

*+-+-+-+--+--+*

*| 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 |
2021-01-21T04:03:35.228 |   2021-01-21 | 04:03:35.228 |*

*+-+-+-+--+--+*
After the changes, the expected behavior will change to:

*Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE,
CURRENT_TIME;*

*+-+-+-+--+--+*

*|  EXPR$0 |  EXPR$1 |
CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |*

*+-+-+-+--+--+*

*| 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 |
2021-01-21T12:03:35.228 |   2021-01-21 | 12:03:35.228 |*

*+-+-+-+--+--+*
The return type of now(), proctime() and CURRENT_TIMESTAMP still be
TIMESTAMP;

Best,
Kurt


On Tue, Jan 19, 2021 at 6:42 PM Leonard Xu  wrote:

> I found above example format may mess up in different mail client, I post
> a picture here[1].
>
> Best,
> Leonard
>
> [1]
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png
> <
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png>
>
>
> > 在 2021年1月19日,16:22,Leonard Xu  写道:
> >
> > Hi, all
> >
> > I want to start the discussion about correcting time-related function
> behavior in Flink SQL, this is a tricky topic but I think it’s time to
> address it.
> >
> > Currently some temporal function behaviors are wired to users.
> > 1.  When users use a PROCTIME() in SQL, the value of PROCTIME() has a
> timezone offset with the wall-clock time in users' local time zone, users
> need to add their local time zone offset manually to get expected local
> timestamp(e.g: Users in Germany need to +1h to get expected local
> timestamp).
> >
> > 2. Users can not use CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP  to get
> wall-clock timestamp in local time zone, and thus they need write UDF in
> their SQL just for implementing a simple filter like WHERE date_col =
> CURRENT_DATE.
> >
> > 3. Another common case  is the time window  with day interval based on
> PROCTIME(), user plan to put all data from one day into the same window,
> but the window is assigned using timestamp in UTC+0 timezone rather than
> the session timezone which leads to the window starts with an offset(e.g:
> Users in China need to add -8h in their business sql start and then +8h
> when output the result, the conversion like a magic for users).
> >
> > These problems come from that lots of time-related functions like
> PROCTIME(), NOW(), CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP are
> returning time values based on UTC+0 time zone.
> >
> > This topic will lead to a comparison of the three types, i.e.
> TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE and
> TIMESTAMP WITH TIME ZONE. In order to better understand the three types, I
> wrote a document[1] to help understand them better. You can also know the
> tree timestamp types behavior in Hadoop ecosystem from the reference link
> int the doc.
> >
> >
> > I Invested all Flink time-related functions current behavior and
> compared with other DB vendors like Pg,Presto, Hive, Spark, Snowflake,  I
> made an excel [2] to organize them well, we can use it for the next
> discussion. Please let me know if I missed something.
> > From my investigation, I think we need to correct the behavior of
> function NOW()/PROCTIME()/CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP, to
> correct them, we can change the function return type or function return
> value or change return type and return value both. All of those way are
> valid because SQL:2011 does not specify the function return type and every
> SQL engine vendor has its own implementation. For example the
> CURRENT_TIMESTAMP 

【公告】Flink Forward 2020 亚洲峰会议题提交时间延长

2020-10-09 文章 Kurt Young
大家好,

希望大家都过了一个美好充实的国庆。由于长假的影响,我们也决定将 Flink Forward 2020 亚洲峰会的议题提交截止时间延长到
*2020年10月22日*,提交链接:https://sourl.cn/ZEXM2Y

期待您的投递和参会!如果您有任何问题欢迎与我联系。

谢谢,
Kurt


【公告】Flink Forward 2020 亚洲峰会议题征集

2020-09-27 文章 Kurt Young
大家好,

自 2018 年 Flink Forward 大会首次引入亚洲以后,Flink 社区已成功举办了两届盛况空前的大会。不论是在参会公司、参会人数,还是议题
的深度和丰富度,无一不体现了这是目前国内最具规模和影响力的数据处理领域大会之一。

结合 2020 年的特殊情况,Flink Forward 亚洲峰会将转为全免费的线上模式。与以往相比,今年大会的主要特色在于:
1. *在线直播互动,听众反馈更及时*:大会将在线收集听众反馈,实时了解听众疑惑和问题并进行快速互动形成良性沟通闭环。
2. *组合传播,影响范围更广泛*:除主题分享的直播之外,大会还会将内容整理为视频、文字、电子书、专题等多种形式组合传播,扩大您的内容的影响力。
3. *内容主题划分更丰富*:在保留过往的热门主题之外,今年新增机器学习,云原生等热门领域,技术覆盖面更广。

目前大会已定档 *2020年12月26日* 正式举办,现已开放议题投递通道,您可以在 *2020年10月12日前* 通过以下链接提交您的议题:
https://sourl.cn/ZEXM2Y

期待您的投递和参会!如果您有任何问题欢迎与我联系。

谢谢,
Kurt


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 文章 Kurt Young
Congratulations Dian!

Best,
Kurt


On Thu, Aug 27, 2020 at 7:28 PM Rui Li  wrote:

> Congratulations Dian!
>
> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei  wrote:
>
>> Congrats!
>>
>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:
>>
>>> Congratulations Dian!
>>>
>>> Best,
>>> Xingbo
>>>
>>> jincheng sun  于2020年8月27日周四 下午5:24写道:
>>>
 Hi all,

 On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now
 part of the Apache Flink Project Management Committee (PMC).

 Dian Fu has been very active on PyFlink component, working on various
 important features, such as the Python UDF and Pandas integration, and
 keeps checking and voting for our releases, and also has successfully
 produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push
 forward the release of Flink 1.12.

 Please join me in congratulating Dian Fu for becoming a Flink PMC
 Member!

 Best,
 Jincheng(on behalf of the Flink PMC)

>>>
>
> --
> Best regards!
> Rui Li
>


Re: flink-1.11 集成hive-1.2.1 DDL问题

2020-07-19 文章 Kurt Young
1.11 把默认planner换成blink了,需要添加下blink planner的依赖

Best,
Kurt


On Mon, Jul 20, 2020 at 11:39 AM Rui Li  wrote:

> stacktrace上看起来是创建blink planner的时候出错的。检查下依赖的blink planner版本是不是正确?
>
> On Fri, Jul 17, 2020 at 7:29 PM kcz <573693...@qq.com> wrote:
>
> > idea 本地测试
> > 跟hive有关pom依赖
> > hive-exec flink-connector-hive_2.11
> > 代码如下:
> >  StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > env.setParallelism(1);
> > env.enableCheckpointing(60*1000, CheckpointingMode.EXACTLY_ONCE);
> > // 同一时间只允许进行一个检查点
> > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> >
> > env.setStateBackend(new FsStateBackend(path));
> >
> > StreamTableEnvironment tableEnv =
> > StreamTableEnvironment.create(env);
> >
> > String name= "myhive";
> > String defaultDatabase = "situation";
> > String hiveConfDir = "/load/data/hive/hive-conf"; // a local
> > path
> > String version = "1.2.1";
> >
> > HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > hiveConfDir, version);
> > tableEnv.registerCatalog("myhive", hive);
> >
> > // set the HiveCatalog as the current catalog of the session
> > tableEnv.useCatalog("myhive");
> > tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp");
> > tableEnv.executeSql("DROP TABLE IF EXISTS
> > stream_tmp.source_table");
> >
> >
> > 报错如下:
> > 
> > Exception in thread "main" java.lang.IncompatibleClassChangeError:
> > Implementing class
> > at java.lang.ClassLoader.defineClass1(Native Method)
> > at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
> > at
> > java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> > at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
> > at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > at
> >
> org.apache.flink.table.planner.delegation.PlannerBase. > at
> >
> org.apache.flink.table.planner.delegation.StreamPlanner. > at
> >
> org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50)
> > at
> >
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:130)
> > at
> >
> org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:111)
> > at com.hive.HiveTest.main(HiveTest.java:33)
>
>
>
> --
> Best regards!
> Rui Li
>


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 文章 Kurt Young
应该是这个: https://issues.apache.org/jira/browse/FLINK-16068

Best,
Kurt


On Tue, Jun 16, 2020 at 5:09 PM zilong xiao  wrote:

> 我看了下1.10.1的release note,您说的应该就是这个issue:
> https://issues.apache.org/jira/browse/FLINK-16345
> ,但是这个issue所描述的问题貌似和我的不太一样,我的这个问题是在使用TO_TIMESTAMP、TO_
> DATE函数且,ddl中含有关键字字段时,语法检测会报错,不知道这个问题是否跟这个issue有关呢?
>
> Benchao Li  于2020年6月16日周二 下午5:00写道:
>
> > 你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。
> >
> > zilong xiao  于2020年6月16日周二 下午4:56写道:
> >
> >> 如题,在SQL
> >>
> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
> >> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
> >> 代码如下图:
> >> [image: image.png]
> >> 异常堆栈:
> >>
> >
>


Re: flink1.11 小疑问(提升 DDL 易用性(动态 Table 属性))

2020-06-15 文章 Kurt Young
table hint的语法是紧跟在你query中访问某张表的时候,所以我理解并不会有 ”这个动态参数作用在哪张表“ 上的疑问吧?

Best,
Kurt


On Tue, Jun 16, 2020 at 10:02 AM Yichao Yang <1048262...@qq.com> wrote:

> Hi
>
>
> 1.2版本将会有like字句的支持,参考[1],不过也是通过定义一张表的方式,而不是直接在query内指定。
>
> 个人理解在query内指定其实会涉及到很多因素,假设涉及到多张表的时候,涉及到同key属性时,你在query内指定的属性到底是赋予给哪张表的?这个其实是比较模糊的。
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"Kurt Young" 发送时间:2020年6月16日(星期二) 上午9:53
> 收件人:"user-zh"
> 主题:Re: flink1.11 小疑问(提升 DDL 易用性(动态 Table 属性))
>
>
>
> 就是你DDL定义表的时候的WITH参数,有时候有个别参数写的不对或者需要调整,可以在query里直接修改,而不用重新定义一张新表。
>
> Best,
> Kurt
>
>
> On Tue, Jun 16, 2020 at 9:49 AM kcz <573693...@qq.com wrote:
>
>  动态 Table 属性是指什么?可以举一个列子吗。


Re: flink1.11 小疑问(提升 DDL 易用性(动态 Table 属性))

2020-06-15 文章 Kurt Young
就是你DDL定义表的时候的WITH参数,有时候有个别参数写的不对或者需要调整,可以在query里直接修改,而不用重新定义一张新表。

Best,
Kurt


On Tue, Jun 16, 2020 at 9:49 AM kcz <573693...@qq.com> wrote:

> 动态 Table 属性是指什么?可以举一个列子吗。


Re: Flink/SparkStreaming 性能测试(吞吐/延时)

2020-06-11 文章 Kurt Young
我们最近做了一个基于beam nexmark的性能对比测试[1],你可以参考一下。
和beam的测试不同的是,我们用各自引擎的API对着测试case描述的场景重新写了一下,并不是像这个里面一样全都用
beam的api写测试case,然后翻译到多个runner之上。

[1] https://beam.apache.org/documentation/sdks/java/testing/nexmark/

Best,
Kurt


On Fri, Jun 12, 2020 at 10:49 AM Zhonghan Tang <13122260...@163.com> wrote:

> Hi,
> 近期我需要做一个 Flink/SparkStreaming 吞吐/延时的性能分析, 我发现网上这方面资料很少,  只有17年美团/15年yahoo
> 做了一个类似的分析. 问题如下:
> 1. 简单的读kafka写kafka 要如何记录数据进flink/出flink 的时间? 如果是打时间戳要怎么打? 打的话会不会影响性能?
> 2. 我想到的场景是: 简单插数etl, 基本的过滤, 窗口. 请问还有没有什么定性定量的方式可以测量框架性能?
>
>
> 美团链接:
>
> https://tech.meituan.com/2017/11/17/flink-benchmark.html?spm=a2c6h.13066369.0.0.5e3c1455V4UrXH
> yahoo:
>
> https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
>
>
> | |
> Zhonghan Tang
> |
> |
> 13122260...@163.com
> |
> 签名由网易邮箱大师定制
>
>


Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 文章 Kurt Young
IIUC FLIP-122 already delegate the responsibility for designing and parsing
connector properties to connector developers.
So frankly speaking, no matter which style we choose, there is no strong
guarantee for either of these. So it's also possible
that developers can choose a totally different way to express properties,
such as:

'format' = 'csv',
'csv.allow-comments' = 'true',
'csv.ignore-parse-errors' = 'true'

which also seems quite straightforward and easy to use. So my opinion on
this would be since there is no guarantee for developers
to choose "format" as common prefix of all format related properties, there
is not much value to extend 'format' to 'format.kind'.


Best,
Kurt


On Thu, Apr 30, 2020 at 10:17 AM Jingsong Li  wrote:

> Thanks Timo for staring the discussion.
>
> I am +1 for "format: 'json'".
> Take a look to Dawid's yaml case:
>
> connector: 'filesystem'
> path: '...'
> format: 'json'
> format:
> option1: '...'
> option2: '...'
> option3: '...'
>
> Is this work?
> According to my understanding, 'format' key is the attribute of connector,
> which can be separately configured outside. In the 'format' block, they are
> the attribute of format.
> So this json style block can only contain the properties exclude format
> itself.
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 30, 2020 at 9:58 AM Benchao Li  wrote:
>
>> Thanks Timo for staring the discussion.
>>
>> Generally I like the idea to keep the config align with a standard like
>> json/yaml.
>>
>> From the user's perspective, I don't use table configs from a config file
>> like yaml or json for now,
>> And it's ok to change it to yaml like style. Actually we didn't know that
>> this could be a yaml like
>> configuration hierarchy. If it has a hierarchy, we maybe consider that in
>> the future to load the
>> config from a yaml/json file.
>>
>> Regarding the name,
>> 'format.kind' looks fine to me. However there is another name from the
>> top of my head:
>> 'format.name', WDYT?
>>
>> Dawid Wysakowicz  于2020年4月29日周三 下午11:56写道:
>>
>>> Hi all,
>>>
>>> I also wanted to share my opinion.
>>>
>>> When talking about a ConfigOption hierarchy we use for configuring Flink
>>> cluster I would be a strong advocate for keeping a yaml/hocon/json/...
>>> compatible style. Those options are primarily read from a file and thus
>>> should at least try to follow common practices for nested formats if we
>>> ever decide to switch to one.
>>>
>>> Here the question is about the properties we use in SQL statements. The
>>> origin/destination of these usually will be external catalog, usually in a
>>> flattened(key/value) representation so I agree it is not as important as in
>>> the aforementioned case. Nevertheless having a yaml based catalog or being
>>> able to have e.g. yaml based snapshots of a catalog in my opinion is
>>> appealing. At the same time cost of being able to have a nice
>>> yaml/hocon/json representation is just adding a single suffix to a
>>> single(at most 2 key + value) property. The question is between `format` =
>>> `json` vs `format.kind` = `json`. That said I'd be slighty in favor of
>>> doing it.
>>>
>>> Just to have a full picture. Both cases can be represented in yaml, but
>>> the difference is significant:
>>> format: 'json'
>>> format.option: 'value'
>>>
>>> vs
>>> format:
>>> kind: 'json'
>>>
>>> option: 'value'
>>>
>>> Best,
>>> Dawid
>>>
>>> On 29/04/2020 17:13, Flavio Pompermaier wrote:
>>>
>>> Personally I don't have any preference here.  Compliance wih standard
>>> YAML parser is probably more important
>>>
>>> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu  wrote:
>>>
 From a user's perspective, I prefer the shorter one "format=json",
 because
 it's more concise and straightforward. The "kind" is redundant for
 users.
 Is there a real case requires to represent the configuration in JSON
 style?
 As far as I can see, I don't see such requirement, and everything works
 fine by now.

 So I'm in favor of "format=json". But if the community insist to follow
 code style on this, I'm also fine with the longer one.

 Btw, I also CC user mailing list to listen more user's feedback.
 Because I
 think this is relative to usability.

 Best,
 Jark

 On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler 
 wrote:

 >  > Therefore, should we advocate instead:
 >  >
 >  > 'format.kind' = 'json',
 >  > 'format.fail-on-missing-field' = 'false'
 >
 > Yes. That's pretty much it.
 >
 > This is reasonable important to nail down as with such violations I
 > believe we could not actually switch to a standard YAML parser.
 >
 > On 29/04/2020 16:05, Timo Walther wrote:
 > > Hi everyone,
 > >
 > > discussions around ConfigOption seem to be very popular recently.
 So I
 > > would also like to get some opinions on a different topic.
 > >
 > > How do we represent hierarchies in ConfigOption? In 

Re: Re: sql 行转列

2020-04-26 文章 Kurt Young
从你的原始sql看起来,我猜测你是想在做统计的时候,要套用一个过滤条件?从你的原始sql我没看出任何和“行转列”相关的迹象和需求,能否详细解释一下

Best,
Kurt


On Sun, Apr 26, 2020 at 6:20 PM Benchao Li  wrote:

> 你指的是多行转多行么?如果是的话,那你需要的应该是Table Aggregate Function[1],但是这个只能在Table Api里使用,
> 在SQL里面没有这种语义可以直接使用。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#table-aggregation-functions
>
> 王双利  于2020年4月26日周日 下午6:14写道:
>
> >看到的例子,都是 对单行数据进行转行的,这种多行数据的,有相关例子吗?
> >
> > 发件人: Benchao Li
> > 发送时间: 2020-04-26 17:31
> > 收件人: user-zh
> > 主题: Re: sql 行转列
> > Hi 双利,
> >
> > 在Flink里面行转列用的是Table Function,你可以参考下[1] 里面的 ”Join with Table Function
> > (UDTF)“ 部分。
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
> >
> > 王双利  于2020年4月26日周日 下午5:19写道:
> >
> > > select ip,
> > > case status when 'success' THEN sum(t) ELSE 0 end successct,
> > > case status when 'fail' THEN sum(t) ELSE 0 end failct
> > > from view1
> > > group by ip 这样不能行转列,有解决方案吗?
> > >
> > >
> > >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: how to send back result via job manager to client

2020-04-19 文章 Kurt Young
可以看下这个jira:https://issues.apache.org/jira/browse/FLINK-14807

Best,
Kurt


On Mon, Apr 20, 2020 at 7:07 AM Eleanore Jin  wrote:

> Hi,
> 刚刚读到一篇关于Flink 在OLAP 上的使用案例 (
> https://ververica.cn/developers/olap-engine-performance-optimization-and-application-cases/),
> 其中一点提到了:
> [image: image.png]
> 这部分优化,源于 OLAP 的一个特性:OLAP 会将最终计算结果发给客户端,通过JobManager 转发给 Client。
>
> 想请问一下这一点是如何实现的: 通过JobManager 把结果转发给Client.
>
> 谢谢!
> Eleanore
>


Re: Flink Weekly | 每周社区动态更新 - 2020/04/18

2020-04-18 文章 Kurt Young
感谢整理!

Best,
Kurt


On Sat, Apr 18, 2020 at 9:43 PM 王雷  wrote:

> 大家好,本文为 Flink Weekly 的第十三期,由王雷整理,主要内容包括:近期社区开发进展,邮件问题答疑以及 Flink
> 最新社区动态及技术文章推荐。
>
>
>
>
> Flink 开发进展
>
> ■ [Releases] Tzu-Li (Gordon) Tai 发布了 Apache Flink Stateful Functions 2.0.0。
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ANNOUNCE-Apache-Flink-Stateful-Functions-2-0-0-released-td34121.html
>
>
>
>
> ■ [Releases] Yu Li 发起了关于发布 Flink 1.10.1 版本的讨论,即将发布的 1.10.1 版本还有1个
> Blocker。预计下周会有一个 RC 版本。
>
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-10-1-td38689.html
>
>
>
>
> ■ [Releases] 1.9.3 版本所有的 blocker issues 都已经被解决,Dian Fu 正在准备发布第一个候选版本。
>
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-9-3-td40086.html
>
>
>
>
> ■ [SQL] 在 FLIP-84 重构 TableEnvironment 和 Table 方案中,遗漏了提交 DQL
> 任务的场景,godfreyhe 重新发起了 FLIP-84 的投票,已通过。
>
> [4]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-84-Improve-amp-Refactor-API-of-TableEnvironment-amp-Table-td39543.html
>
>
>
>
> ■ [SQL] FLIP-122 关于在新的 TableFactory 中使用新的 connector 属性的投票已通过。
>
> [5]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/RESULT-VOTE-FLIP-122-New-Connector-Property-Keys-for-New-Factory-td39935.html
>
>
>
>
> ■ [SQL] FLIP-110 关于支持在创建表的语句中增加 like 语法的投票已通过。
>
> [6]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-110-Support-LIKE-clause-in-CREATE-TABLE-td39554.html
>
>
>
>
> ■ [Python] FLIP-121 关于支持 Cython 优化 Python UDF 的投票已通过。
>
> [7]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-121-Support-Cython-Optimizing-Python-User-Defined-Function-td39577.html
>
>
>
>
> ■ [Runtime] FLIP-119 关于对调度策略优化的投票通过,优化主要集中在运行批处理作业时避免资源死锁等。
>
> [8]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-119-Pipelined-Region-Scheduling-td39585.html
>
>
>
>
> 邮件问题答疑
>
> ■ guanyq 在社区提问如何查询 flink job 的异常日志。田志声推荐使用 log agent (比如 filebeat)
> 统一采集作业的日志,然后收集到 ElasticSearch 查看。
>
> [9]http://apache-flink.147419.n8.nabble.com/flink-td2378.html
>
>
>
>
> ■ chanamper 希望能在 Java Api 中使用 LocalGlobal 的聚合优化方法,Congxian Qiu
> 予以了回答,DataStream API 暂时没有 local aggregation 的功能,可以通过给 key 拼前缀或者后缀来达到类似的效果。
>
> [10]http://apache-flink.147419.n8.nabble.com/Flink-keyby-td2309.html
>
>
>
>
> ■ 111 发现 flink 中的 calcite 依赖 guava 16 以上的版本,hbase-connector 模块依赖 guava
> 12.0 版本,经过 shade 后,作业运行正常,但是在 IDEA 运行单元测试会冲突。目前只能通过 mvn test
> 来运行单元测试,或者把单元测试改为连接远程 Hbase 的方法来解决。
>
> [11]
> http://apache-flink.147419.n8.nabble.com/Flink1-10-0-flink-hbase-guava-td2385.html
>
>
>
>
> ■ KristoffSC 在社区提问在用 RocksDB 作为 StateBackend 的情况下,少量的 MapState,每个 state
> 的大小很大,和很多的 ValueState,每个 state 的大小很小相比,哪种性能更高。Congxian Qiu
> 认为这两者没有区别,并给予了详细的解析。
>
> [12]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-size-Vs-keys-number-perfromance-td34135.html
>
>
>
>
> ■ 111 在使用 TopN 语句时遇到了 ”This calc has no
>
> useful projection and no filter. It should be removed by CalcRemoveRule”
> 的问题,Jark 认为这是由于 codegen bug 导致。
>
> [13]
> http://apache-flink.147419.n8.nabble.com/Flink-SQL-1-10-ROW-NUMBER-td2355.html
>
>
>
>
> ■ Dongwon Kim 在社区里提问关于在 NullAwareMapIterator 中抛出 NPE 的问题,Jark Wu 认为这是由于
> HeapMapState iterator 的一处 bug 导致返回空的 iterator所致。
>
> [14]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/NPE-from-NullAwareMapIterator-in-flink-table-runtime-blink-td34083.html
>
>
>
>
> ■ Krzysztof Zarzycki 希望能够用 Flink SQL 实现动态修改 job
> 的拓扑图,以动态的增删业务的处理分支。该功能目前还不支持,大家对该功能的实现进行了讨论。
>
> [15]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-Flink-SQL-td33853.html
>
>
>
>
> ■ Salva Alcántara 在 snapshotState 方法中对 keyed state 进行了 clear 处理。job
> 启动后,没有一条数据进入 input streams 时,触发 checkpoint 会报 NPE。Yun Tang 对该问题进行了回复,讲述了
> keyed state 和 operator state 的区别,并根据 Salva Alcántara 的业务逻辑推荐他使用 operator
> state。
>
> [16]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-MapState-clear-put-methods-in-snapshotState-within-KeyedCoProcessFunction-valid-or-not-td31353.html
>
>
>
>
> ■ Aaron Levin 遇到了在 RocksDB 中存储含有百万个元素的 ListState
> 的应用场景,担心在这种场景下会遇到一些问题。Seth Wiesman 回复说 RocksDB's JNI bridge 不支持超过 2GB 的
> ListState。Aljoscha Krettek 提供了另外的思路,可以把数据分成多个key,分散处理。
>
> [17]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ListState-with-millions-of-elements-td34153.html
>
>
>
>
> ■ Robin Cassan 遇到了 checkpoint 因超时失败时,接下来的 checkpoint 出现雪球效应的问题。Congxian
> Qiu 回答说非对齐的 checkpoint 可以解决该问题,但该方案还未实现。
>
> [18]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Making-job-fail-on-Checkpoint-Expired-td34051.html
>
>
>
>
> ■ Gyula Fóra 遇到了 source 表含有 nullable 字段,将 null 数据过滤掉,写入对应字段类型 STRING NOT
> NULL 的 sink 表时,报类型不兼容的错误。Timo Walther 告知 type system 仍然在完善中,该问题是已知问题。
>
> [19]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Inserting-nullable-data-into-NOT-NULL-columns-td34198.html
>
>
>
>
> ■ forideal 在一个任务中提交了三个 SQL,使用的同一个 source 表。Flink 

Re: 【反馈收集】在 1.11 版本中将 blink planner 作为默认的 planner

2020-04-03 文章 Kurt Young
你好,这个是预期中的。在新的类型系统下,我们将使用 LocalDateTime 作为 TIMESTAMP 类型的默认对象。
同时我们还禁用了 long 和 TIMESTAMP 对象的直接互转。
具体的原因和细节可以看:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System

Best,
Kurt


On Fri, Apr 3, 2020 at 4:58 PM 1193216154 <1193216...@qq.com> wrote:

>
> 你好,最近改成blinkplanner发现了两个问题。及时两者生成的proctime的时间类型不同,一个是TimeStamp,一个是LocalDateTime。
>
>
> org.apache.flink.table.dataformat.DataFormatConverters中TimestampConverter 的
>
> toInternalImpl方法只支持TimeStamp的参数,而我遇见的情况是传进了long类型,导致类转换异常,如果能重载toInternalImpl方法加一个long,或许可以解决我的问题
> --原始邮件--
> 发件人:"Kurt Young" 发送时间:2020年4月1日(星期三) 上午9:22
> 收件人:"user-zh"
> 主题:【反馈收集】在 1.11 版本中将 blink planner 作为默认的 planner
>
>
>
> 大家好,
>
> 正如大家所知,Blink planner 是 Flink 1.9 版本中引入的一个全新的 Table API 和 SQL 的翻译优化
> 器,并且我们已经在 1.10 版本中将其作为 SQL CLI 的默认 planner。由于社区很早就已经决定不再
> 针对老的优化器去增添任何新的功能,所以从功能和性能上来说,老的 flink planner 已经缺了很多
> 现在社区最新的 SQL 相关的功能,比如新的类型系统,更多的DDL的支持,以及即将在 1.11 发布
> 的新的 TableSource 和 TableSink 接口和随之而来的对 Binlog 类日志的解析。
>
> 因此我们打算尝试在接下来这个 1.11 版本发布时将 blink planner 升级为默认的 planner。但在此之
> 前,我们希望听听您的反馈。尤其是一些使用过程的体验,和您观察到的或者经历过的 blink planner
> 做不了的事情从而导致您退回了老的 flink planner 的原因,可能是两边功能不对等,或者 blink planner
> 有什么新的 bug 而老的 planner 没有。我们距离 1.11 的需求冻结还有差不多一个月的时间,收到
> 您的反馈之后,我们有足够的时间进行修复和完善。
>
> 希望听到您宝贵的声音和意见,谢谢。
>
> Best,
> Kurt


Re: flink 1.10 createTemporaryTable丢失proctime问题

2020-04-02 文章 Kurt Young
看起来你是踩到了这个bug:https://issues.apache.org/jira/browse/FLINK-16160
在这个bug修复前,先继续用老的API吧

Best,
Kurt


On Thu, Apr 2, 2020 at 10:34 AM deadwind4  wrote:

> registerTableSource 被标记了@Deprecated 在flink
> 1.10,我这种情况是继续沿用过期的API(registerTableSource)吗?
>
>
>  原始邮件
> 发件人: deadwind4
> 收件人: user-zh
> 发送时间: 2020年4月2日(周四) 10:30
> 主题: Re: flink 1.10 createTemporaryTable丢失proctime问题
>
>
> 修改前
> tEnv.connect().withFormat().withSchema(
> xxx.proctime()
> ).registerTableSource(“foo”);
>
>
> 修改后
> tEnv.connect().withFormat().withSchema(
> xxx.proctime()
> ).createTemporaryTable(“foo”);
>
>
> 修改后.proctime()就失效了,所以我proctime window也用不了了。
>
>
>  原始邮件
> 发件人: deadwind4
> 收件人: user-zh
> 发送时间: 2020年4月2日(周四) 10:22
> 主题: Re: flink 1.10 createTemporaryTable丢失proctime问题
>
>
> tEnv.connect().withFormat().withSchema().registerTableSource(“foo”);
> tEnv.connect().withFormat().withSchema().createTemporaryTable(“foo”);
>
>
>  原始邮件
> 发件人: Jark Wu
> 收件人: user-zh
> 发送时间: 2020年4月2日(周四) 10:18
> 主题: Re: flink 1.10 createTemporaryTable丢失proctime问题
>
>
> Hi, 你能描述下你的改动前后的代码吗?据我所知 TableEnvironment 上没有 createTemporaryTable
> 方法,只有createTemporaryView方法,而且 registerTableSource 和 createTemporaryView
> 的参数是不一样的。 Best, Jark > 2020年4月1日 23:13,deadwind4 
> 写道: > > 我其实是想用processing time window 但是我把过期的API
> registerTableSource换成createTemporaryTable,proctime就不起作用了。这种情况我应该如何使用呢?谢谢您,叨扰了。
> > > > 原始邮件 > 发件人: Jark Wu > 收件人: user-zh<
> user-zh@flink.apache.org> > 发送时间: 2020年4月1日(周三) 21:37 > 主题: Re: flink
> 1.10 createTemporaryTable丢失proctime问题 > > > Hi, proctime 的含义是机器时间,不等价于
> now()或者 current_timestamp() 函数,该字段只有在真正使用的才会物化(即去取
> System.currentTimeMillis)。 能请描述下你想用 createTemporaryTable
> 做什么呢?当前哪里不能满足你的需求呢? Best, Jark On Wed, 1 Apr 2020 at 18:56, deadwind4 <
> deadwi...@outlook.com> wrote: > >
> 我使用1.10版本的createTemporaryTable发现proctime字段全是null但是换成过时的registerTableSource就可以。
> > 如果我想使用createTemporaryTable该怎么办。 >
> 并且我debug了createTemporaryTable的源码没有发现对proctime的处理。


【反馈收集】在 1.11 版本中将 blink planner 作为默认的 planner

2020-03-31 文章 Kurt Young
大家好,

正如大家所知,Blink planner 是 Flink 1.9 版本中引入的一个全新的 Table API 和 SQL 的翻译优化
器,并且我们已经在 1.10 版本中将其作为 SQL CLI 的默认 planner。由于社区很早就已经决定不再
针对老的优化器去增添任何新的功能,所以从功能和性能上来说,老的 flink planner 已经缺了很多
现在社区最新的 SQL 相关的功能,比如新的类型系统,更多的DDL的支持,以及即将在 1.11 发布
的新的 TableSource 和 TableSink 接口和随之而来的对 Binlog 类日志的解析。

因此我们打算尝试在接下来这个 1.11 版本发布时将 blink planner 升级为默认的 planner。但在此之
前,我们希望听听您的反馈。尤其是一些使用过程的体验,和您观察到的或者经历过的 blink planner
做不了的事情从而导致您退回了老的 flink planner 的原因,可能是两边功能不对等,或者 blink planner
有什么新的 bug 而老的 planner 没有。我们距离 1.11 的需求冻结还有差不多一个月的时间,收到
您的反馈之后,我们有足够的时间进行修复和完善。

希望听到您宝贵的声音和意见,谢谢。

Best,
Kurt


Re: flink 安装包的几个 jar 是怎么 build 出来的

2020-03-26 文章 Kurt Young
flink-table-uber-blink 应该是这个module,它负责打出 blink planner 的胖包(uber jar)

Best,
Kurt


On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar
> 这个 jar 是从哪里 build 出来的呢?
>
> 我 clone github 上的源代码,mvn clean package
> 我以为 flink-table/flink-table-planner-blink 目录下build 出的
> flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的
>  flink-table-blink_2.12-1.10.0.jar  是对应的
> 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>


Re: 关于 SQL DATE_FORMAT 的时区设置的构想

2020-03-25 文章 Kurt Young
我们先改成 timestamp with local zone,如果这个字段的类型在整个query里都没变过,那个 with time
zone的效果也差不多了。

Best,
Kurt


On Wed, Mar 25, 2020 at 8:43 PM Zhenghua Gao  wrote:

> Hi Jark,
>
> 这里的确是有问题的。
> 目前的问题是Calcite本身并不支持TIMESTAMP WITH TIME ZONE.
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Tue, Mar 24, 2020 at 11:00 PM Jark Wu  wrote:
>
> > Thanks for reporting this Weike.
> >
> > 首先,我觉得目前 Flink 返回 TIMESTAMP WITHOUT TIME ZONE 应该是有问题的。
> > 因为 SQL 标准(SQL:2011 Part 2 Section 6.32)定义了返回类型是 WITH TIME ZONE。
> > 另外 Calcite 文档中 [1] 也说返回的是 TIMESTAMP WITH TIME ZONE (虽然好像和实现不一致)
> > 其他的一些数据库也都差不多:mysql [2], oracle[3]
> >
> > Best,
> > Jark
> >
> > [1]: https://calcite.apache.org/docs/reference.html#datetime-functions
> > [2]:
> >
> >
> https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_current-timestamp
> > [3]:
> >
> >
> https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions038.htm#SQLRF00629
> >
> >
> >
> > On Tue, 24 Mar 2020 at 17:00, DONG, Weike 
> wrote:
> >
> > > Hi Zhenghua,
> > >
> > > 感谢您的回复。感觉既然 Flink 已经提供了时区的设定,这方面也许可以进一步增强一些。CONVERT_TZ
> > > 用户很容易忘记或者漏掉,这里还是有不少完善的空间。
> > >
> > > Best,
> > > Weike
> > >
> > > On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao  wrote:
> > >
> > > > CURRENT_TIMESTAMP 返回值类型是 TIMESTAMP (WITHOUT TIME ZONE),
> > > > 其语义可参考 java.time.LocalDateTime。
> > > > 其字符形式的表示并不随着时区变化而变化(如你所见,和UTC+0 一致)。
> > > >
> > > > 你的需求可以通过 CONVERT_TZ(timestamp_string, time_zone_from_string,
> > > > time_zone_to_string)
> > > >
> > > > *Best Regards,*
> > > > *Zhenghua Gao*
> > > >
> > > >
> > > > On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike <
> kyled...@connect.hku.hk>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > 近期发现 Flink 的 Blink Planner 在 DATE_FORMAT 对 CURRENT_TIMESTAMP
> > > > > 做时间格式化为字符串时,默认以 UTC+0 为准。
> > > > >
> > > > > 长期以来,TableConfig 类里面有一个 setLocalTimeZone
> 方法;将其设置为东八区以后,发现格式化后的字符串仍然是
> > > > UTC+0
> > > > > 的。而深入来看,Flink 的时间格式化时的代码生成逻辑(time.scala)并未考虑时区的设置。
> > > > >
> > > > > 由于大多数用户的时区均不是 UTC+0(GMT、UTC),如果时间格式化、显示等都可以考虑到 TableConfig
> 中的时区设置,那么
> > > > Flink
> > > > > 是否会更用户友好一些呢?当然这个会涉及到不兼容的变更,需要谨慎一些。
> > > > >
> > > > > 也许提供一个 DATE_FORMAT_WITH_TIMEZONE 的内置函数,社区是否会更容易接受一些呢?
> > > > >
> > > > > 仅仅是个人一点想法,感谢 :)
> > > > >
> > > >
> > >
> >
>


Re: 关于flink sql 1.10 source并行度自动推断的疑问

2020-03-24 文章 Kurt Young
你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。

Best,
Kurt


On Tue, Mar 24, 2020 at 10:39 PM Chief  wrote:

> hi all:
> 之前用flink sql查询hive的数据,hive的数据文件是150个,sql
> client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web
> ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗?


Re: Flink SQL1.10 大表join如何优化?

2020-03-21 文章 Kurt Young
你的plan里除了source之外,其他所有节点都是在单并发运行,这对两张1000多万的表join来说是不够的,你可以尝试加大并发。

Best,
Kurt


On Sat, Mar 21, 2020 at 1:30 PM 111  wrote:

> Hi:
> 看了下源代码,了解了下Hybrid hash join。大致了解了瓶颈点:
> Hybrid hash
> join,会把build表(也就是我的右表)通过hash映射成map,并按照某种规则进行分区存储(有的在内存,超过的放入磁盘)。
> 目前看磁盘上的那部分join应该是整个任务的瓶颈。
> 具体调优方法,还在探索中...也许有什么配置可以控制build表内存存储的大小.
> 在2020年03月21日 11:01,111 写道:
> Hi, wu:
> 好的,我这边观察下gc情况。
> 另外,我的sql里面有关联条件的,只是第一个表1400多万条,第二张表1000多万条。
> | select
>
>
>   wte.external_user_id,
>
>   wte.union_id,
>
>   mr.fk_member_id as member_id
>
> from a wte
>
> left join b mr
>
>  on wte.union_id = mr.relation_code
>
> where wte.ods_date = '${today}'
>
> limit 10;
>
> |
> 我在ui里面可以看到任务也在正常运行,只是每秒输入700条左右,每秒输出1700,所以对比总量来说十分缓慢。
>
>
> 目前不太清楚性能的瓶颈点和优化的方向:
> 1
> 网络传输太慢,导致两表不能及时join?这里不知道如何排查,Metrics里面有个netty的相关指标,看不出什么;其他的指标除了hashjoin
> in和out缓慢变化,其他的都没有什么变化。
> 2 并行度过低,导致单点slot需要执行两个千万级表的关联?可否动态修改或者配置probe表的并行度?
> 3 JVM内存问题?详情见附件,观察内存还是很充足的,貌似垃圾回收有点频繁,是否有必要修改jvm配置?
> 4 taskmanager的日志不太理解….到build phase就停住了,是日志卡主了 还是 此时正在进行build的网络传输?
> |
> 2020-03-21 09:23:14,732 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,738 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,744 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,750 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,756 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,762 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,772 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,779 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:16,357 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 14 ms for 65536 segments
> 2020-03-21 09:23:16,453 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 10 ms for 65536 segments
> 2020-03-21 09:23:16,478 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 9 ms for 65536 segments
> 2020-03-21 09:23:16,494 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 9 ms for 65536 segments
> 2020-03-21 09:23:16,509 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 10 ms for 65536 segments
> 2020-03-21 09:23:16,522 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 9 ms for 65536 segments
> 2020-03-21 09:23:16,539 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 9 ms for 65536 segments
> 2020-03-21 09:23:16,554 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 10 ms for 65536 segments
> 2020-03-21 09:23:16,574 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 9 ms for 65536 segments
> 2020-03-21 09:23:16,598 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 9 ms for 65536 segments
> 2020-03-21 09:23:16,611 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 10 ms for 65536 segments
> 2020-03-21 09:23:20,157 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 213 ms for 131072 segments
> 2020-03-21 09:23:21,579 INFO
> org.apache.flink.table.runtime.operators.join.HashJoinOperator  - Finish
> build phase.
> |
>
>
>
>
> 在2020年03月21日 10:31,Jark Wu 写道:
> Hi,
>
> 看起来你的 join 没有等值关联条件,导致只能单并发运行。你可以观察下这个 join 节点的 gc 情况,看看是不是 full gc 导致运行缓慢。
> 关于 batch join,Jingsong 比我更熟悉一些调优手段,也许他能提供一些思路,cc @Jingsong Li
> 
>
> Best,
> Jark
>
> On Fri, 20 Mar 2020 at 17:56, 111  wrote:
>
>
>
> 图片好像挂了:
>
>
>
>
> https://picabstract-preview-ftn.weiyun.com/ftn_pic_abs_v3/93a8ac1299f8edd31aa93d69bd591dcc5b768e2c6f2d7a32ff3ac244040b1cac3e8afffd0daf92c4703c276fa1202361?pictype=scale=30113=3.3.3.3=23603357=F74D73D5-810B-4AE7-888C-E65BF787E490.png=750
>
>
> 在2020年03月20日 17:52,111 写道:
> 您好:
> 我有两张表数据量都是1000多万条,需要针对两张表做join。
> 提交任务后,发现join十分缓慢,请问有什么调优的思路?
> 需要调整managed memory吗?
>
> 目前每个TaskManager申请的总内存是2g,每个taskManager上面有4个slot。taskmanager的metrics如下:
> | {
> "id":"container_e40_1555496777286_675191_01_000107",
> "path":"akka.tcp://flink@hnode9:33156/user/taskmanager_0",
> "dataPort":39423,
> "timeSinceLastHeartbeat":1584697728127,
> "slotsNumber":4,
> "freeSlots":3,
> "hardware":{
> "cpuCores":32,
> "physicalMemory":135355260928,
> "freeMemory":749731840,
> 

Re: sql关键字问题

2020-03-18 文章 Kurt Young
好像已经有了,应该是这个jira:
https://issues.apache.org/jira/browse/FLINK-16526

Best,
Kurt


On Wed, Mar 18, 2020 at 4:19 PM Jingsong Li  wrote:

> Hi lucas,
>
> 赞专业的分析,看起来是Flink的bug,你可以建个Jira来跟踪。
> CC: @Yuzhao Chen 
>
> Best,
> Jingsong Lee
>
> On Wed, Mar 18, 2020 at 4:15 PM lucas.wu  wrote:
>
> > 初步找到了原因
> > 原来我的建表语句用了computed_column_expression 这种语义。
> > 然后flink内部在使用的时候其实是把它转成了select 语句
> > ...
> > if (columnExprs.nonEmpty) {
> >  val fieldExprs = fieldNames
> >  .map { name =
> >  if (columnExprs.contains(name)) {
> >  columnExprs(name)
> >  } else {
> >  name
> >  }
> >  }.toArray
> >  val rexNodes =
> > toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs)
> > …..
> >
> >
> > 然后我们看看convertToRexNodes方法
> >
> >
> > public RexNode[] convertToRexNodes(String[] exprs) {
> > ….
> >  String query = String.format(QUERY_FORMAT, String.join(",", exprs));
> >  SqlNode parsed = planner.parser().parse(query);
> > }
> >
> >
> > 重点就在这个QUERY_FORMAT
> > private static final String QUERY_FORMAT = "SELECT %s FROM " +
> > TEMPORARY_TABLE_NAME;
> >
> >
> > 这样写是有问题的,当我的字段本身是有``的时候,就被去掉了,导致后面valid的时候就报错。
> >
> >
> > 所以这个是算flink的bug吗?
> > 原始邮件
> > 发件人:lucas.wulucas...@xiaoying.com
> > 收件人:user-zhuser...@flink.apache.org
> > 发送时间:2020年3月18日(周三) 15:36
> > 主题:sql关键字问题
> >
> >
> > create table `source_table`( `SeqNo` varchar, `Type` varchar, `Table`
> > varchar, `ServerId` varchar, `Database` varchar, `OldData` varchar,
> `GTID`
> > varchar, `Offset` varchar, `event_ts` as
> > to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),
> > WATERMARK FOR event_ts AS event_ts - interval '60' second ) with(…) 查询语句
> > Select * from source_table; 这是我的建表和查询语句,发现即使用了`` 查询的时候还是会报Table是关键字的问题。
> SQL
> > parse failed. Encountered "Table" at line 1,column 19. 但是一旦我把 `event_ts`
> as
> > to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),
> > WATERMARK FOR event_ts AS event_ts - interval '60’ second 这两行去掉
> > ,就正常了。是我的使用方法有问题吗?
>
>
>
> --
> Best, Jingsong Lee
>


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-11 文章 Kurt Young
https://github.com/ververica/flink-sql-gateway  了解一下

Best,
Kurt


On Wed, Mar 11, 2020 at 9:26 PM zhisheng  wrote:

> hi, Kurt Young
>
> 除了使用 sql-client 可以使用纯 SQL 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行
> sql-client
>
> Kurt Young  于2020年3月11日周三 下午7:59写道:
>
> > 那有可能是可以的,你可以试试看
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
> > wangl...@geekplus.com.cn> wrote:
> >
> > > Hi Kurt,
> > >
> > > 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state
> > > 中恢复的功能吗?
> > > 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state
> > > 存储并且再次提交任务可以被访问到直接用吗?
> > >
> > > 谢谢,
> > > 王磊
> > >
> > > --
> > > wangl...@geekplus.com.cn
> > >
> > >
> > > *Sender:* Kurt Young 
> > > *Send Time:* 2020-03-11 12:54
> > > *Receiver:* wangl...@geekplus.com.cn
> > > *cc:* user-zh 
> > > *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> > > sql client 目前还不支持这个功能。
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
> > > wangl...@geekplus.com.cn> wrote:
> > >
> > >> Hi Kurt,
> > >> 确实是可以 直接 flink  cancel -s 保存状态。
> > >> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
> > >>
> > >> 谢谢,
> > >> 王磊
> > >>
> > >>
> > >> *Sender:* Kurt Young 
> > >> *Send Time:* 2020-03-11 10:38
> > >> *Receiver:* user-zh 
> > >> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> > >> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
> > >> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
> > >>
> > >> Best,
> > >> Kurt
> > >>
> > >>
> > >> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
> > >> wangl...@geekplus.com.cn> wrote:
> > >>
> > >> > 有两个表:
> > >> > tableA: key  valueA
> > >> > tableB: key  valueB
> > >> >
> > >> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到
> valueA
> > >> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> > >> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
> > >> >
> > >> > 谢谢,
> > >> > 王磊
> > >> >
> > >>
> > >>
> >
>


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-11 文章 Kurt Young
那有可能是可以的,你可以试试看

Best,
Kurt


On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Hi Kurt,
>
> 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state
> 中恢复的功能吗?
> 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state
> 存储并且再次提交任务可以被访问到直接用吗?
>
> 谢谢,
> 王磊
>
> --
> wangl...@geekplus.com.cn
>
>
> *Sender:* Kurt Young 
> *Send Time:* 2020-03-11 12:54
> *Receiver:* wangl...@geekplus.com.cn
> *cc:* user-zh 
> *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> sql client 目前还不支持这个功能。
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>> Hi Kurt,
>> 确实是可以 直接 flink  cancel -s 保存状态。
>> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
>>
>> 谢谢,
>> 王磊
>>
>>
>> *Sender:* Kurt Young 
>> *Send Time:* 2020-03-11 10:38
>> *Receiver:* user-zh 
>> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
>> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
>> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>> > 有两个表:
>> > tableA: key  valueA
>> > tableB: key  valueB
>> >
>> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
>> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
>> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>> >
>> > 谢谢,
>> > 王磊
>> >
>>
>>


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-10 文章 Kurt Young
我在社区建了个issue:https://issues.apache.org/jira/browse/FLINK-16534
后续你可以关注下

Best,
Kurt


On Wed, Mar 11, 2020 at 12:54 PM Kurt Young  wrote:

> sql client 目前还不支持这个功能。
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>> Hi Kurt,
>> 确实是可以 直接 flink  cancel -s 保存状态。
>> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
>>
>> 谢谢,
>> 王磊
>>
>>
>> *Sender:* Kurt Young 
>> *Send Time:* 2020-03-11 10:38
>> *Receiver:* user-zh 
>> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
>> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
>> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>> > 有两个表:
>> > tableA: key  valueA
>> > tableB: key  valueB
>> >
>> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
>> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
>> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>> >
>> > 谢谢,
>> > 王磊
>> >
>>
>>


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-10 文章 Kurt Young
sql client 目前还不支持这个功能。

Best,
Kurt


On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Hi Kurt,
> 确实是可以 直接 flink  cancel -s 保存状态。
> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
>
> 谢谢,
> 王磊
>
>
> *Sender:* Kurt Young 
> *Send Time:* 2020-03-11 10:38
> *Receiver:* user-zh 
> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
> > 有两个表:
> > tableA: key  valueA
> > tableB: key  valueB
> >
> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
> >
> > 谢谢,
> > 王磊
> >
>
>


Re: flink HiveTableSink 何时支持 streaming 模式写入

2020-03-10 文章 Kurt Young
预计1.11会ready。

Best,
Kurt


On Wed, Mar 11, 2020 at 10:44 AM chenkaibit  wrote:

> Hi:
> 我看 https://issues.apache.org/jira/browse/FLINK-14255 引入了 一个
> FileSystemStreamingSink,貌似是为 HiveTableSink 支持 streaming
> 模式写入做准备,这个功能预计会在后续哪个版本正式发布呢?
>
>


Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-10 文章 Kurt Young
理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。

Best,
Kurt


On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> 有两个表:
> tableA: key  valueA
> tableB: key  valueB
>
> 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
> 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>
> 谢谢,
> 王磊
>


Re: flink 长时间运行后出现报错

2020-03-09 文章 Kurt Young
我帮你 cc 了对 runtime 更熟悉的 zhuzhu 同学。

Best,
Kurt


On Mon, Mar 9, 2020 at 6:44 PM lucas.wu  wrote:

> 没人回复大概是之前没人遇到过这种问题,所以下午看了flink的代码,终于有了点头绪。
> 原因分析:
> 这个异常的原因就是在task出现异常之后,它需要调用updateTaskExecutionState(TaskExecutionState
> taskExecutionState)这个rpc接口去通知flink jobmanager
>
> 去改变对应task的状态并且重启task。但是呢,taskExecutionState这个参数里面有个error属性,当我的的task打出来的错误栈太多的时候,在序列化的之后超过了
> rpc接口要求的最大数据大小(也就是maximum akka framesize),导致调用updateTaskExecutionState
> 这个rpc接口失败,jobmanager无法获知这个task已经fail
>
> 的状态,也无法重启。这就导致了一系列连锁反应,其中一个就是我的checkpoint一直失败,原因就是我的task其实已经释放了,但是jobmanger无法感知。
>
> 结论:
> 这个算不算flink的一个bug,对于task已经失效,但是无法通知到jobmanger,导致该task一直无法重启。
> 原始邮件
> 发件人:lucas.wulucas...@xiaoying.com
> 收件人:user-zhuser...@flink.apache.org
> 发送时间:2020年3月9日(周一) 11:06
> 主题:flink 长时间运行后出现报错
>
>
> 大家好: 我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc
> outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗? 2020-03-08
> 06:10:30,480 WARN org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler -
> Could not create remote rpc invocation message. Failing rpc invocation
> because... java.io.IOException: The rpc invocation size 34500577 exceeds
> the maximum akka framesize. at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
> at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
> akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at
> akka.actor.ActorCell.invoke(ActorCell.scala:496) at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at
> akka.dispatch.Mailbox.run(Mailbox.scala:224) at
> akka.dispatch.Mailbox.exec(Mailbox.scala:234) at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-03-08 06:10:30,480 ERROR
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Caught exception while
> executing runnable in main thread.
> java.lang.reflect.UndeclaredThrowableException at
> com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
> 

Re: Hive Source With Kerberos认证问题

2020-02-27 文章 Kurt Young
cc @li...@apache.org 

Best,
Kurt


On Thu, Feb 13, 2020 at 10:22 AM 叶贤勋  wrote:

> Hi 大家好:
> 在做hive2.1.1 source带Kerberos认证有个异常请教下大家。
> flink 版本1.9
> hive 版本2.1.1,实现了HiveShimV211。
> 代码:
> public class HiveCatalogTest {
>private static final Logger LOG =
> LoggerFactory.getLogger(HiveCatalogTest.class);
>private String hiveConfDir = "/Users/yexianxun/dev/env/test-hive"; // a
> local path
>private TableEnvironment tableEnv;
>private HiveCatalog hive;
>private String hiveName;
>private String hiveDB;
>private String version;
>
>
>@Before
>public void before() {
>   EnvironmentSettings settings =
>  EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inBatchMode()
> .build();
>   tableEnv = TableEnvironment.create(settings);
>   hiveName = "myhive";
>   hiveDB = "sloth";
>   version = "2.1.1";
>}
>
>
>@Test
>public void testCatalogQuerySink() throws Exception {
>   hive = new HiveCatalog(hiveName, hiveDB, hiveConfDir, version);
>   System.setProperty("java.security.krb5.conf", hiveConfDir +
> "/krb5.conf");
>   tableEnv.getConfig().getConfiguration().setString("stream_mode",
> "false");
>   tableEnv.registerCatalog(hiveName, hive);
>   tableEnv.useCatalog(hiveName);
>   String query = "select * from " + hiveName + "." + hiveDB +
> ".testtbl2 where id = 20200202";
>   Table table = tableEnv.sqlQuery(query);
>   String newTableName = "testtbl2_1";
>   table.insertInto(hiveName, hiveDB, newTableName);
>   tableEnv.execute("test");
>}
> }
>
>
> HiveMetastoreClientFactory:
>public static HiveMetastoreClientWrapper create(HiveConf hiveConf,
> String hiveVersion) {
>   Preconditions.checkNotNull(hiveVersion, "Hive version cannot be
> null");
>   if (System.getProperty("java.security.krb5.conf") != null) {
>  if (System.getProperty("had_set_kerberos") == null) {
> String principal = "sloth/d...@bdms.163.com";
> String keytab =
> "/Users/yexianxun/dev/env/mammut-test-hive/sloth.keytab";
> try {
>sun.security.krb5.Config.refresh();
>UserGroupInformation.setConfiguration(hiveConf);
>UserGroupInformation.loginUserFromKeytab(principal, keytab);
>System.setProperty("had_set_kerberos", "true");
> } catch (Exception e) {
>LOG.error("", e);
> }
>  }
>   }
>   return new HiveMetastoreClientWrapper(hiveConf, hiveVersion);
>}
>
>
> HiveCatalog:
>private static HiveConf createHiveConf(@Nullable String hiveConfDir) {
>   LOG.info("Setting hive conf dir as {}", hiveConfDir);
>   try {
>  HiveConf.setHiveSiteLocation(
> hiveConfDir == null ?
>null : Paths.get(hiveConfDir,
> "hive-site.xml").toUri().toURL());
>   } catch (MalformedURLException e) {
>  throw new CatalogException(
> String.format("Failed to get hive-site.xml from %s",
> hiveConfDir), e);
>   }
>
>
>   // create HiveConf from hadoop configuration
>   HiveConf hiveConf = new
> HiveConf(HadoopUtils.getHadoopConfiguration(new
> org.apache.flink.configuration.Configuration()),
>  HiveConf.class);
>   try {
>  hiveConf.addResource(Paths.get(hiveConfDir,
> "hdfs-site.xml").toUri().toURL());
>  hiveConf.addResource(Paths.get(hiveConfDir,
> "core-site.xml").toUri().toURL());
>   } catch (MalformedURLException e) {
>  throw new CatalogException(String.format("Failed to get
> hdfs|core-site.xml from %s", hiveConfDir), e);
>   }
>   return hiveConf;
>}
>
>
> 在执行testCatalogQuerySink方法报以下错误:
> org.apache.flink.runtime.client.JobExecutionException: Could not retrieve
> JobResult.
>
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:622)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
> at
> org.apache.flink.table.planner.delegation.BatchExecutor.execute(BatchExecutor.java:55)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:410)
> at api.HiveCatalogTest.testCatalogQuerySink(HiveCatalogMumTest.java:234)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> 

Re: Flink向量化读取parquet

2020-01-13 文章 Kurt Young
据我所知,已经有这部分的计划了,不出意外的话应该会在 1.11 版本发布:
https://issues.apache.org/jira/browse/FLINK-11899

Best,
Kurt


On Mon, Jan 13, 2020 at 7:50 PM faaron zheng  wrote:

>
> flink使用的是hadoop中的parquetfilereader,这个貌似不支持向量化读取,hive和spark目前都支持向量化读取,请加一下flink有什么计划吗?
>


Re: 疑似ParquetTableSource Filter Pushdown bug

2020-01-08 文章 Kurt Young
如果是优化器一直卡住不能退出,那基本肯定是BUG了。请开一个issue把这些信息上传上去吧,我们会调查一下是什么问题导致的。

Best,
Kurt


On Wed, Jan 8, 2020 at 5:12 PM jun su  wrote:

> 添加代码文字:
>
> def main(args: Array[String]): Unit = {
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tableEnv = StreamTableEnvironment.create(env)
>
> val schema = 
> "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"
> val parquetTableSource: ParquetTableSource = ParquetTableSource
> .builder
> .forParquetSchema(new 
> org.apache.parquet.avro.AvroSchemaConverter().convert(
> org.apache.avro.Schema.parse(schema, true)))
> .path("/Users/sujun/Documents/tmp/login_data")
> .build
>
> tableEnv.registerTableSource("source",parquetTableSource)
>
>
> val t1 = tableEnv.sqlQuery("select log_id,city from source where city = 
> '274' ")
> tableEnv.registerTable("t1",t1)
>
> val t4 = tableEnv.sqlQuery("select * from t1 where 
> log_id='5927070661978133'")
> t1.toAppendStream[Row].print()
>
> env.execute()
>
> }
>
>
> jun su  于2020年1月8日周三 下午4:59写道:
>
>> 你好:
>>我在使用ParquetTableSource时, 发现一些问题, 疑似是ParquetTableSource Filter
>> Pushdown的Bug, 以下是代码和描述:
>>
>> [image: 1578473593933.jpg]
>>
>> debug发现,
>> 代码卡在了: org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp方法, while
>> true循环一直出不来, 知道整合程序OOM
>>
>> [image: 1.jpg]
>>
>> 将ParquetTableSource的filter pushdown代码去掉后 , 主程序可以执行.
>> 怀疑是calcite的优化器在迭代找代价最小的plan时一直无法退出导致的
>>
>


Re: Flink SQL Count Distinct performance optimization

2020-01-07 文章 Kurt Young
Hi,

Could you try to find out what's the bottleneck of your current job? This
would leads to
different optimizations. Such as whether it's CPU bounded, or you have too
big local
state thus stuck by too many slow IOs.

Best,
Kurt


On Wed, Jan 8, 2020 at 3:53 PM 贺小令  wrote:

> hi sunfulin,
> you can try with blink planner (since 1.9 +), which optimizes distinct
> aggregation. you can also try to enable
> *table.optimizer.distinct-agg.split.enabled* if the data is skew.
>
> best,
> godfreyhe
>
> sunfulin  于2020年1月8日周三 下午3:39写道:
>
>> Hi, community,
>> I'm using Apache Flink SQL to build some of my realtime streaming apps.
>> With one scenario I'm trying to count(distinct deviceID) over about 100GB
>> data set in realtime, and aggregate results with sink to ElasticSearch
>> index. I met a severe performance issue when running my flink job. Wanner
>> get some help from community.
>>
>>
>> Flink version : 1.8.2
>> Running on yarn with 4 yarn slots per task manager. My flink task
>> parallelism is set to be 10, which is equal to my kafka source partitions.
>> After running the job, I can observe high backpressure from the flink
>> dashboard. Any suggestions and kind of help is highly appreciated.
>>
>>
>> running sql is like the following:
>>
>>
>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>>
>> select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as
>> clkCnt  from
>>
>> (
>>
>> SELECT
>>
>>  aggId,
>>
>>  pageId,
>>
>>  statkey,
>>
>>  COUNT(DISTINCT deviceId) as cnt
>>
>>  FROM
>>
>>  (
>>
>>  SELECT
>>
>>  'ZL_005' as aggId,
>>
>>  'ZL_UV_PER_MINUTE' as pageId,
>>
>>  deviceId,
>>
>>  ts2Date(recvTime) as statkey
>>
>>  from
>>
>>  kafka_zl_etrack_event_stream
>>
>>  )
>>
>>  GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>>
>> ) as t1
>>
>> group by aggId, pageId, statkey
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Best
>
>


Re: 注册table时catalog无法变更

2020-01-07 文章 Kurt Young
临时表的话只能放在指定的catalog中,不建议将临时表注册到另一个catalog,比如hive catalog。
临时表大部分情况下是不能序列化的,那样的话代码会报错。

Best,
Kurt


On Tue, Jan 7, 2020 at 9:20 PM 贺小令  wrote:

> hi,
>
> streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
> 注册的表都是Temporary Table。
>
> 你可以通过:
> catalog = new InMemoryExternalCatalog(catalogName);
> streamTableEnvironment.registerCatalog(catalogName, catalog);
> catalog.createTable()
>
> 或者
> streamTableEnvironment.getCatalog().get().createTable()
>
> 的方式来注册表到指定的catalog
>
>
> xiyu...@163.com  于2020年1月7日周二 下午3:20写道:
>
> > hi,各位:
> >
> >
> 我在开发过程中,通过下面方式注册table时,默认使用的catalog是EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
> > streamTableEnvironment.registerDataStream(tableName, dataStream,
> >
> fields);尝试通过下面方式解决,但是注册的table仍然在EnvironmentSettings.DEFAULT_BUILTIN_CATALOG中
> >   streamTableEnvironment.registerCatalog(catalogName, new
> > InMemoryExternalCatalog(catalogName));
> > streamTableEnvironment.useCatalog(catalogName);请问,我如何将table注册到指定的catalog?
> >
> >
> > xiyu...@163.com
> >
>


Re: Flink1.9批任务yn和ys对任务的影响

2019-12-25 文章 Kurt Young
也可以试下最新的1.10版本,这个版本里面 sql 的算子已经不再申请固定写死的内存数量,
而是根据当时 slot 能提供多少 managed 内存来自适应了。

Best,
Kurt


On Thu, Dec 26, 2019 at 1:36 PM Xintong Song  wrote:

> slot需要多少内存是和具体作业相关的,不同作业差别会比较大。
>
> slot的资源需求是根据所有算子的资源需求相加得到的,如果你对你的作业用到了哪些算子比较了解的话,可以根据算子的资源需求推算出来。
> 算子的默认资源需求可以参考 [1],里面有五个“table.exec.resource.*”的配置项,也可以调整这些配置项来更改算子使用的内存。
>
> 如果对作业使用到的算子不是很了解的话,那比较简单的办法还是直接提交作业试试看,去日志里面搜"Request slot with
> profile"就能够看到slot的资源需求。
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options
>
> On Thu, Dec 26, 2019 at 11:36 AM faaron zheng 
> wrote:
>
> > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed
> > memory为2g,也就是一个slot平均200m,所以任务没调度起来。
> > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱:
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 11:23,faaron zheng 写道:
> > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed
> > memory为2g,也就是一个slot平均200m,所以任务没调度起来。
> > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱:
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月25日 11:30,Xintong Song 写道:
> > Hi faaron, Flink 1.9 中 -yn参数应该是不生效的,后续版本中已经删除了这个参数。 根据你的参数,在每个 TM
> > 的内存为30G不变的情况下,每个 TM 中的slot个数(-ys)从5变成10,也就意味着平均每个slot占用的内存变为了原来的一半。 Flink
> > 1.9 的sql batch 算子对 flink managed memory 是有确定的需求的,很可能是这个变化导致单个 slot
> 的managed
> > memory无法满足算子的资源需求了。 Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09
> > AM faaron zheng  wrote: > 跑tpcds的query1: flink
> run
> > -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g > -ytm 30g 任务可以正常执行 flink
> > run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm > 60g -ytm 30g
> > 任务在做hashjoin的时候就会失败 报错是No pooled slot available and request to >
> > ResourceManager for new slot failed 搞不懂这有啥关系,求指教 faaron zheng 邮箱: >
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制
>


Re: Flink1.9.1的SQL向前不兼容的问题

2019-12-13 文章 Kurt Young
Hi,

建议你翻译成英文然后到jira里建个issue。

Best,
Kurt


On Thu, Dec 12, 2019 at 11:39 PM 李佟  wrote:

> 近期进行Flink升级,将原来的程序从老的集群(1.8.0运行正常)迁移到新的集群(1.9.1)中。在部署程序的时候发现在1.9.1的集群中,原来运行正常的Flink
> SQL的程序无法执行,异常如下:
>
>
> org.apache.flink.table.api.ValidationException: *Window can only be
> defined over a time attribute column.*
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
>
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)
>
>
>
> 跟踪到反编译的scala文件并设置断点,发现下图中的红框部分没有执行,直接跳过。
>
>
> 功能很简单,就是在某个时间窗对数值求和。测试用例如下:
>
>
> package org.flowmatrix.isp.traffic.accounting.test;
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.scala.typeutils.Types;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sinks.CsvTableSink;
> import org.apache.flink.table.sinks.TableSink;
> import org.apache.flink.table.sources.DefinedRowtimeAttributes;
> import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
> import org.apache.flink.table.sources.StreamTableSource;
> import org.apache.flink.table.sources.tsextractors.ExistingField;
> import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
> import org.apache.flink.types.Row;
> import org.junit.Test;
>
> import javax.annotation.Nullable;
> import java.sql.Timestamp;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
>
> public class TestSql {
> @Test
> public void testAccountingSql() {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setParallelism(1);
>
> try {
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
>
> SimpleTableSource source = new SimpleTableSource();
> Table t = tableEnv.fromTableSource(source);
>
> String interval = "5"; //5 second
> System.out.println("source schema is " +
> source.getTableSchema());
>
> Table sqlResult = tableEnv.sqlQuery("SELECT " +
> " TUMBLE_START(UserActionTime, INTERVAL '" + interval
> + "' SECOND) as rowTime, " +
> " Username," +
> " SUM(Data) as Data " +
> " FROM  " + t +
> " GROUP BY TUMBLE(UserActionTime, INTERVAL '" +
> interval + "' SECOND),Username");
>
>
> String[] fieldNames = {
> "rowTime",
> "Username", "Data"};
> TypeInformation[] fieldTypes = {
> TypeInformation.of(Timestamp.class),
> TypeInformation.of(String.class),
> TypeInformation.of(Long.class)};
>
> TableSink sink1 = new CsvTableSink("/tmp/data.log", ",");
> sink1 = sink1.configure(fieldNames, fieldTypes);
> tableEnv.registerTableSink("EsSinkTable", sink1);
> System.out.println("sql result schema is " +
> sqlResult.getSchema());
>
> tableEnv.sqlUpdate("insert into EsSinkTable select  " +
> "rowTime,Username,Data from " + sqlResult + "");
>
> env.execute("test");
> } catch (Exception e) {
> e.printStackTrace();
> System.err.println("start program error. FlowMatrix
> --zookeeper  --config " +
> " --name  --interval 
> --indexName ");
> System.err.println(e.toString());
> return;
> }
> }
>
> public static class SimpleTableSource implements
> StreamTableSource, DefinedRowtimeAttributes {
> @Override
> public DataStream getDataStream(StreamExecutionEnvironment
> env) {
> return
> env.fromCollection(genertateData()).assignTimestampsAndWatermarks(new
> AssignerWithPunctuatedWatermarks() {
> private long lastWaterMarkMillSecond = -1;
> private long waterMarkPeriodMillSecond = 1000;
> @Nullable
> @Override
> public Watermark checkAndGetNextWatermark(Row lastElement,
> long extractedTimestamp) {
> if(extractedTimestamp - lastWaterMarkMillSecond >=
> waterMarkPeriodMillSecond){
> lastWaterMarkMillSecond = 

Re: [Discuss] What should the "Data Source" be translated into Chinese

2019-08-13 文章 Kurt Young
"更倾向不去翻译Data Source和Data Sink, 通过用中文对其做解释即可"   +1

Best,
Kurt


On Tue, Aug 13, 2019 at 6:18 PM Simon Su  wrote:

> 更倾向不去翻译Data Source和Data Sink, 通过用中文对其做解释即可
>
>
> Thanks,
> SImon
>
>
> On 08/13/2019 18:07, wrote:
> How about translate  "data sink" into “数据漕”
> 漕,读作:cáo。汉字基本字义指通过水道运输粮食:漕运|漕粮。==>
> https://baike.baidu.com/item/%E6%BC%95?forcehttps=1%3Ffr%3Dkg_hanyu
>
>
>
> - 原始邮件 -
> 发件人:Kurt Young 
> 收件人:dev , user-zh 
> 主题:Re: [Discuss] What should the "Data Source" be translated into Chinese
> 日期:2019年08月13日 16点44分
>
> cc user-zh mailing list, since there are lots of chinese speaking people.
> Best,
> Kurt
> On Tue, Aug 13, 2019 at 4:02 PM WangHengwei  wrote:
> Hi all,
>
>
> I'm working on [FLINK-13405] Translate "Basic API Concepts" page into
> Chinese. I have a problem.
>
> Usually we translate "Data Source" into "数据源" but there is no agreed
> translation for "Data Sink". Since it often appears in documents, I think
> we'd better to have a unified translation. I have some alternatives, e.g.
> "数据沉淀","数据归" or "数据终".
>
> Committer Xingcan Cui has a good suggestion for "数据汇" which
> corresponds to source ("数据源"). I asked Committer Jark Wu, he is also fine
> with it. I think "数据汇" is a good representation of flow charactiristics so
> I would like to use it.
>
>
> I want to hear more thoughts from the community whether we should
> translate it and what it should be translated into.
>
>
> Thanks,
> WangHW
>


Re: [Discuss] What should the "Data Source" be translated into Chinese

2019-08-13 文章 Kurt Young
cc user-zh mailing list, since there are lots of chinese speaking people.
Best,
Kurt


On Tue, Aug 13, 2019 at 4:02 PM WangHengwei  wrote:

> Hi all,
>
>
> I'm working on [FLINK-13405] Translate "Basic API Concepts" page into
> Chinese. I have a problem.
>
> Usually we translate "Data Source" into "数据源" but there is no agreed
> translation for "Data Sink". Since it often appears in documents, I think
> we'd better to have a unified translation. I have some alternatives, e.g.
> "数据沉淀","数据归" or "数据终".
>
>  Committer Xingcan Cui has a good suggestion for "数据汇" which
> corresponds to source ("数据源"). I asked Committer Jark Wu, he is also fine
> with it. I think "数据汇" is a good representation of flow charactiristics so
> I would like to use it.
>
>
> I want to hear more thoughts from the community whether we should
> translate it and what it should be translated into.
>
>
> Thanks,
> WangHW


Re: Blink在Hive表没有统计信息的情况下如何优化

2019-05-28 文章 Kurt Young
你先试试把HashJoin这个算子禁用看看,TableConfig里添加这个配置

sql.exec.disabled-operators: HashJoin

Best,
Kurt


On Tue, May 28, 2019 at 3:23 PM bigdatayunzhongyan <
bigdatayunzhong...@aliyun.com> wrote:

> 感谢 @Kurt Young 大神的回复,报错信息在附件。谢谢!
>
>
> 在2019年05月28日 14:10,Kurt Young  写道:
>
> 没有统计信息确实很难生成比较靠谱的执行计划,这也是之前很多DBA的工作 ;-)
>
> 你可以试试以下以下操作:
> 1. 如果是join顺序不合理,可以手动调整sql中的join顺序,并且关闭join reorder
> 2.
> 看看fail的具体原因,如果是个别比较激进的算子表现不好,比如HashAggregate、HashJoin,你可以手动禁止掉这些算子,选择性能稍差但可能执行起来更稳健的算子,比如SortMergeJoin
>
> 这是我拍脑袋想的,具体的建议你先分析一下SQL为什么会fail,然后贴出具体的问题来。
>
> 另外,我们正在开发SQL hint功能,可以有效缓解类似问题。
>
> Best,
> Kurt
>
>
> On Tue, May 28, 2019 at 12:32 PM bigdatayunzhongyan <
> bigdatayunzhong...@aliyun.com> wrote:
>
>>  @Kurt Young  @Jark Wu  @Bowen Li
>> 先描述下我这边的情况,一些简单的SQL无论是否有统计的信息的情况下都能执行成功,
>> 但是一些复杂的SQL在没有统计信息的情况下一直执行失败,尝试开启各种参数,都失败了,很痛苦,
>> 不过在设置统计信息及开启相关参数后很轻松就能执行成功(点赞)。
>> 我们线上的情况是很多表信息都没有统计信息,请问有哪些优化的办法。
>>
>> 启动命令:./bin/yarn-session.sh -n 50 -s 20 -jm 3072 -tm 6144 -d
>> 配置见附件
>> 谢谢!
>>
>>


Re: 结邮 Re: Re: 请教一下Blink资源分配问题

2019-03-29 文章 Kurt Young
Blink是基于Flink 1.5.1做的二次开发,可能存在部分配置和最新版Flink不一致的情况。Sorry 让你踩坑了。

Best,
Kurt


On Fri, Mar 29, 2019 at 5:52 PM 邓成刚【qq】  wrote:

> 终于发现是什么问题了,是由于Blink的 配置与FLINK不同导致:
> Flink 里没有这个配置:  taskmanager.cpu.core  默认是   1
>
> 另外:blink 里 taskmanager.heap.mb   与 flink 的 taskmanager.heap.size  不同导致
>   taskmanager.heap 配置过小,默认1G
>
> 之前错误的配置:
> # The heap size for the JobManager JVM
>
> jobmanager.heap.size: 20480m
>
>
> # The heap size for the TaskManager JVM
>
> taskmanager.heap.size: 40960m
>
>
> # The number of task slots that each TaskManager offers. Each slot runs
> one parallel pipeline.
>
> taskmanager.numberOfTaskSlots: 24
>
>
>
> 现在配置:
>
>
> # The heap size for the JobManager JVM
>
> jobmanager.heap.mb: 20480
>
>
> # The heap size for the TaskManager JVM
>
> taskmanager.heap.mb: 102400
>
>
> # How many physical cpu cores a task manager will supply for user
>
> taskmanager.cpu.core: 30
>
>
> # The number of task slots that each TaskManager offers. Each slot runs
> one parallel pipeline.
>
> taskmanager.numberOfTaskSlots: 30
>
>
>
> 之前安装了Flink 1.7.2,后又安装 了 Blink,我直接拿的Flink 的配置,没注意看,在此希望我的经历能给大家一些提醒,谢谢。
>
> 在此 感谢龙三 的帮忙,和  谢谢大家的回复。。
>
> 邓成刚【qq】
>
> 发件人: 邓成刚【qq】
> 发送时间: 2019-03-29 17:18
> 收件人: user-zh
> 主题: Re: Re: 请教一下Blink资源分配问题
> 用的是table , LOG里资源申请情况:
>
> ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0,
> FLOATING_MANAGED_MEMORY_MB=0.0} from resource manager (request =
> 5ac9229acae1e6ef90563a5a0bf3fe21).
> ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0,
> FLOATING_MANAGED_MEMORY_MB=0.0} from resource manager (request =
> 2e39ec9bb22b2afe2baa15a87d854796).
> ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0,
> FLOATING_MANAGED_MEMORY_MB=0.0} for job e40415828bbe184dc051e422e85e98c3
> with allocation id 1a8a3b10d3c235dcb64b4b91e0a22bc8.
> ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0,
> FLOATING_MANAGED_MEMORY_MB=0.0} from resource manager (request =
> ce1a10fca1c7a9f6f93dcd248ebce56c).
> ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0,
> FLOATING_MANAGED_MEMORY_MB=0.0} for job e40415828bbe184dc051e422e85e98c3
> with allocation id b9ea06e5ba61013ead29424b72e8535c.
> ResourceProfile{cpuCores=0.6, heapMemoryInMB=96, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=2, MANAGED_MEMORY_MB=0.0,
> FLOATING_MANAGED_MEMORY_MB=0.0} for job e40415828bbe184dc051e422e85e98c3
> with allocation id c3742a90378a264341e8d1a573c67535.
>
>
> 发件人: Guowei Ma
> 发送时间: 2019-03-29 17:12
> 收件人: user-zh
> 主题: Re: Re: 请教一下Blink资源分配问题
> 用的什么api,DataStream还是Table?
> 如果是DataStream的话,申请什么样的资源?
> 详细列下。
>
> Best,
> Guowei
>
>
> 邓成刚【qq】  于2019年3月29日周五 下午5:09写道:
>
> > 是的。
> >
> > 发件人: moxian
> > 发送时间: 2019-03-29 17:06
> > 收件人: user-zh
> > 主题: Re: 请教一下Blink资源分配问题
> > standalone 模式?
> >
> > 邓成刚【qq】  于2019年3月29日周五 上午9:59写道:
> >
> > > 请教一下Blink资源分配问题:
> > > blink 任务并行度设置 20  提示0个满足:Batch request 40 slots, but only 0 are
> > fulfilled.
> > > 调整到 3 并行度 提示:Batch request 6 slots, but only 4 are fulfilled.,
> > > 但是我的TASK SLOTS有配 48,没有其它任务,
> > > 按理説没有资源问题啊,集群配置情况:
> > >
> > > 其它的都是默认配置:
> > >
> > > taskmanager.numberOfTaskSlots: 24
> > >
> > > jobmanager.heap.size: 20480m
> > >
> > > # The heap size for the TaskManager JVM
> > >
> > > taskmanager.heap.size: 40960m
> > >
> > >
> > > 服务器 2 台:每台 48核,256G
> > >
> > >
> > >


Re: blink开源版本维表关联时开启缓存方式

2019-03-29 文章 Kurt Young
当时没有想清楚如何把Cache当成一个public的接口向外提供,它更像是一些实现上的特定优化。
后续我们在flink master上实现维表join的时候,会把这个问题考虑进去。

Best,
Kurt


On Fri, Mar 29, 2019 at 5:09 PM moxian  wrote:

> 这么好的一个优化,为啥被拿掉了呢?
>
> Kurt Young  于2019年3月29日周五 上午9:39写道:
>
> > Hi,
> >
> > Blink开源的时候把Cache的实现暂时拿掉了,你可以根据自己的需要自己实现一个cache。
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Mar 27, 2019 at 4:44 PM 苏 欣  wrote:
> >
> > > 我在ppt里面看到这些内容,但是在开源的blink里面没有找到相关的配置,请问各位老师应该如何开启缓存策略?
> > >
> > >
> > >
> > > 发送自 Windows 10 版邮件 <https://go.microsoft.com/fwlink/?LinkId=550986>应用
> > >
> > >
> > >
> >
>


Re: blink开源版本维表关联时开启缓存方式

2019-03-28 文章 Kurt Young
Hi,

Blink开源的时候把Cache的实现暂时拿掉了,你可以根据自己的需要自己实现一个cache。

Best,
Kurt


On Wed, Mar 27, 2019 at 4:44 PM 苏 欣  wrote:

> 我在ppt里面看到这些内容,但是在开源的blink里面没有找到相关的配置,请问各位老师应该如何开启缓存策略?
>
>
>
> 发送自 Windows 10 版邮件 应用
>
>
>


Re: flink疑问

2019-03-25 文章 Kurt Young
大家都好热情啊~

@IORI,这个问题取决于你是要把一个流复制成两个流分别套用不用的处理逻辑呢,还是说是要把数据根据一定的规则分开成两个流。
如果是复制的话,用@邓成刚 的方法就可以
如果是要进行数据分割的话,那用split或者sideoutput都行

Best,
Kurt


On Tue, Mar 26, 2019 at 10:45 AM Yun Chen  wrote:

> split官方好像是不建议使用了,建议使用   Side Outputs
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/side_output.html
>
>
> 下面是示例参考
>
>
> val xOutputTag = OutputTag[String]("xx-side-output")
> val xxOutputTag = OutputTag[String]("xx-side-output")
>
> val xxx = xxx.process(new ProcessFunction[String, String] {
>   override def processElement(i: String, context: ProcessFunction[String,
> String]#Context,
>   collector: Collector[String]): Unit = {
>
> ...
>
> arrData(0) = channel
> arrData(1) = tboxinfo
>
> collector.collect(parse)
>
> context.output(channelOutputTag,String.valueOf(arrData(0)))
> context.output(eventOutputTag,arrData(1))
>
>   }
>   })
>
> val xStream = ouputStream.getSideOutput(xOutputTag)
> val xxStream = ouputStream.getSideOutput(xxOutputTag)
>
> Best,
> YunKillere
>
> 
> 发件人: 戴嘉诚 
> 发送时间: 2019年3月25日 19:26
> 收件人: user-zh@flink.apache.org
> 主题: 答复: flink疑问
>
> 使用 Split 算子把流根据特定条件拆分成两个或者更多,然后在用select算子从拆分流中选择对应的拆分流做处理即可。
> 可以看看文档上,有介绍用法
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/
>
> 发件人: baiyg25...@hundsun.com
> 发送时间: 2019年3月26日 10:10
> 收件人: user-zh
> 主题: 回复: flink疑问
>
> 一个算子出来两个流好像不能吧。
> 要想实现你说的,可以先基于A流过滤生成要进行B算子的流,基于A流过滤生成要进行C算子的流。
>
>
>
> baiyg25...@hundsun.com
>
> 发件人: IORI
> 发送时间: 2019-03-26 09:46
> 收件人: user-zh
> 主题: flink疑问
>
> 请问:数据流通过算子A时,我想分裂成两个数据流,一个数据流进行算子B操作然后sink,另外一个数据流需要先进行算子C操作,再reduce然后sink,请问这种情况应该如何处理?一个operator中能出来两个不同的数据流吗?
>
>


Re: 欢迎来到 Apache Flink 社区

2019-01-29 文章 Kurt Young
继续测试一把


On Tue, Jan 29, 2019 at 5:44 PM Kurt Young  wrote:

> 欢迎来到 Apache Flink 社区
>


欢迎来到 Apache Flink 社区

2019-01-29 文章 Kurt Young
欢迎来到 Apache Flink 社区


大家好!

2019-01-29 文章 Kurt Young
测试(test)