Reply: StreamTableEnvironment.registerDatastream(): expose a user-defined schemaDescription and DeserializationSchema

2020-01-03 Thread aven . wu
Hi Jingsong
Thanks for the pointers; using DataStream solved my current problem.
Setting the RowTypeInfo is a bit obscure, though (it has to be specified when the DataStream is created).
I hope a future tableenv.registerStream 
API will offer a better, more direct way to set the RowTypeInfo and related information (including when 
registering DataStreams of various element types).

Best,
Aven

From: JingsongLee
Sent: December 31, 2019 17:03
To: user-zh
Subject: Re: StreamTableEnvironment.registerDatastream(): expose a user-defined 
schemaDescription and DeserializationSchema

Hi aven,

That is a reasonable request.
The current situation is:
- Flink Table only supports Row, POJO, Tuple, and case classes as structured data types.
- Your type is JSONObject, which is also a structured data type, but Flink does not support it today, 
so a DeserializationSchema mechanism like the one you describe could be considered to support it.

In practice, though, there is little difference: a RowDeserializationSchema you provide is essentially a 
JSONObject-to-Row conversion, so you can simply apply that conversion in a DataStream.map and turn the stream 
into a structured type that the Flink Table API already supports.
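A minimal, untested sketch of that map-based conversion (Flink 1.9-era API). It assumes an existing DataStream<JSONObject> called jsonStream, a StreamTableEnvironment called tableEnv, and fastjson's JSONObject; the field names and types are illustrative only:

```java
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Target row type: the schema you would otherwise pass as a "schemaDescription".
RowTypeInfo rowType = new RowTypeInfo(
        new TypeInformation<?>[]{Types.STRING, Types.LONG},
        new String[]{"userId", "eventTime"});

// JSONObject -> Row conversion applied in a plain map, as suggested above.
DataStream<Row> rows = jsonStream
        .map(json -> Row.of(json.getString("userId"), json.getLongValue("eventTime")))
        .returns(rowType); // attach the RowTypeInfo explicitly

tableEnv.registerDataStream("events", rows); // field names come from the RowTypeInfo
```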

Best,
Jingsong Lee


--
From:aven.wu 
Send Time:December 31, 2019 (Tuesday) 14:09
To:user-zh@flink.apache.org 
Subject:Reply: StreamTableEnvironment.registerDatastream(): expose a user-defined 
schemaDescription and DeserializationSchema

Hello!
"Defining the JSONObject type as a generic object type" works when the fields and types are known in advance, but it has to be coded into the program.
If this capability were exposed, a DataStream could be registered as a streaming table without writing code (i.e., without adding a new POJO).

best wish
Sent from the Mail app for Windows 10

From: Terry Wang
Sent: December 30, 2019 12:37
To: user-zh@flink.apache.org
Subject: Re: StreamTableEnvironment.registerDatastream(): expose a user-defined 
schemaDescription and DeserializationSchema

One way to address this requirement is to define the JSONObject type as a generic object type, 
register the stream as a table, and then use a UDTF to convert the JSONObject into a specific schema.
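Not from the thread, but a hedged sketch of what such a UDTF could look like with the Flink 1.9 Table API; the fastjson JSONObject accessors and the field names are illustrative assumptions:

```java
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class JsonToRow extends TableFunction<Row> {

    // Emit one Row per input object with the fields of interest.
    public void eval(JSONObject json) {
        collect(Row.of(json.getString("userId"), json.getLongValue("eventTime")));
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.LONG);
    }
}

// usage (illustrative):
// tableEnv.registerFunction("jsonToRow", new JsonToRow());
// SELECT t.userId, t.eventTime
// FROM RawTable, LATERAL TABLE(jsonToRow(payload)) AS t(userId, eventTime)
```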

Best,
Terry Wang



> On December 27, 2019 at 19:56, aven.wu wrote:
> 
> StreamTableEnvironment.registerDatastream() currently only supports POJOs or objects with 
> public fields, and registers the table from the default or user-specified fields. In some scenarios there is no fixed format; for example, a DataStream of JSONObject cannot be registered as a table this way. Could a lower-level API be provided to make table registration more flexible, with the user passing in a schema description and a custom DeserializationSchema?
> 
> 
> Sent from the Mail app for Windows 10
> 





Re:Re: Re: When joining a dimension table via JDBC in the SQL layer, how can changes to the dimension table data be picked up dynamically?

2020-01-03 Thread amenhub
hi Benchao,


The computed-column feature you mentioned is exactly what I need; looking forward to the official 1.10 release! Many thanks!


Best regards!







On 2020-01-04 15:35:24, "Benchao Li" wrote:
>hi 世民,
>
>This error means your stream table needs a processing-time attribute. As far as I know, there are a few ways to create one:
>1. If the Table is registered from a DataStream, you can use:
>https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion
>2. If the Table is registered via a TableSource, you can use:
>https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/time_attributes.html#using-a-tablesource
>3. The upcoming 1.10 release supports computed columns, so you can declare a processing-time attribute directly in DDL:
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html
>
>Also, the dimension-table join scenario does not require MySQL to support temporal tables; Flink
>SQL takes care of parsing and handling this, and what is eventually sent to MySQL is an ordinary SELECT statement.
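For illustration, a hedged sketch of options 1 and 3 above (API as of Flink 1.9/1.10; the table name, field names, and connector properties are made up):

```java
// Option 1: declare a processing-time attribute while converting a DataStream to a Table.
Table orders = tEnv.fromDataStream(orderStream, "account, amount, proctime.proctime");

// Option 3 (Flink 1.10+): declare it as a computed column in DDL.
tEnv.sqlUpdate(
    "CREATE TABLE Orders (" +
    "  account STRING," +
    "  amount DOUBLE," +
    "  proctime AS PROCTIME()" +
    ") WITH (" +
    "  'connector.type' = 'kafka'" +   // remaining connector properties omitted
    ")");
```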
>
>amenhub wrote on Sat, Jan 4, 2020 at 1:55 PM:
>
>>
>>
>> hi Benchao,
>>
>>
>> I understand what you mean. I have been studying the official documentation to learn Flink and know that temporal table joins currently only support processing time.
>> But when I write the join SQL as in the given example, I get this exception: [ Column 'proctime' not found in table 'o' ].
>> The problem is that it looks up the proctime column of table [ o ] through the alias [ o ], but obviously the table has no proctime column.
>> How should I solve this? What topics should I read up on? MySQL does support the ANSI-2011 temporal table semantics, right? Please advise~
>>
>>
>> Best regards
>>
>>
>>
>>
>>
>>
>> On 2020-01-04 12:10:34, "Benchao Li" wrote:
>> >hi 世民,
>> >
>> >The mailing list does not support sending images directly; you can use an image-hosting service instead.
>> >From your description, my guess is that the dimension-table join syntax is wrong and you wrote an ordinary join. In that case the JDBC table is resolved into a `JDBCInputFormat`, which reads all of the data in one pass.
>> >The SQL for a dimension-table join looks like this:
>> >
>> >SELECT
>> >  o.amount, o.currency, r.rate, o.amount * r.rate
>> >FROM
>> >  Orders AS o
>> >  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
>> >  ON r.currency = o.currency
>> >
>> >For details, see the documentation:
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins
>> >
>> >刘世民 wrote on Sat, Jan 4, 2020 at 11:27 AM:
>> >
>> >> hi~
>> >> As shown in the attached picture, when joining Kafka and JDBC, the JDBC data is loaded in full and the task ends up in the Finished state. In this case, how can updates to the JDBC table be detected? Or is something misconfigured on my side? Please advise.
>> >>
>> >> Regards from a newbie~
>> >>
>> >>
>> >>
>> >>
>> >
>> >
>> >--
>> >
>> >Benchao Li
>> >School of Electronics Engineering and Computer Science, Peking University
>> >Tel:+86-15650713730
>> >Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>
>
>-- 
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9

2020-01-03 Thread Salva Alcántara
Thanks Chesnay! Just to be clear, this is how my current code looks:

```
unionChannel = broadcastChannel.broadcast().union(singleChannel)

result = new DataStream<>(
unionChannel.getExecutionEnvironment(),
new PartitionTransformation<>(unionChannel.getTransformation(), new
MyDynamicPartitioner())   
)
```

The problem when migrating to Flink 1.9 is that MyDynamicPartitioner cannot
handle broadcasted elements as explained in the question description. So,
based on your reply, I guess I could do something like this:

```
resultSingleChannel = new DataStream<>(
singleChannel.getExecutionEnvironment(),
new PartitionTransformation<>(singleChannel.getTransformation(), new
MyDynamicPartitioner())   
)

result = broadcastChannel.broadcast().union(resultSingleChannel)
```

I will give it a try and see if it works.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re:Re: When joining a dimension table via JDBC in the SQL layer, how can changes to the dimension table data be picked up dynamically?

2020-01-03 Thread amenhub


hi Benchao,


I understand what you mean. I have been studying the official documentation to learn Flink and know that temporal table joins currently only support processing time.
But when I write the join SQL as in the given example, I get this exception: [ Column 'proctime' not found in table 'o' ].
The problem is that it looks up the proctime column of table [ o ] through the alias [ o ], but obviously the table has no proctime column.
How should I solve this? What topics should I read up on? MySQL does support the ANSI-2011 temporal table semantics, right? Please advise~


Best regards






On 2020-01-04 12:10:34, "Benchao Li" wrote:
>hi 世民,
>
>The mailing list does not support sending images directly; you can use an image-hosting service instead.
>From your description, my guess is that the dimension-table join syntax is wrong and you wrote an ordinary join. In that case the JDBC table is resolved into a `JDBCInputFormat`, which reads all of the data in one pass.
>The SQL for a dimension-table join looks like this:
>
>SELECT
>  o.amount, o.currency, r.rate, o.amount * r.rate
>FROM
>  Orders AS o
>  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
>  ON r.currency = o.currency
>
>For details, see the documentation:
>https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins
>
>刘世民 wrote on Sat, Jan 4, 2020 at 11:27 AM:
>
>> hi~
>> As shown in the attached picture, when joining Kafka and JDBC, the JDBC data is loaded in full and the task ends up in the Finished state. In this case, how can updates to the JDBC table be detected? Or is something misconfigured on my side? Please advise.
>>
>> Regards from a newbie~
>>
>>
>>
>>
>
>
>-- 
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: When joining a dimension table via JDBC in the SQL layer, how can changes to the dimension table data be picked up dynamically?

2020-01-03 Thread Benchao Li
hi 世民,

The mailing list does not support sending images directly; you can use an image-hosting service instead.
From your description, my guess is that the dimension-table join syntax is wrong and you wrote an ordinary join. In that case the JDBC table is resolved into a `JDBCInputFormat`, which reads all of the data in one pass.
The SQL for a dimension-table join looks like this:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

For details, see the documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins

刘世民 wrote on Sat, Jan 4, 2020 at 11:27 AM:

> hi~
> As shown in the attached picture, when joining Kafka and JDBC, the JDBC data is loaded in full and the task ends up in the Finished state. In this case, how can updates to the JDBC table be detected? Or is something misconfigured on my side? Please advise.
>
> Regards from a newbie~
>
>
>
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


When joining a dimension table via JDBC in the SQL layer, how can changes to the dimension table data be picked up dynamically?

2020-01-03 Thread 刘世民
hi~
As shown in the attached picture, when joining Kafka and JDBC, the JDBC data is loaded in full and the task ends up in the Finished state. In this case, how can updates to the JDBC table be detected? Or is something misconfigured on my side? Please advise.


Regards from a newbie~

Re: Controlling the Materialization of JOIN updates

2020-01-03 Thread Kurt Young
Hi Benoît,

Before discussing all the options you listed, I'd like to understand more
about your requirements.

The part I don't fully understand is that both your fact (Event) and dimension
(DimensionAtJoinTimeX) tables are coming from the same table, Event or EventRawInput in your case. As a
result, both your fact and dimension tables are changing with time.

My understanding is that once your DimensionAtJoinTimeX table emits its results,
you don't want to change the result again. You want the fact table to join only against whatever data the
dimension table currently has? I'm asking because your dimension table is calculated with a window
aggregation, but your join logic doesn't seem to care about the time attribute (LEFT JOIN
DimensionAtJoinTime1 d1 ON e.uid = d1.uid). It's possible that when a record with uid=x arrives from the
Event table, the dimension table doesn't yet have any data around uid=x due to the window aggregation. In
this case, you don't want them to join?

Best,
Kurt


On Fri, Jan 3, 2020 at 1:11 AM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hello all!
>
> I'm trying to design a stream pipeline, and have trouble controlling when
> a JOIN is triggering an update:
>
> Setup:
>
>- The Event table; "probe side", "query side", the result of earlier
>stream processing
>- The DimensionAtJoinTimeX tables; of updating nature, "build side",
>the results of earlier stream processing
>
> Joining them:
>
> SELECT *
> FROM  Event e
> LEFT JOIN DimensionAtJoinTime1 d1
>   ON  e.uid = d1.uid
> LEFT JOIN DimensionAtJoinTime2 d2
>   ON  e.uid = d2.uid
>
> The DimensionAtJoinTimeX Tables being the result of earlier stream
> processing, possibly from the same Event table:
>
> SELECT   uid,
>  hop_start(...),
>  sum(...)
> FROM Event e
> GROUP BY uid,
>  hop(...)
>
> The Event Table being:
>
> SELECT ...
> FROM   EventRawInput i
> WHERE  i.some_field = 'some_value'
>
> Requirements:
>
>- I need the JOINs to only be executed once, only when a new line is
>appended to the probe / query / Event table.
>- I also need the full pipeline to be defined in SQL.
>- I very strongly prefer the Blink planner (mainly for Deduplication,
>TopN and LAST_VALUE features).
>
> Problem exploration so far:
>
>- Option 1, "FOR SYSTEM_TIME AS OF" [1]: I need to have the solution
>in SQL: it doesn't work out. But I might explore the following: insert
>DimensionAtJoinTimeX into a special Sink, and use it in a
>LookupableTableSource (I'm at a loss on how to do that, though. Do I need
>an external kv store?).
>- Option 2, "FOR SYSTEM_TIME AS OF" [1], used in SQL: Is there a
>version of "FOR SYSTEM_TIME AS OF" readily usable in SQL? I might have
>missed something in the documentation.
>- Option 3, "LATERAL TABLE table_function" [2], on the Legacy planner:
>It does not work with two tables [3], and I don't get to have the Blink
>planner features.
>- Option 4, "LATERAL TABLE table_function" [2], on the Blink planner:
>It does not work with the "probe side" being the results of earlier stream
>processing [4].
>- Option 5, let a regular JOIN materialize the updates, and somehow
>find how to filter the ones coming from the build sides (I'm at a loss on
>how to do that).
>- Option 6, "TVR": I read this paper [5], which mentions "Time-Varying
>Relation"s; Speculating here: could there be a way, to say that the build
>side is not a TVR. Aka declare the stream as being somehow "static", while
>still being updated (but I guess we're back to "FOR SYSTEM_TIME AS OF").
>- Option 7: Is there some features being developed, or hints, or
>workarounds to control the JOIN updates that I have not considered so far?
>- Remark 1: I believe that FLINK-15112 and FLINK-14200 are of the same
>bug nature, even though they occur in different situations on different
>planners (same Exception Stack Trace on files that have the same historical
>parent before the Blink fork). FLINK-15112 has a workaround, but
>FLINK-14200 does not. The existence of that workaround IMHO signals that
>there is a simple fix for both bugs. I have tried to find it in Flink for a
>few days, but no success so far. If you guys have pointers helping me
>provide a fix, I'll gladly listen. So far I have progressed to: It revolves
>around Calcite-based Flink streaming rules transforming a temporal table
>function correlate into a Join on 2*Scan, and crashes when it encounters
>something that is not a table that can be readily scanned. Also, there are
>shenanigans on trying to find the right schema in the Catalog. But I am
>blocked now, and not accustomed to the Flink internal code (would like to
>though, if Alibaba/Ververica are recruiting remote workers, wink wink,
>nudge nudge).
>
> All opinions very much welcomed on all Options and Remarks!
>
> Cheers, and a 

Re: Flink group with time-windowed join

2020-01-03 Thread Kurt Young
Looks like a bug to me, could you file an issue for this?

Best,
Kurt


On Thu, Jan 2, 2020 at 9:06 PM jeremyji <18868129...@163.com> wrote:

> Two streams as table1, table2. We know that grouping with a regular join won't
> work,
> so we have to use a time-windowed join. Here is what my Flink SQL looks like:
>
> SELECT
> a.account account,
> SUM(a.value) + SUM(b.value),
> UNIX_TIMESTAMP(TUMBLE_START(a.producer_timestamp, INTERVAL '3' MINUTE))
> FROM
> (SELECT
> account,
> value,
> producer_timestamp
> FROM
> table1) a,
> (SELECT
> account,
> value,
> producer_timestamp
> FROM
> table2) b
> WHERE
> a.account = b.account AND
> a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3'
> MINUTE AND b.producer_timestamp
> group by
> a.account,
> TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)
> But i still got error from flink:
>
> Rowtime attributes must not be in the input rows of a regular join. As a
> workaround you can cast the time attributes of input tables to TIMESTAMP
> before.
> Please check the documentation for the set of currently supported SQL
> features.
> at
>
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:450)
> at
>
> org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:369)
> at
>
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:814)
> at
>
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
> at
>
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
> at
>
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1048)
> at
>
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:962)
> at
>
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:922)
>
> I think I am using a time-windowed join just like this doc
> says:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#time-windowed-joins
> .
> But Flink tells me it's a regular join. Is there anything wrong that I haven't
> noticed?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Duplicate tasks for the same query

2020-01-03 Thread Kurt Young
Hi RKandoji,

It looks like you have a data skew issue with your input data. Some, or
maybe only one, "userId" appears more frequently than others. For the join
operator to work correctly, Flink will apply a "shuffle by join key" before
the
operator, so the same "userId" will go to the same sub-task to perform the join
operation. In this case, I'm afraid there is not much you can do for
now.

BTW, for the DeDuplicate, do you keep the latest record or the earliest? If
you keep the latest version, Flink will trigger a retraction and then send the
latest
record again every time your user table changes.
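For reference, a hedged sketch of the Blink-planner top-1/deduplication query that keeps the latest record per key (table and column names are illustrative, not taken from the thread):

```java
// Keep only the latest record per userId; because the latest row can be
// overtaken by a newer one, this emits retractions downstream on every update.
Table latestUsers = tEnv.sqlQuery(
    "SELECT userId, userName, userTimestamp " +
    "FROM (" +
    "  SELECT *, ROW_NUMBER() OVER (" +
    "    PARTITION BY userId ORDER BY userTimestamp DESC) AS row_num" +
    "  FROM Users" +
    ") WHERE row_num = 1");
```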

Best,
Kurt


On Sat, Jan 4, 2020 at 5:09 AM RKandoji  wrote:

> Hi,
>
> Thanks a ton for the help with earlier questions, I updated code to
> version 1.9 and started using Blink Planner (DeDuplication). This is
> working as expected!
>
> I have a new question, but thought of asking in the same email chain as
> this has more context about my use case etc.
>
> Workflow:
> Currently I'm reading from a couple of Kafka topics, DeDuplicating the
> input data, performing JOINs and writing the joined data to another Kafka
> topic.
>
> Issue:
> I set Parallelism to 8 and on analyzing the subtasks found that the data
> is not distributed well among the 8 parallel tasks for the last Join query. One
> of the subtasks is taking a huge load, whereas the others take a pretty low load.
>
> Tried a couple of things below, but no use. Not sure if they are actually
> related to the problem as I couldn't yet understand what's the issue here.
> 1. increasing the number of partitions of output Kafka topic.
> 2. tried adding keys to output so key partitioning happens at Kafka end.
>
> Below is a snapshot for reference:
> [image: image.png]
>
> Below are the config changes I made:
>
> taskmanager.numberOfTaskSlots: 8
> parallelism.default: 8
> jobmanager.heap.size: 5000m
> taskmanager.heap.size: 5000m
> state.backend: rocksdb
> state.checkpoints.dir: file:///Users/username/flink-1.9.1/checkpoints
> state.backend.incremental: true
>
> I don't see any errors and job seems to be running smoothly (and slowly).
> I need to make it distribute the load well for faster processing, any
> pointers on what could be wrong and how to fix it would be very helpful.
>
> Thanks,
> RKandoji
>
>
> On Fri, Jan 3, 2020 at 1:06 PM RKandoji  wrote:
>
>> Thanks!
>>
>> On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li 
>> wrote:
>>
>>> Yes,
>>>
>>> 1.9.2 or Coming soon 1.10
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Fri, Jan 3, 2020 at 12:43 AM RKandoji  wrote:
>>>
 Ok thanks, does it mean version 1.9.2 is what I need to use?

 On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li 
 wrote:

> Blink planner was introduced in 1.9. We recommend use blink planner
> after 1.9.
> After some bug fix, I think the latest version of 1.9 is OK. The
> production environment has also been set up in some places.
>
> Best,
> Jingsong Lee
>
> On Wed, Jan 1, 2020 at 3:24 AM RKandoji  wrote:
>
>> Thanks Jingsong and Kurt for more details.
>>
>> Yes, I'm planning to try out DeDuplication when I'm done upgrading to
>> version 1.9. Hopefully deduplication is done by only one task and reused
>> everywhere else.
>>
>> One more follow-up question, I see "For production use cases, we
>> recommend the old planner that was present before Flink 1.9 for now." 
>> warning
>> here
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
>> This is actually the reason why started with version 1.8, could you
>> please let me know your opinion about this? and do you think there is any
>> production code running on version 1.9
>>
>> Thanks,
>> Reva
>>
>>
>>
>>
>> On Mon, Dec 30, 2019 at 9:02 PM Kurt Young  wrote:
>>
>>> BTW, you could also have a more efficient version of deduplicating
>>> user table by using the topn feature [1].
>>>
>>> Best,
>>> Kurt
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
>>>
>>>
>>> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li 
>>> wrote:
>>>
 Hi RKandoji,

 In theory, you don't need to do something.
 First, the optimizer will optimize by doing duplicate nodes.
 Second, after SQL optimization, if the optimized plan still has
 duplicate nodes, the planner will automatically reuse them.
 There are config options to control whether we should reuse plan,
 their default value is true. So you don't need modify them.
 - table.optimizer.reuse-sub-plan-enabled
 - table.optimizer.reuse-source-enabled

 Best,
 Jingsong Lee

 On Tue, Dec 31, 2019 at 6:29 AM RKandoji 
 wrote:

> Thanks Terry and Jingsong,
>
> Currently I'm on 1.8 version using Flink planner for stream
> proessing, I'll 

Flink logging issue with logback

2020-01-03 Thread Bajaj, Abhinav
Hi,

I am investigating a logging issue with Flink.

Setup

  *   Using Flink 1.7.1 with logback as suggested in the Flink documentation here.
  *   Submitting the Flink job from the Flink dashboard.

Observations

  *   Logs from the main method (outside of the job graph) do not show up in the jobmanager 
logs.
  *   Logs from the operators like map or custom operators do show up in the 
taskmanager logs.
  *   Logs from main method do show up in jobmanager logs when using log4j in 
place of logback.

Has anyone else noticed similar behavior or is this a known issue with logback 
integration in Flink?
Any suggestions on potential workaround or fix?

Appreciate your time and help.

~ Abhinav Bajaj



Table API: Joining on Tables of Complex Types

2020-01-03 Thread Hailu, Andreas
Hi folks,

I'm trying to join two Tables which are composed of complex types, Avro's 
GenericRecord to be exact. I have to use a custom UDF to extract fields out of 
the record, and I'm having some trouble figuring out how to join on them, since I need to 
call this UDF to read what I need. Example below:

batchTableEnvironment.registerFunction("getField", new GRFieldExtractor()); // 
GenericRecord field extractor
Table users = batchTableEnvironment.fromDataSet(usersDataset); // Converting 
from some pre-existing DataSet
Table otherDataset = batchTableEnvironment.fromDataSet(someOtherDataset);
Table userNames = t.select("getField(f0, userName)"); // This is how the UDF is 
used, as GenericRecord is a complex type requiring you to invoke a get() method 
on the field you're interested in. Here we get a get on field 'userName'

I'd like to do something using the Table API similar to the query "SELECT * 
from otherDataset WHERE otherDataset.userName = users.userName". How is this 
done?
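Not an answer from the thread, but one hedged sketch of how such a join might be expressed with the string-based Table API, assuming the getField UDF above returns the userName value and both tables keep the GenericRecord in field f0 (aliases are made up):

```java
// Project the join key out of each table first, then join on it.
Table usersWithName = users.select("getField(f0, userName) as u_userName");
Table othersWithName = otherDataset.select("f0, getField(f0, userName) as o_userName");

Table joined = othersWithName.join(usersWithName, "o_userName = u_userName");
```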

Best,
Andreas



Re: Duplicate tasks for the same query

2020-01-03 Thread RKandoji
Hi,

Thanks a ton for the help with earlier questions, I updated code to version
1.9 and started using Blink Planner (DeDuplication). This is working as
expected!

I have a new question, but thought of asking in the same email chain as
this has more context about my use case etc.

Workflow:
Currently I'm reading from a couple of Kafka topics, DeDuplicating the
input data, performing JOINs and writing the joined data to another Kafka
topic.

Issue:
I set Parallelism to 8 and on analyzing the subtasks found that the data is
not distributed well among the 8 parallel tasks for the last Join query. One of
the subtasks is taking a huge load, whereas the others take a pretty low load.

Tried a couple of things below, but no use. Not sure if they are actually
related to the problem as I couldn't yet understand what's the issue here.
1. increasing the number of partitions of output Kafka topic.
2. tried adding keys to output so key partitioning happens at Kafka end.

Below is a snapshot for reference:
[image: image.png]

Below are the config changes I made:

taskmanager.numberOfTaskSlots: 8
parallelism.default: 8
jobmanager.heap.size: 5000m
taskmanager.heap.size: 5000m
state.backend: rocksdb
state.checkpoints.dir: file:///Users/username/flink-1.9.1/checkpoints
state.backend.incremental: true

I don't see any errors and job seems to be running smoothly (and slowly). I
need to make it distribute the load well for faster processing, any
pointers on what could be wrong and how to fix it would be very helpful.

Thanks,
RKandoji


On Fri, Jan 3, 2020 at 1:06 PM RKandoji  wrote:

> Thanks!
>
> On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li  wrote:
>
>> Yes,
>>
>> 1.9.2 or Coming soon 1.10
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Jan 3, 2020 at 12:43 AM RKandoji  wrote:
>>
>>> Ok thanks, does it mean version 1.9.2 is what I need to use?
>>>
>>> On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li 
>>> wrote:
>>>
 Blink planner was introduced in 1.9. We recommend use blink planner
 after 1.9.
 After some bug fix, I think the latest version of 1.9 is OK. The
 production environment has also been set up in some places.

 Best,
 Jingsong Lee

 On Wed, Jan 1, 2020 at 3:24 AM RKandoji  wrote:

> Thanks Jingsong and Kurt for more details.
>
> Yes, I'm planning to try out DeDuplication when I'm done upgrading to
> version 1.9. Hopefully deduplication is done by only one task and reused
> everywhere else.
>
> One more follow-up question, I see "For production use cases, we
> recommend the old planner that was present before Flink 1.9 for now." 
> warning
> here https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
>
> This is actually the reason why started with version 1.8, could you
> please let me know your opinion about this? and do you think there is any
> production code running on version 1.9
>
> Thanks,
> Reva
>
>
>
>
> On Mon, Dec 30, 2019 at 9:02 PM Kurt Young  wrote:
>
>> BTW, you could also have a more efficient version of deduplicating
>> user table by using the topn feature [1].
>>
>> Best,
>> Kurt
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
>>
>>
>> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li 
>> wrote:
>>
>>> Hi RKandoji,
>>>
>>> In theory, you don't need to do something.
>>> First, the optimizer will optimize by doing duplicate nodes.
>>> Second, after SQL optimization, if the optimized plan still has
>>> duplicate nodes, the planner will automatically reuse them.
>>> There are config options to control whether we should reuse plan,
>>> their default value is true. So you don't need modify them.
>>> - table.optimizer.reuse-sub-plan-enabled
>>> - table.optimizer.reuse-source-enabled
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Tue, Dec 31, 2019 at 6:29 AM RKandoji  wrote:
>>>
 Thanks Terry and Jingsong,

 Currently I'm on 1.8 version using Flink planner for stream
 proessing, I'll switch to 1.9 version to try out blink planner.

 Could you please point me to any examples (Java preferred) using
 SubplanReuser?

 Thanks,
 RK

 On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li <
 jingsongl...@gmail.com> wrote:

> Hi RKandoji,
>
> FYI: Blink-planner subplan reusing: [1] 1.9 available.
>
>      Join                       Join
>      /  \                       /  \
>  Filter1  Filter2    =>    Filter1  Filter2
>     |        |                  \    /
>  Project1 Project2             Project1
>     |        |                     |
>   Scan1    Scan2                 Scan1
>
>
> [1]
> 

Re: Duplicate tasks for the same query

2020-01-03 Thread RKandoji
Thanks!

On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li  wrote:

> Yes,
>
> 1.9.2 or Coming soon 1.10
>
> Best,
> Jingsong Lee
>
> On Fri, Jan 3, 2020 at 12:43 AM RKandoji  wrote:
>
>> Ok thanks, does it mean version 1.9.2 is what I need to use?
>>
>> On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li 
>> wrote:
>>
>>> Blink planner was introduced in 1.9. We recommend use blink planner
>>> after 1.9.
>>> After some bug fix, I think the latest version of 1.9 is OK. The
>>> production environment has also been set up in some places.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Wed, Jan 1, 2020 at 3:24 AM RKandoji  wrote:
>>>
 Thanks Jingsong and Kurt for more details.

 Yes, I'm planning to try out DeDuplication when I'm done upgrading to
 version 1.9. Hopefully deduplication is done by only one task and reused
 everywhere else.

 One more follow-up question, I see "For production use cases, we
 recommend the old planner that was present before Flink 1.9 for now." 
 warning
 here https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
 This is actually the reason why started with version 1.8, could you
 please let me know your opinion about this? and do you think there is any
 production code running on version 1.9

 Thanks,
 Reva




 On Mon, Dec 30, 2019 at 9:02 PM Kurt Young  wrote:

> BTW, you could also have a more efficient version of deduplicating
> user table by using the topn feature [1].
>
> Best,
> Kurt
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
>
>
> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li 
> wrote:
>
>> Hi RKandoji,
>>
>> In theory, you don't need to do something.
>> First, the optimizer will optimize by doing duplicate nodes.
>> Second, after SQL optimization, if the optimized plan still has
>> duplicate nodes, the planner will automatically reuse them.
>> There are config options to control whether we should reuse plan,
>> their default value is true. So you don't need modify them.
>> - table.optimizer.reuse-sub-plan-enabled
>> - table.optimizer.reuse-source-enabled
>>
>> Best,
>> Jingsong Lee
>>
>> On Tue, Dec 31, 2019 at 6:29 AM RKandoji  wrote:
>>
>>> Thanks Terry and Jingsong,
>>>
>>> Currently I'm on 1.8 version using Flink planner for stream
>>> proessing, I'll switch to 1.9 version to try out blink planner.
>>>
>>> Could you please point me to any examples (Java preferred) using
>>> SubplanReuser?
>>>
>>> Thanks,
>>> RK
>>>
>>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li 
>>> wrote:
>>>
 Hi RKandoji,

 FYI: Blink-planner subplan reusing: [1] 1.9 available.

     Join                       Join
     /  \                       /  \
 Filter1  Filter2    =>    Filter1  Filter2
    |        |                  \    /
 Project1 Project2             Project1
    |        |                     |
  Scan1    Scan2                 Scan1


 [1]
 https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala

 Best,
 Jingsong Lee

 On Mon, Dec 30, 2019 at 12:28 PM Terry Wang 
 wrote:

> Hi RKandoji~
>
> Could you provide more info about your poc environment?
> Stream or batch? Flink planner or blink planner?
> AFAIK, blink planner has done some optimization to deal such
> duplicate task for one same query. You can have a try with blink 
> planner :
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>
> Best,
> Terry Wang
>
>
>
> 2019年12月30日 03:07,RKandoji  写道:
>
> Hi Team,
>
> I'm doing a POC with flink to understand if it's a good fit for my
> use case.
>
> As part of the process, I need to filter duplicate items and
> created below query to get only the latest records based on 
> timestamp. For
> instance, I have "Users" table which may contain multiple messages 
> for the
> same "userId". So I wrote below query to get only the latest message 
> for a
> given "userId"
>
> Table uniqueUsers = tEnv.sqlQuery("SELECT * FROM Users WHERE
> (userId, userTimestamp) IN (SELECT userId, MAX(userTimestamp) FROM 
> Users
> GROUP BY userId)");
>
> The above query works as expected and contains only the latest
> users based on timestamp.
>
> The issue is when I use 

Re: Flink task node shut itself off.

2020-01-03 Thread John Smith
Well, there was this huge IO wait, like an over-140% spike. IO wait rose slowly
for a couple of hours, then at some point it spiked at 140%, and after the IO wait
dropped back to "normal" the 1min/5min/15min CPU load spiked to about 3 times the
number of cores for a bit.

We were at "peak" operation, i.e. we were running a batch job when this
happened. In average operation the "business" requests per second from our
services are about 15 RPS; when we do batches we can hit 600 RPS for a few
hours and then go back down. Each business request underneath does a few round
trips back and forth between Kafka, cache systems, Flink, DBs, etc. So the
Flink jobs are a subset of some parts of that 600 RPS.

On the Flink side we have 3 task managers with 4 cores and 8GB each, configured as 8
slots, a 5.4GB JVM, and 3.77GB of Flink managed memory per task manager. We have 8 jobs
and 9 slots free, so the cluster isn't full yet. But we do see that one node is
full.

We use filesystem state on disk (backed by GlusterFS), not RocksDB. We had enabled 5
second checkpointing for 6 of the jobs... So I'm just wondering if that was
possibly the reason for the IO wait... But regardless of the RPS mentioned
above, the jobs will always checkpoint every 5 seconds... I had the chance
to increase the checkpoint interval for a few of the jobs before the holidays. I am
back on Monday...
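For reference, the checkpoint interval is configured on the execution environment; a minimal sketch of moving from the current 5 s to, say, 1 min (values are illustrative, not a recommendation from the thread):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // previously env.enableCheckpointing(5_000);
```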

On Fri., Jan. 3, 2020, 11:16 a.m. Chesnay Schepler, 
wrote:

> The logs show 2 interesting pieces of information:
>
> 
> ...
> 2019-12-19 18:33:23,278 INFO
> org.apache.kafka.clients.FetchSessionHandler  - [Consumer
> clientId=consumer-4, groupId=ccdb-prod-import] Error sending fetch
> request (sessionId=INVALID, epoch=INITIAL) to node 0:
> org.apache.kafka.common.errors.DisconnectException.
> ...
> 2019-12-19 19:37:06,732 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
> resolve ResourceManager address 
> akka.tcp://flink@xx-job-0002:36835/user/resourcemanager,
> retrying in 1 ms: Ask timed out on
> [ActorSelection[Anchor(akka.tcp://flink@xx-job-0002:36835/),
> Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message
> of type "akka.actor.Identify"..
>
> This reads like the machine lost network connectivity for some reason. The
> tasks start failing because kafka cannot be reached, and the TM then shuts
> down because it can neither reach the ResourceManager.
>
> On 25/12/2019 04:34, Zhijiang wrote:
>
> If you use rocksDB state backend, it might consume extra native memory.
> Some resource framework cluster like yarn would kill the container if the
> memory usage exceeds some threshold. You can also double check whether it
> exists in your case.
>
> --
> From:John Smith  
> Send Time:2019 Dec. 25 (Wed.) 03:40
> To:Zhijiang  
> Cc:user  
> Subject:Re: Flink task node shut it self off.
>
> The shutdown happened after the massive IO wait. I don't use any state
> Checkpoints are disk based...
>
> On Mon., Dec. 23, 2019, 1:42 a.m. Zhijiang, 
> wrote:
> Hi John,
>
> Thanks for the positive comments of Flink usage. No matter at least-once
> or exactly-once you used for checkpoint, it would never lose one message
> during failure recovery.
>
> Unfortunatelly I can not visit the logs you posted. Generally speaking the
> longer internal checkpoint would mean replaying more source data after
> failure recovery.
> In my experience the 5 seconds interval for checkpoint is too frequently
> in my experience, and you might increase it to 1 minute or so. You can also
> monitor how long will the checkpoint finish in your application, then you
> can adjust the interval accordingly.
>
> Concerning of the node shutdown you mentioned, I am not quite sure whether
> it is relevant to your short checkpoint interval. Do you config to use heap
> state backend?  The hs_err file really indicated that you job had
> encountered the memory issue, then it is better to somehow increase your
> task manager memory. But if you can analyze the dump hs_err file via some
> profiler tool for checking the memory usage, it might be more helpful to
> find the root cause.
>
> Best,
> Zhijiang
>
> --
> From:John Smith 
> Send Time:2019 Dec. 21 (Sat.) 05:26
> To:user 
> Subject:Flink task node shut it self off.
>
> Hi, using Flink 1.8.0
>
> 1st off I must say Flink resiliency is very impressive, we lost a node and
> never lost one message by using checkpoints and Kafka. Thanks!
>
> The cluster is a self hosted cluster and we use our own zookeeper cluster.
> We have...
> 3 zookeepers: 4 cpu, 8GB (each)
> 3 job nodes: 4 cpu, 8GB (each)
> 3 task nodes: 4 cpu, 8GB (each)
> The nodes also share GlusterFS for storing savepoints and checkpoints,
> GlusterFS is running on the same machines.
>
> Yesterday a node shut itself off we the following log messages...
> - Stopping TaskExecutor
> akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
> - Stop job leader service.
> - 

Re: Checkpoints issue and job failing

2020-01-03 Thread Navneeth Krishnan
Thanks Congxian & Vino.

Yes, the file does exist and I don't see any problem accessing it.

Regarding Flink 1.9, we haven't migrated yet but we are planning to.
Since we have to test it, that might take some time.

Thanks

On Fri, Jan 3, 2020 at 2:14 AM Congxian Qiu  wrote:

> Hi
>
> Have you ever checked whether this problem exists on Flink 1.9?
>
> Best,
> Congxian
>
>
> vino yang wrote on Fri, Jan 3, 2020 at 3:54 PM:
>
>> Hi Navneeth,
>>
>> Did you check whether the path contained in the exception really cannot be
>> found?
>>
>> Best,
>> Vino
>>
>> Navneeth Krishnan wrote on Fri, Jan 3, 2020 at 8:23 AM:
>>
>>> Hi All,
>>>
>>> We are running into checkpoint timeout issue more frequently in
>>> production and we also see the below exception. We are running flink 1.4.0
>>> and the checkpoints are saved on NFS. Can someone suggest how to overcome
>>> this?
>>>
>>> [image: image.png]
>>>
>>> java.lang.IllegalStateException: Could not initialize operator state 
>>> backend.
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.FileNotFoundException: 
>>> /mnt/checkpoints/02c4f8d5c11921f363b98c5959cc4f06/chk-101/e71d8eaf-ff4a-4783-92bd-77e3d8978e01
>>>  (No such file or directory)
>>> at java.io.FileInputStream.open0(Native Method)
>>> at java.io.FileInputStream.open(FileInputStream.java:195)
>>> at java.io.FileInputStream.(FileInputStream.java:138)
>>> at 
>>> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
>>>
>>>
>>> Thanks
>>>
>>>


Re: Using InfluxDB as the Flink metrics reporter

2020-01-03 Thread Yun Tang
Hi 张江,


  *   The retention policy has to be created on the InfluxDB side first; the InfluxDBReporter will not create a 
retention policy that does not exist.
  *   Some Kafka metrics cause cast exceptions when used with the InfluxDB reporter; see 
[1]. On Flink 1.9 these exceptions can be ignored.

[1] https://issues.apache.org/jira/browse/FLINK-12147

Best regards,
Yun Tang

From: 张江 
Sent: Friday, January 3, 2020 21:22
To: user-zh@flink.apache.org 
Subject: Using InfluxDB as the Flink metrics reporter

Hi all,


Following the metrics reporter setup described on the Flink website, I chose InfluxDB and configured it as follows:
metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
metrics.reporter.influxdb.username: flink-metrics
metrics.reporter.influxdb.password: qwerty
metrics.reporter.influxdb.retentionPolicy: one_hour
However, after starting the Flink job (on YARN, per-job mode) and InfluxDB, it keeps reporting this error:
error  [500] - "retention policy not found: one_hour" {"log_id": "OK6nejJI000", 
"service": "httpd"} [httpd] 10.90.*.* - flinkuser [03/Jan/2020:19:35:58 +0800] 
"POST /write? db=flink=one_hour=n=one HTTP/1.1" 500 49 
"-" "okhttp/3.11.0" 3637af63-2e1d-11ea-802a-000c2947e206 165


I am using Flink 1.9.1, and the InfluxDB version is 1.7.9.


Also, when I do not set retentionPolicy at all, I still get an error:
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException:
 partial write: unable to parse 
"taskmanager_job_task_operator_sync-time-avg,host=master,job_id=03136f4c1a78e9930262b455ef0657e2,job_name=Flink-app,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=XXX,task_attempt_num=0,task_id=
 
cbc357ccb763df2852fee8c4fc7d55f2,task_name=XX,tm_id=container_1577507646998_0054_01_02
 value=? 157805124760500": invalid boolean


Could anyone advise how to solve these problems?
Thanks


Best regards,





Re: Flink task node shut itself off.

2020-01-03 Thread Chesnay Schepler

The logs show 2 interesting pieces of information:


...
2019-12-19 18:33:23,278 INFO 
org.apache.kafka.clients.FetchSessionHandler  - 
[Consumer clientId=consumer-4, groupId=ccdb-prod-import] Error 
sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: 
org.apache.kafka.common.errors.DisconnectException.

...
2019-12-19 19:37:06,732 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor    - Could 
not resolve ResourceManager address 
akka.tcp://flink@xx-job-0002:36835/user/resourcemanager, retrying in 
1 ms: Ask timed out on 
[ActorSelection[Anchor(akka.tcp://flink@xx-job-0002:36835/), 
Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent 
message of type "akka.actor.Identify"..


This reads like the machine lost network connectivity for some reason. 
The tasks start failing because kafka cannot be reached, and the TM then 
shuts down because it can neither reach the ResourceManager.


On 25/12/2019 04:34, Zhijiang wrote:

If you use rocksDB state backend, it might consume extra native memory.
Some resource framework cluster like yarn would kill the container if 
the memory usage exceeds some threshold. You can also double check 
whether it exists in your case.


--
From:John Smith 
Send Time:2019 Dec. 25 (Wed.) 03:40
To:Zhijiang 
Cc:user 
Subject:Re: Flink task node shut it self off.

The shutdown happened after the massive IO wait. I don't use any
state Checkpoints are disk based...

On Mon., Dec. 23, 2019, 1:42 a.m. Zhijiang wrote:
Hi John,

Thanks for the positive comments of Flink usage. No matter
at least-once or exactly-once you used for checkpoint, it would
never lose one message during failure recovery.

Unfortunatelly I can not visit the logs you posted. Generally
speaking the longer internal checkpoint would mean replaying more
source data after failure recovery.
In my experience the 5 seconds interval for checkpoint is too
frequently in my experience, and you might increase it to 1 minute
or so. You can also monitor how long will the checkpoint finish in
your application, then you can adjust the interval accordingly.

Concerning of the node shutdown you mentioned, I am not quite sure
whether it is relevant to your short checkpoint interval. Do you
config to use heap state backend? The hs_err file really indicated
that you job had encountered the memory issue, then it is better
to somehow increase your task manager memory. But if you can
analyze the dump hs_err file via some profiler tool for checking
the memory usage, it might be more helpful to find the root cause.

Best,
Zhijiang

--
From:John Smith mailto:java.dev@gmail.com>>
Send Time:2019 Dec. 21 (Sat.) 05:26
To:user
Subject:Flink task node shut it self off.

Hi, using Flink 1.8.0

1st off I must say Flink resiliency is very impressive, we lost a
node and never lost one message by using checkpoints and Kafka.
Thanks!

The cluster is a self hosted cluster and we use our own zookeeper
cluster. We have...
3 zookeepers: 4 cpu, 8GB (each)
3 job nodes: 4 cpu, 8GB (each)
3 task nodes: 4 cpu, 8GB (each)
The nodes also share GlusterFS for storing savepoints and
checkpoints, GlusterFS is running on the same machines.

Yesterday a node shut itself off we the following log messages...
- Stopping TaskExecutor
akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
- Stop job leader service.
- Stopping ZooKeeperLeaderRetrievalService
/leader/resource_manager_lock.
- Shutting down TaskExecutorLocalStateStoresManager.
- Shutting down BLOB cache
- Shutting down BLOB cache
- removed file cache directory
/tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
- I/O manager removed spill file directory
/tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
- Shutting down the network environment and its components.

Prior to the node shutting off we noticed massive IOWAIT of 140%
and CPU load 1minute of 15. And we also got an hs_err file which
sais we should increase the memory.

I'm attaching the logs here:
https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0

I wonder if my 5 second checkpointing is too much for gluster.

Any thoughts?










Re: Change Akka Ask Timeout for Job Submission Only

2020-01-03 Thread Chesnay Schepler

There are 3 communication layers involved here:

1) client <=> server (REST API)

This goes through REST and does not use timeouts AFAIK. We wait until a 
response comes or the connection terminates.


2) server (REST API) <=> processes (JM, Dispatcher)

This goes through akka, with "web.timeout" being used for the timeout.

3) processes <=> processes

Also akka, with "akka.ask.timeout" being used.


The timeout in question occurs on layer 2) due to the JM being 
incredibly busy, possibly due to some heavy-weight computation in the 
job setup.

In any case, you can try increasing web.timeout to maybe resolve this issue.


On 20/12/2019 06:13, tison wrote:

Forward to user list.

Best,
tison.


Abdul Qadeer wrote on Fri, Dec 20, 2019 at 12:57 PM:


Around submission time, logs from jobmanager:


{"timeMillis":1576764854245,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Received
JobGraph submission 714829e8f6c8cd0daaed335c1b8c588a

(sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M

{"timeMillis":1576764854247,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Submitting
job 714829e8f6c8cd0daaed335c1b8c588a

(sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M

{"timeMillis":1576764856119,"thread":"flink-akka.actor.default-dispatcher-1036","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message
[org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from
Actor[akka://flink/deadLetters] to
Actor[akka://flink/user/jobmanager_4#-2122695705] was not
delivered. [87] dead letters encountered. This logging can be
turned off or adjusted with configuration settings
'akka.log-dead-letters' and

'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1150,"threadPriority":5}^M

{"timeMillis":1576764877732,"thread":"flink-akka.actor.default-dispatcher-1039","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message
[org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from
Actor[akka://flink/deadLetters] to
Actor[akka://flink/user/jobmanager_4#-2122695705] was not
delivered. [88] dead letters encountered. This logging can be
turned off or adjusted with configuration settings
'akka.log-dead-letters' and

'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1155,"threadPriority":5}^M

{"timeMillis":1576764877732,"thread":"flink-scheduler-1","level":"ERROR","loggerName":"org.apache.flink.runtime.rest.handler.job.JobSubmitHandler","message":"Unhandled
exception.","thrown":{"commonElementCount":0,"localizedMessage":"Ask
timed out on [Actor[akka://flink/user/dispatcher#1899316777]]
after [1 ms]. Sender[null] sent message of type

\"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","message":"Ask
timed out on [Actor[akka://flink/user/dispatcher#1899316777]]
after [1 ms]. Sender[null] sent message of type


Re: kafka: how to stop consumption temporarily

2020-01-03 Thread Chesnay Schepler
Are you asking how to detect from within the job whether the dump is 
complete, or how to combine these 2 jobs?


If you had a way to notice whether the dump is complete, then I would 
suggest to create a custom source that wraps 2 kafka sources, and switch 
between them at will based on your conditions.
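Not from the thread: a minimal sketch of such a wrapping source under several assumptions (plain kafka-clients consumers instead of FlinkKafkaConsumer, no checkpointing or exactly-once guarantees, and a made-up end-of-dump marker record and topic names):

```java
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/** Reads the dump topic until an end-of-dump marker is seen, then switches to the CDC topic. */
public class DumpThenCdcSource extends RichSourceFunction<String> {

    private final Properties props; // must contain bootstrap servers, group id, deserializers
    private volatile boolean running = true;

    public DumpThenCdcSource(Properties props) {
        this.props = props;
    }

    @Override
    public void run(SourceContext<String> ctx) {
        // Phase 1: drain the dump topic until the (assumed) marker record arrives.
        try (KafkaConsumer<String, String> dump = new KafkaConsumer<>(props)) {
            dump.subscribe(Collections.singletonList("dump-topic"));
            boolean dumpFinished = false;
            while (running && !dumpFinished) {
                ConsumerRecords<String, String> records = dump.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records) {
                    if ("END_OF_DUMP".equals(r.value())) {
                        dumpFinished = true;
                    } else {
                        synchronized (ctx.getCheckpointLock()) {
                            ctx.collect(r.value());
                        }
                    }
                }
            }
        }
        // Phase 2: only now start consuming the CDC topic.
        try (KafkaConsumer<String, String> cdc = new KafkaConsumer<>(props)) {
            cdc.subscribe(Collections.singletonList("cdc-topic"));
            while (running) {
                for (ConsumerRecord<String, String> r : cdc.poll(Duration.ofMillis(500))) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(r.value());
                    }
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```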



On 03/01/2020 03:53, Terry Wang wrote:

Hi,

I'd like to share my opinion here. It seems that you need to adjust the Kafka 
consumers to communicate with each other. When you begin the dump process, 
you need to notify the other, CDC-topic consumer to stay idle.


Best,
Terry Wang




2020年1月2日 16:49,David Morin  写道:

Hi,

Is there a way to temporarily stop consuming one Kafka source in streaming 
mode?
Use case: I have to consume 2 topics, but one of them has higher priority.
One of these topics is dedicated to ingesting data from a DB (change data capture) and 
the other is dedicated to synchronization (a dump, i.e. a SELECT ... 
from the DB). At the moment the latter is performed by one Flink job, and we start 
it after manually stopping the previous one (CDC).
I want to merge these 2 modes and automatically stop consumption of the topic 
dedicated to the CDC mode when a dump is done.
How can that be handled with Flink in a streaming way? Backpressure? ...
Thanks in advance for your insights

David






Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9

2020-01-03 Thread Chesnay Schepler
You should be able to implement this on the DataStream API level using 
DataStream#broadcast and #union like this:


input = ...

singleChannel = input.filter(x -> !x.isBroadCastPartitioning);

broadcastChannel = input.filter(x -> x.isBroadCastPartitioning);

result = broadcastChannel.broadcast().union(singleChannel)

// apply operations on result


On 26/12/2019 08:20, Salva Alcántara wrote:

I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink
1.9. The original partitioner implemented the `selectChannels` method within
the `StreamPartitioner` interface like this:

```java
 // Original: working for Flink 1.7
 //@Override
 public int[] selectChannels(SerializationDelegate>
streamRecordSerializationDelegate,
 int numberOfOutputChannels) {
 T value =
streamRecordSerializationDelegate.getInstance().getValue();
 if (value.f0.isBroadCastPartitioning()) {
 // send to all channels
 int[] channels = new int[numberOfOutputChannels];
 for (int i = 0; i < numberOfOutputChannels; ++i) {
 channels[i] = i;
 }
 return channels;
 } else if (value.f0.getPartitionKey() == -1) {
 // random partition
 returnChannels[0] = random.nextInt(numberOfOutputChannels);
 } else {
 returnChannels[0] =
partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
 }
 return returnChannels;
 }

```

I am not sure how to migrate this to Flink 1.9, since the
`StreamPartitioner` interface has changed as illustrated below:


```java
 // New: required by Flink 1.9
 @Override
 public int selectChannel(SerializationDelegate>
streamRecordSerializationDelegate) {
 T value =
streamRecordSerializationDelegate.getInstance().getValue();
 if (value.f0.isBroadCastPartitioning()) {
 /*
 It is illegal to call this method for broadcast channel
selectors and this method can remain not
 implemented in that case (for example by throwing
UnsupportedOperationException).
 */
 } else if (value.f0.getPartitionKey() == -1) {
 // random partition
 returnChannels[0] = random.nextInt(numberOfChannels);
 } else {
 returnChannels[0] =
partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
 }
 //return returnChannels;
 return returnChannels[0];
 }
```

Note that `selectChannels` has been replaced with `selectChannel`. So, it is
no longer possible to return multiple output channels as originally done
above for the case of broadcasted elements. As a matter of fact,
`selectChannel` should not be invoked for this particular case. Any thoughts
on how to tackle this?





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Yarn Kerberos issue

2020-01-03 Thread Chesnay Schepler
From what I understand from the documentation, if you want to use 
delegation tokens you always first have to issue a ticket using kinit; 
so you did everything correctly?


On 02/01/2020 13:00, Juan Gentile wrote:


Hello,

I'm trying to submit a job (batch wordcount) to a Yarn cluster. I'm 
trying to use delegation tokens and I'm getting the following error:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token can be issued only with kerberos or web authentication
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7560)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:548)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getDelegationToken(AuthorizationProviderProxyClientProtocol.java:663)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:981)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2221)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2215)
at org.apache.hadoop.ipc.Client.call(Client.java:1472)
at org.apache.hadoop.ipc.Client.call(Client.java:1409)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy18.getDelegationToken(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:928)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy19.getDelegationToken(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1082)
at org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1499)
at org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:546)
at org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:557)
at org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:524)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:140)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
at org.apache.flink.yarn.Utils.setTokensFor(Utils.java:235)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:972)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:416)

The kerberos configuration in this case is the default one. Then I 
tried with this option set to false 

The actual time an operator spends processing a single record

2020-01-03 Thread 张江
Hi all,



I asked a similar question before but may not have expressed it clearly, so let me ask again.

Does anyone know how to obtain the following Flink metric:




This comes from the talk on dynamic resource adjustment in Flink given by 马庆祥 at 
Flink Forward Asia 2019, but I could not find this metric among the metrics documented on the Flink website.

Thanks.




Best regards,







Using InfluxDB as the Flink metrics reporter

2020-01-03 Thread 张江
大家好,


我按照官网所介绍的flink metrics reporter设置,选用了influxdb,进行了如下设置:
metrics.reporter.influxdb.class:org.apache.flink.metrics.influxdb.InfluxdbReportermetrics.reporter.influxdb.host:localhostmetrics.reporter.influxdb.port:8086metrics.reporter.influxdb.db:flinkmetrics.reporter.influxdb.username:flink-metrics
metrics.reporter.influxdb.password:qwerty 
metrics.reporter.influxdb.retentionPolicy:one_hour
但是,启动flink作业(on yarn per job模式)和flinxdb后,发现一直报错:
error [500] - "retention policy not found: one_hour" {"log_id": "OK6nejJI000", "service": "httpd"}
[httpd] 10.90.*.* - flinkuser [03/Jan/2020:19:35:58 +0800] "POST /write?db=flink&rp=one_hour&precision=n&consistency=one HTTP/1.1" 500 49 "-" "okhttp/3.11.0" 3637af63-2e1d-11ea-802a-000c2947e206 165


I am using Flink 1.9.1, and the InfluxDB version is 1.7.9.


Moreover, even when I do not set retentionPolicy at all, there is still an error:
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to parse
"taskmanager_job_task_operator_sync-time-avg,host=master,job_id=03136f4c1a78e9930262b455ef0657e2,job_name=Flink-app,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=XXX,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=XX,tm_id=container_1577507646998_0054_01_02 value=? 157805124760500": invalid boolean


Could anyone advise how to resolve these problems?
Thanks


Best,





Re: Late outputs for Session Window

2020-01-03 Thread KristoffSC
After following a suggestion from SO,
I made a few changes, so now I'm using event time.
Watermarks are progressing; I've checked them in Flink's metrics. The
window operator is triggered, but I still don't see any late outputs from
it.


StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000,
1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);


DataStream<RawMessage> rawBusinessTransaction = env
.addSource(new FlinkKafkaConsumer<>("business",
new JSONKeyValueDeserializationSchema(false),
properties))
.map(new KafkaTransactionObjectMapOperator())
.assignTimestampsAndWatermarks(new
AssignerWithPeriodicWatermarks<RawMessage>() {

@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis());
}

@Override
public long extractTimestamp(RawMessage element, long
previousElementTimestamp) {
return element.messageCreationTime;
}
})
.name("Kafka Transaction Raw Data Source.");

messageStream
 .keyBy(tradeKeySelector)
 .window(EventTimeSessionWindows.withDynamicGap(new
TradeAggregationGapExtractor()))
 .sideOutputLateData(lateTradeMessages)
 .process(new CumulativeTransactionOperator())
 .name("Aggregate Transaction Builder");






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-03 Thread Jark Wu
Hi Benoît,

Thanks for the reminder. I will look into the issue and hopefully we can
target it for 1.9.2 and 1.10.

Cheers,
Jark

On Fri, 3 Jan 2020 at 18:21, Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> >  If anyone finds that blink planner has any significant defects and has
> a larger regression than the old planner, please let us know.
>
> Overall, the Blink-exclusive features are must (TopN, deduplicate,
> LAST_VALUE, plan reuse, etc)! But all use cases of the Legacy planner in
> production are not covered:
> An edge case of Temporal Table Functions does not allow computed Tables
> (as opposed to TableSources) to be used on the query side in Blink (
> https://issues.apache.org/jira/browse/FLINK-14200)
>
> Cheers
> Ben
>
>
> On Fri, Jan 3, 2020 at 10:00 AM Jeff Zhang  wrote:
>
>> +1, I have already made blink as the default planner of flink interpreter
>> in Zeppelin
>>
>>
>> Jingsong Li  于2020年1月3日周五 下午4:37写道:
>>
>>> Hi Jark,
>>>
>>> +1 for default blink planner in SQL-CLI.
>>> I believe this new planner can be put into practice in production.
>>> We've worked hard for nearly a year, but the old planner didn't move on.
>>>
>>> And I'd like to cc to user@flink.apache.org.
>>> If anyone finds that blink planner has any significant defects and has a
>>> larger regression than the old planner, please let us know. We will be very
>>> grateful.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:
>>>
 +1 for this.
 We bring many SQL/API features and enhance stability in 1.10 release,
 and almost all of them happens in Blink planner.
 SQL CLI is the most convenient entrypoint for me, I believe many users
 will have a better experience If we set Blink planner as default planner.

 Best,
 Leonard

 > 在 2020年1月3日,15:16,Terry Wang  写道:
 >
 > Since what blink planner can do is a superset of flink planner, big
 +1 for changing the default planner to Blink planner from my side.
 >
 > Best,
 > Terry Wang
 >
 >
 >
 >> 2020年1月3日 15:00,Jark Wu  写道:
 >>
 >> Hi everyone,
 >>
 >> In 1.10 release, Flink SQL supports many awesome features and
 improvements,
 >> including:
 >> - support watermark statement and computed column in DDL
 >> - fully support all data types in Hive
 >> - Batch SQL performance improvements (TPC-DS 7x than Hive MR)
 >> - support INSERT OVERWRITE and INSERT PARTITION
 >>
 >> However, all the features and improvements are only avaiable in Blink
 >> planner, not in Old planner.
 >> There are also some other features are limited in Blink planner, e.g.
 >> Dimension Table Join [1],
 >> TopN [2], Deduplicate [3], streaming aggregates optimization [4],
 and so on.
 >>
 >> But Old planner is still the default planner in Table API & SQL. It
 is
 >> frustrating for users to set
 >> to blink planner manually when every time start a SQL CLI. And it's
 >> surprising to see unsupported
 >> exception if they trying out the new features but not switch planner.
 >>
 >> SQL CLI is a very important entrypoint for trying out new feautures
 and
 >> prototyping for users.
 >> In order to give new planner more exposures, I would like to suggest
 to set
 >> default planner
 >> for SQL Client to Blink planner before 1.10 release.
 >>
 >> The approach is just changing the default SQL CLI yaml
 configuration[5]. In
 >> this way, the existing
 >> environment is still compatible and unaffected.
 >>
 >> Changing the default planner for the whole Table API & SQL is
 another topic
 >> and is out of scope of this discussion.
 >>
 >> What do you think?
 >>
 >> Best,
 >> Jark
 >>
 >> [1]:
 >>
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
 >> [2]:
 >>
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#top-n
 >> [3]:
 >>
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
 >> [4]:
 >>
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html
 >> [5]:
 >>
 https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/conf/sql-client-defaults.yaml#L100
 >


>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
> --
> Benoît Paris
> Ingénieur Machine Learning Explicable
> Tél : +33 6 60 74 23 00
> http://benoit.paris
> http://explicable.ml
>


Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-03 Thread Benoît Paris
>  If anyone finds that blink planner has any significant defects and has a
larger regression than the old planner, please let us know.

Overall, the Blink-exclusive features are a must (TopN, deduplicate,
LAST_VALUE, plan reuse, etc.)! But not all production use cases of the
Legacy planner are covered:
An edge case of Temporal Table Functions does not allow computed Tables (as
opposed to TableSources) to be used on the query side in Blink (
https://issues.apache.org/jira/browse/FLINK-14200)

Cheers
Ben


On Fri, Jan 3, 2020 at 10:00 AM Jeff Zhang  wrote:

> +1, I have already made blink as the default planner of flink interpreter
> in Zeppelin
>
>
> Jingsong Li  于2020年1月3日周五 下午4:37写道:
>
>> Hi Jark,
>>
>> +1 for default blink planner in SQL-CLI.
>> I believe this new planner can be put into practice in production.
>> We've worked hard for nearly a year, but the old planner didn't move on.
>>
>> And I'd like to cc to user@flink.apache.org.
>> If anyone finds that blink planner has any significant defects and has a
>> larger regression than the old planner, please let us know. We will be very
>> grateful.
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:
>>
>>> +1 for this.
>>> We bring many SQL/API features and enhance stability in 1.10 release,
>>> and almost all of them happens in Blink planner.
>>> SQL CLI is the most convenient entrypoint for me, I believe many users
>>> will have a better experience If we set Blink planner as default planner.
>>>
>>> Best,
>>> Leonard
>>>
>>> > 在 2020年1月3日,15:16,Terry Wang  写道:
>>> >
>>> > Since what blink planner can do is a superset of flink planner, big +1
>>> for changing the default planner to Blink planner from my side.
>>> >
>>> > Best,
>>> > Terry Wang
>>> >
>>> >
>>> >
>>> >> 2020年1月3日 15:00,Jark Wu  写道:
>>> >>
>>> >> Hi everyone,
>>> >>
>>> >> In 1.10 release, Flink SQL supports many awesome features and
>>> improvements,
>>> >> including:
>>> >> - support watermark statement and computed column in DDL
>>> >> - fully support all data types in Hive
>>> >> - Batch SQL performance improvements (TPC-DS 7x than Hive MR)
>>> >> - support INSERT OVERWRITE and INSERT PARTITION
>>> >>
>>> >> However, all the features and improvements are only avaiable in Blink
>>> >> planner, not in Old planner.
>>> >> There are also some other features are limited in Blink planner, e.g.
>>> >> Dimension Table Join [1],
>>> >> TopN [2], Deduplicate [3], streaming aggregates optimization [4], and
>>> so on.
>>> >>
>>> >> But Old planner is still the default planner in Table API & SQL. It is
>>> >> frustrating for users to set
>>> >> to blink planner manually when every time start a SQL CLI. And it's
>>> >> surprising to see unsupported
>>> >> exception if they trying out the new features but not switch planner.
>>> >>
>>> >> SQL CLI is a very important entrypoint for trying out new feautures
>>> and
>>> >> prototyping for users.
>>> >> In order to give new planner more exposures, I would like to suggest
>>> to set
>>> >> default planner
>>> >> for SQL Client to Blink planner before 1.10 release.
>>> >>
>>> >> The approach is just changing the default SQL CLI yaml
>>> configuration[5]. In
>>> >> this way, the existing
>>> >> environment is still compatible and unaffected.
>>> >>
>>> >> Changing the default planner for the whole Table API & SQL is another
>>> topic
>>> >> and is out of scope of this discussion.
>>> >>
>>> >> What do you think?
>>> >>
>>> >> Best,
>>> >> Jark
>>> >>
>>> >> [1]:
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>>> >> [2]:
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#top-n
>>> >> [3]:
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>> >> [4]:
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html
>>> >> [5]:
>>> >>
>>> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/conf/sql-client-defaults.yaml#L100
>>> >
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


Re: Checkpoints issue and job failing

2020-01-03 Thread Congxian Qiu
Hi

Have you ever checked whether this problem also exists on Flink 1.9?

Best,
Congxian


vino yang  于2020年1月3日周五 下午3:54写道:

> Hi Navneeth,
>
> Did you check if the path contains in the exception is really can not be
> found?
>
> Best,
> Vino
>
> Navneeth Krishnan  于2020年1月3日周五 上午8:23写道:
>
>> Hi All,
>>
>> We are running into checkpoint timeout issue more frequently in
>> production and we also see the below exception. We are running flink 1.4.0
>> and the checkpoints are saved on NFS. Can someone suggest how to overcome
>> this?
>>
>> [image: image.png]
>>
>> java.lang.IllegalStateException: Could not initialize operator state backend.
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.FileNotFoundException: 
>> /mnt/checkpoints/02c4f8d5c11921f363b98c5959cc4f06/chk-101/e71d8eaf-ff4a-4783-92bd-77e3d8978e01
>>  (No such file or directory)
>>  at java.io.FileInputStream.open0(Native Method)
>>  at java.io.FileInputStream.open(FileInputStream.java:195)
>>  at java.io.FileInputStream.<init>(FileInputStream.java:138)
>>  at 
>> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>>
>>
>> Thanks
>>
>>


Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-03 Thread Jeff Zhang
+1, I have already made blink the default planner of the Flink interpreter
in Zeppelin


Jingsong Li  于2020年1月3日周五 下午4:37写道:

> Hi Jark,
>
> +1 for default blink planner in SQL-CLI.
> I believe this new planner can be put into practice in production.
> We've worked hard for nearly a year, but the old planner didn't move on.
>
> And I'd like to cc to user@flink.apache.org.
> If anyone finds that blink planner has any significant defects and has a
> larger regression than the old planner, please let us know. We will be very
> grateful.
>
> Best,
> Jingsong Lee
>
> On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:
>
>> +1 for this.
>> We bring many SQL/API features and enhance stability in 1.10 release, and
>> almost all of them happens in Blink planner.
>> SQL CLI is the most convenient entrypoint for me, I believe many users
>> will have a better experience If we set Blink planner as default planner.
>>
>> Best,
>> Leonard
>>
>> > 在 2020年1月3日,15:16,Terry Wang  写道:
>> >
>> > Since what blink planner can do is a superset of flink planner, big +1
>> for changing the default planner to Blink planner from my side.
>> >
>> > Best,
>> > Terry Wang
>> >
>> >
>> >
>> >> 2020年1月3日 15:00,Jark Wu  写道:
>> >>
>> >> Hi everyone,
>> >>
>> >> In 1.10 release, Flink SQL supports many awesome features and
>> improvements,
>> >> including:
>> >> - support watermark statement and computed column in DDL
>> >> - fully support all data types in Hive
>> >> - Batch SQL performance improvements (TPC-DS 7x than Hive MR)
>> >> - support INSERT OVERWRITE and INSERT PARTITION
>> >>
>> >> However, all the features and improvements are only avaiable in Blink
>> >> planner, not in Old planner.
>> >> There are also some other features are limited in Blink planner, e.g.
>> >> Dimension Table Join [1],
>> >> TopN [2], Deduplicate [3], streaming aggregates optimization [4], and
>> so on.
>> >>
>> >> But Old planner is still the default planner in Table API & SQL. It is
>> >> frustrating for users to set
>> >> to blink planner manually when every time start a SQL CLI. And it's
>> >> surprising to see unsupported
>> >> exception if they trying out the new features but not switch planner.
>> >>
>> >> SQL CLI is a very important entrypoint for trying out new feautures and
>> >> prototyping for users.
>> >> In order to give new planner more exposures, I would like to suggest
>> to set
>> >> default planner
>> >> for SQL Client to Blink planner before 1.10 release.
>> >>
>> >> The approach is just changing the default SQL CLI yaml
>> configuration[5]. In
>> >> this way, the existing
>> >> environment is still compatible and unaffected.
>> >>
>> >> Changing the default planner for the whole Table API & SQL is another
>> topic
>> >> and is out of scope of this discussion.
>> >>
>> >> What do you think?
>> >>
>> >> Best,
>> >> Jark
>> >>
>> >> [1]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>> >> [2]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#top-n
>> >> [3]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>> >> [4]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html
>> >> [5]:
>> >>
>> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/conf/sql-client-defaults.yaml#L100
>> >
>>
>>
>
> --
> Best, Jingsong Lee
>


-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-03 Thread Jingsong Li
Hi Jark,

+1 for default blink planner in SQL-CLI.
I believe this new planner can be put into practice in production.
We've worked hard for nearly a year, but the old planner didn't move on.

And I'd like to cc to user@flink.apache.org.
If anyone finds that blink planner has any significant defects and has a
larger regression than the old planner, please let us know. We will be very
grateful.

Best,
Jingsong Lee

On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:

> +1 for this.
> We bring many SQL/API features and enhance stability in 1.10 release, and
> almost all of them happens in Blink planner.
> SQL CLI is the most convenient entrypoint for me, I believe many users
> will have a better experience If we set Blink planner as default planner.
>
> Best,
> Leonard
>
> > 在 2020年1月3日,15:16,Terry Wang  写道:
> >
> > Since what blink planner can do is a superset of flink planner, big +1
> for changing the default planner to Blink planner from my side.
> >
> > Best,
> > Terry Wang
> >
> >
> >
> >> 2020年1月3日 15:00,Jark Wu  写道:
> >>
> >> Hi everyone,
> >>
> >> In 1.10 release, Flink SQL supports many awesome features and
> improvements,
> >> including:
> >> - support watermark statement and computed column in DDL
> >> - fully support all data types in Hive
> >> - Batch SQL performance improvements (TPC-DS 7x than Hive MR)
> >> - support INSERT OVERWRITE and INSERT PARTITION
> >>
> >> However, all the features and improvements are only avaiable in Blink
> >> planner, not in Old planner.
> >> There are also some other features are limited in Blink planner, e.g.
> >> Dimension Table Join [1],
> >> TopN [2], Deduplicate [3], streaming aggregates optimization [4], and
> so on.
> >>
> >> But Old planner is still the default planner in Table API & SQL. It is
> >> frustrating for users to set
> >> to blink planner manually when every time start a SQL CLI. And it's
> >> surprising to see unsupported
> >> exception if they trying out the new features but not switch planner.
> >>
> >> SQL CLI is a very important entrypoint for trying out new feautures and
> >> prototyping for users.
> >> In order to give new planner more exposures, I would like to suggest to
> set
> >> default planner
> >> for SQL Client to Blink planner before 1.10 release.
> >>
> >> The approach is just changing the default SQL CLI yaml
> configuration[5]. In
> >> this way, the existing
> >> environment is still compatible and unaffected.
> >>
> >> Changing the default planner for the whole Table API & SQL is another
> topic
> >> and is out of scope of this discussion.
> >>
> >> What do you think?
> >>
> >> Best,
> >> Jark
> >>
> >> [1]:
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> >> [2]:
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#top-n
> >> [3]:
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> >> [4]:
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html
> >> [5]:
> >>
> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/conf/sql-client-defaults.yaml#L100
> >
>
>

-- 
Best, Jingsong Lee