Re: How to verify your code yourself before submitting it to the community

2019-09-27 Thread Dian Fu
>> 1. Running build_docs.sh -i under the docs directory locally works fine. Can I open
>> a new PR and resubmit my changes? If not, what should I do on top of the current PR?

Don't open a new PR. If you want to re-trigger the build, you can use the command: @flinkbot run travis


>> 2. My PR contains only one commit. Why did flinkbot build 3 commits, of which one failed and two succeeded? See:
Every time the PR is updated a new build is triggered, and the history of earlier builds is kept.

>> 3. My change only touches documentation; how could it cause the avro module to fail?

That failure is a flaky test case; there is already a JIRA tracking it [1].

[1] https://issues.apache.org/jira/browse/FLINK-14235 


Best,
Dian

> On Sep 27, 2019, at 10:01 PM, Zili Chen  wrote:
> 
> You could report it as an unstable test (x
> 
> Many Flink tests are concurrency-related, so a change that passed CI when it was checked in can keep failing afterwards (x
> 
> Best,
> tison.
> 
> 
> gaofeilong198...@163.com  wrote on Fri, Sep 27, 2019 at 9:53 PM:
> 
>> Dian Fu, thanks for your reply.
>> 
>> I've put the log of the failed build here:
>> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-14115?filter=myopenissues
>> 
>> I still can't tell the cause from it. I now have a few questions:
>> 1. Running build_docs.sh -i under the docs directory locally works fine. Can I open
>> a new PR and resubmit my changes? If not, what should I do on top of the current PR?
>> 2. My PR contains only one commit. Why did flinkbot build 3 commits, of which one failed and two succeeded? See:
>> https://github.com/apache/flink/pull/9749#issuecomment-534149758
>> 3. My change only touches documentation; how could it cause the avro module to fail?
>> 
>> 
>> 
>> gaofeilong198...@163.com
>> 
>> From: Dian Fu
>> Sent: 2019-09-26 22:29
>> To: user-zh
>> Subject: Re: How to verify your code yourself before submitting it to the community
>> 1) If the build fails, check the failure reason; if it is unrelated to this PR, you can re-trigger Travis with "@flinkbot run travis".
>> 2) Locally you can verify with "mvn clean verify"; see [1] for details. Your change is doc-related, so in general it should not cause the build to fail.
>> 
>> [1] https://flink.apache.org/contributing/contribute-code.html <
>> https://flink.apache.org/contributing/contribute-code.html>
>>> On Sep 26, 2019, at 9:56 PM, 高飞龙  wrote:
>>> 
>>> Hi, when I submitted a PR to the community, the build failed (
>> https://github.com/apache/flink/pull/9749#issuecomment-534149758)
>>> 
>>> 
>>> What should I do? Is there a script I can run locally to test the build before submitting a PR?
>>> 
>>> 
>>> 
>>> 
>>> 
>>> --
>>> 
>>> 
>>> 
>>> gaofeilong198...@163.com
>> 
>> 



Do subtasks sharing a TaskManager task slot share a classloader?

2019-09-27 Thread Ever
A job has 2 tasks, and each task has 3 subtasks (parallelism 3).

How are the subtasks assigned to task slots? If several of the job's subtasks end up in the
same task slot (say one slot running 2 subtasks), do those subtasks share the same
classloader, or does each subtask get its own classloader?

The job also uses Scala objects (i.e. Java statics): will subtasks that run in the same
JVM share them?

Re: Best way to link static data to event data?

2019-09-27 Thread Sameer Wadkar
The main consideration in this type of scenario is not the type of source 
function you use. The key point is how the event operator gets the slow 
moving master data and caches it, and then recovers it if it fails and restarts 
again. 

It does not matter that the csv file does not change often. It is possible that 
the event operator may fail and restart. The csv data needs to be made available 
to it again. 

In that scenario the initial suggestion I made to pass the csv data in the 
constructor is not adequate by itself. You need to store it in operator 
state, which allows it to be recovered when the operator restarts on failure.

As long as the above takes place you have resiliency and you can use any 
suitable method or source. I have not used the Table source as much, but connected 
streams and operator state have worked out for me in similar scenarios. 
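
A minimal sketch of that pattern, with the CSV read as a second (control) stream connected to
the event stream and the parsed rows kept in operator state so they can be restored after a
failure, could look roughly like this. CsvRow, Event and EnrichedEvent are assumed POJOs,
not types from this thread:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// usage: csvStream.connect(eventStream).flatMap(new CsvEnricher())
public class CsvEnricher
        extends RichCoFlatMapFunction<CsvRow, Event, EnrichedEvent>
        implements CheckpointedFunction {

    // latest CSV contents, keyed by the join field
    private transient Map<String, CsvRow> cache;
    // operator state used to restore the cache after a restart
    private transient ListState<CsvRow> checkpointedRows;

    @Override
    public void flatMap1(CsvRow row, Collector<EnrichedEvent> out) {
        // CSV side: every (re-)read of the file refreshes the cache
        cache.put(row.key, row);
    }

    @Override
    public void flatMap2(Event event, Collector<EnrichedEvent> out) {
        // event side: enrich against the cached CSV data
        CsvRow row = cache.get(event.key);
        if (row != null) {
            out.collect(new EnrichedEvent(event, row));
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        cache = new HashMap<>();
        checkpointedRows = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("csv-rows", CsvRow.class));
        if (context.isRestored()) {
            for (CsvRow row : checkpointedRows.get()) {
                cache.put(row.key, row);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedRows.clear();
        checkpointedRows.addAll(new ArrayList<>(cache.values()));
    }
}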

Sameer

Sent from my iPhone

> On Sep 27, 2019, at 4:38 PM, John Smith  wrote:
> 
> It's a fairly small static file that may update once in a blue moon lol But 
> I'm hoping to use existing functions. Why can't I just use CSV to table 
> source?
> 
> Why should I have to now either write my own CSV parser or look for 3rd 
> party, then what put in a Java Map and lookup that map? I'm finding Flink to 
> be a bit of death by 1000 paper cuts lol
> 
> if i put the CSV in a table I can then use it to join across it with the 
> event no?
> 
>> On Fri, 27 Sep 2019 at 16:25, Sameer W  wrote:
>> Connected Streams is one option, but it may be overkill in your scenario if 
>> your CSV does not refresh. If your CSV is small enough (number of records 
>> wise), you could parse it and load it into an object (serializable) and pass 
>> it to the constructor of the operator where you will be streaming the data. 
>> 
>> If the CSV can be made available via a shared network folder (or S3 in case 
>> of AWS) you could also read it in the open function (if you use Rich 
>> versions of the operator).
>> 
>> The real problem I guess is how frequently does the CSV update. If you want 
>> the updates to propagate in near real time (or on schedule) the option 1  ( 
>> parse in driver and send it via constructor does not work). Also in the 
>> second option you need to be responsible for refreshing the file read from 
>> the shared folder.
>> 
>> In that case use Connected Streams where the stream reading in the file (the 
>> other stream reads the events) periodically re-reads the file and sends it 
>> down the stream. The refresh interval is your tolerance of stale data in the 
>> CSV.
>> 
>>> On Fri, Sep 27, 2019 at 3:49 PM John Smith  wrote:
>>> I don't think I need state for this...
>>> 
>>> I need to load a CSV. I'm guessing as a table and then filter my events 
>>> parse the number, transform the event into geolocation data and sink that 
>>> downstream data source.
>>> 
>>> So I'm guessing i need a CSV source and my Kafka source and somehow join 
>>> those transform the event...
>>> 
 On Fri, 27 Sep 2019 at 14:43, Oytun Tez  wrote:
 Hi,
 
 You should look at the broadcast state pattern in the Flink docs.
 
 ---
 Oytun Tez
 
 M O T A W O R D
 The World's Fastest Human Translation Platform.
 oy...@motaword.com — www.motaword.com
 
 
> On Fri, Sep 27, 2019 at 2:42 PM John Smith  wrote:
> Using 1.8
> 
> I have a list of phone area codes, cities and their geo location in CSV 
> file. And my events from Kafka contain phone numbers.
> 
> I want to parse the phone number get it's area code and then associate 
> the phone number to a city, geo location and as well count how many 
> numbers are in that city/geo location.


Re: Best way to link static data to event data?

2019-09-27 Thread John Smith
It's a fairly small static file that may update once in a blue moon lol. But
I'm hoping to use existing functions. Why can't I just use the CSV-to-table
source?

Why should I now have to either write my own CSV parser or look for a 3rd
party one, then put it in a Java Map and look that map up? I'm finding Flink
to be a bit of death by 1000 paper cuts lol

If I put the CSV in a table, I can then join it with the
event, no?

On Fri, 27 Sep 2019 at 16:25, Sameer W  wrote:

> Connected Streams is one option, but it may be overkill in your scenario
> if your CSV does not refresh. If your CSV is small enough (number of
> records wise), you could parse it and load it into an object (serializable)
> and pass it to the constructor of the operator where you will be streaming
> the data.
>
> If the CSV can be made available via a shared network folder (or S3 in
> case of AWS) you could also read it in the open function (if you use Rich
> versions of the operator).
>
> The real problem I guess is how frequently does the CSV update. If you
> want the updates to propagate in near real time (or on schedule) the option
> 1  ( parse in driver and send it via constructor does not work). Also in
> the second option you need to be responsible for refreshing the file read
> from the shared folder.
>
> In that case use Connected Streams where the stream reading in the file
> (the other stream reads the events) periodically re-reads the file and
> sends it down the stream. The refresh interval is your tolerance of stale
> data in the CSV.
>
> On Fri, Sep 27, 2019 at 3:49 PM John Smith  wrote:
>
>> I don't think I need state for this...
>>
>> I need to load a CSV. I'm guessing as a table and then filter my events
>> parse the number, transform the event into geolocation data and sink that
>> downstream data source.
>>
>> So I'm guessing i need a CSV source and my Kafka source and somehow join
>> those transform the event...
>>
>> On Fri, 27 Sep 2019 at 14:43, Oytun Tez  wrote:
>>
>>> Hi,
>>>
>>> You should look at the broadcast state pattern in the Flink docs.
>>>
>>> ---
>>> Oytun Tez
>>>
>>> *M O T A W O R D*
>>> The World's Fastest Human Translation Platform.
>>> oy...@motaword.com — www.motaword.com
>>>
>>>
>>> On Fri, Sep 27, 2019 at 2:42 PM John Smith 
>>> wrote:
>>>
 Using 1.8

 I have a list of phone area codes, cities and their geo location in CSV
 file. And my events from Kafka contain phone numbers.

 I want to parse the phone number get it's area code and then associate
 the phone number to a city, geo location and as well count how many numbers
 are in that city/geo location.

>>>


Re: Best way to link static data to event data?

2019-09-27 Thread Sameer W
Connected Streams is one option, but it may be overkill in your scenario if
your CSV does not refresh. If your CSV is small enough (number of records
wise), you could parse it and load it into an object (serializable) and
pass it to the constructor of the operator where you will be streaming the
data.

If the CSV can be made available via a shared network folder (or S3 in case
of AWS) you could also read it in the open function (if you use Rich
versions of the operator).

The real problem I guess is how frequently the CSV updates. If you want
the updates to propagate in near real time (or on a schedule), option 1
(parse in the driver and send it via the constructor) does not work. Also with the
second option you are responsible for refreshing the file read from
the shared folder.

In that case use Connected Streams where the stream reading in the file
(the other stream reads the events) periodically re-reads the file and
sends it down the stream. The refresh interval is your tolerance of stale
data in the CSV.

On Fri, Sep 27, 2019 at 3:49 PM John Smith  wrote:

> I don't think I need state for this...
>
> I need to load a CSV. I'm guessing as a table and then filter my events
> parse the number, transform the event into geolocation data and sink that
> downstream data source.
>
> So I'm guessing i need a CSV source and my Kafka source and somehow join
> those transform the event...
>
> On Fri, 27 Sep 2019 at 14:43, Oytun Tez  wrote:
>
>> Hi,
>>
>> You should look at the broadcast state pattern in the Flink docs.
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>>
>> On Fri, Sep 27, 2019 at 2:42 PM John Smith 
>> wrote:
>>
>>> Using 1.8
>>>
>>> I have a list of phone area codes, cities and their geo location in CSV
>>> file. And my events from Kafka contain phone numbers.
>>>
>>> I want to parse the phone number get it's area code and then associate
>>> the phone number to a city, geo location and as well count how many numbers
>>> are in that city/geo location.
>>>
>>


Re: Best way to link static data to event data?

2019-09-27 Thread John Smith
I don't think I need state for this...

I need to load a CSV, I'm guessing as a table, and then filter my events,
parse the number, transform the event into geolocation data and sink that
to the downstream data source.

So I'm guessing I need a CSV source and my Kafka source, and somehow join
those and transform the event...

On Fri, 27 Sep 2019 at 14:43, Oytun Tez  wrote:

> Hi,
>
> You should look at the broadcast state pattern in the Flink docs.
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Fri, Sep 27, 2019 at 2:42 PM John Smith  wrote:
>
>> Using 1.8
>>
>> I have a list of phone area codes, cities and their geo location in CSV
>> file. And my events from Kafka contain phone numbers.
>>
>> I want to parse the phone number get it's area code and then associate
>> the phone number to a city, geo location and as well count how many numbers
>> are in that city/geo location.
>>
>


Re: Best way to link static data to event data?

2019-09-27 Thread Oytun Tez
Hi,

You should look at the broadcast state pattern in the Flink docs.

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com
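
For reference, a minimal sketch of that pattern applied to the CSV case being discussed.
MapStateDescriptor, BroadcastStream and BroadcastProcessFunction are the standard classes
from the broadcast state docs; CsvRow, Event and EnrichedEvent are assumed types, not from
this thread:

MapStateDescriptor<String, CsvRow> csvDescriptor =
        new MapStateDescriptor<>("csv-by-key", Types.STRING, TypeInformation.of(CsvRow.class));

// the small, occasionally re-read CSV stream is broadcast to all parallel instances
BroadcastStream<CsvRow> csvBroadcast = csvStream.broadcast(csvDescriptor);

DataStream<EnrichedEvent> enriched = eventStream
        .connect(csvBroadcast)
        .process(new BroadcastProcessFunction<Event, CsvRow, EnrichedEvent>() {

            @Override
            public void processElement(Event event, ReadOnlyContext ctx,
                                       Collector<EnrichedEvent> out) throws Exception {
                // look the event up against the broadcast CSV data
                CsvRow row = ctx.getBroadcastState(csvDescriptor).get(event.key);
                if (row != null) {
                    out.collect(new EnrichedEvent(event, row));
                }
            }

            @Override
            public void processBroadcastElement(CsvRow row, Context ctx,
                                                Collector<EnrichedEvent> out) throws Exception {
                // each new or updated CSV row replaces the previous entry for that key
                ctx.getBroadcastState(csvDescriptor).put(row.key, row);
            }
        });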


On Fri, Sep 27, 2019 at 2:42 PM John Smith  wrote:

> Using 1.8
>
> I have a list of phone area codes, cities and their geo location in CSV
> file. And my events from Kafka contain phone numbers.
>
> I want to parse the phone number get it's area code and then associate the
> phone number to a city, geo location and as well count how many numbers are
> in that city/geo location.
>


Best way to link static data to event data?

2019-09-27 Thread John Smith
Using 1.8

I have a list of phone area codes, cities and their geo locations in a CSV
file. And my events from Kafka contain phone numbers.

I want to parse the phone number, get its area code, and then associate the
phone number with a city and geo location, as well as count how many numbers are
in that city/geo location.
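
A rough sketch of the per-event logic described above, assuming the CSV has already been
parsed into a Map (for example in open() or via broadcast state as discussed in the replies)
and that the area code is simply a prefix of the number. CityGeo and EnrichedCall are
illustrative POJOs, not names from this thread:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.Map;

// usage: events.keyBy(number -> number.substring(0, 3)).flatMap(new AreaCodeEnricher(index))
public class AreaCodeEnricher extends RichFlatMapFunction<String, EnrichedCall> {

    private final Map<String, CityGeo> areaCodeIndex;  // parsed from the CSV up front
    private transient ValueState<Long> perCityCount;   // keyed state, one counter per area code

    public AreaCodeEnricher(Map<String, CityGeo> areaCodeIndex) {
        this.areaCodeIndex = areaCodeIndex;            // a HashMap is serializable
    }

    @Override
    public void open(Configuration parameters) {
        perCityCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("per-city-count", Long.class));
    }

    @Override
    public void flatMap(String phoneNumber, Collector<EnrichedCall> out) throws Exception {
        String areaCode = phoneNumber.substring(0, 3); // naive parsing, just for the sketch
        CityGeo geo = areaCodeIndex.get(areaCode);
        if (geo == null) {
            return;                                    // unknown area code: drop (or side-output)
        }
        Long count = perCityCount.value();
        count = (count == null) ? 1L : count + 1;
        perCityCount.update(count);
        out.collect(new EnrichedCall(phoneNumber, geo, count));
    }
}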


Re: Re: How to verify your code yourself before submitting it to the community

2019-09-27 Thread Zili Chen
You could report it as an unstable test (x

Many Flink tests are concurrency-related, so a change that passed CI when it was checked in can keep failing afterwards (x

Best,
tison.


gaofeilong198...@163.com  wrote on Fri, Sep 27, 2019 at 9:53 PM:

> Dian Fu, thanks for your reply.
>
> I've put the log of the failed build here:
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-14115?filter=myopenissues
>
> I still can't tell the cause from it. I now have a few questions:
> 1. Running build_docs.sh -i under the docs directory locally works fine. Can I open
> a new PR and resubmit my changes? If not, what should I do on top of the current PR?
> 2. My PR contains only one commit. Why did flinkbot build 3 commits, of which one failed and two succeeded? See:
> https://github.com/apache/flink/pull/9749#issuecomment-534149758
> 3. My change only touches documentation; how could it cause the avro module to fail?
>
>
>
> gaofeilong198...@163.com
>
> From: Dian Fu
> Sent: 2019-09-26 22:29
> To: user-zh
> Subject: Re: How to verify your code yourself before submitting it to the community
> 1) If the build fails, check the failure reason; if it is unrelated to this PR, you can re-trigger Travis with "@flinkbot run travis".
> 2) Locally you can verify with "mvn clean verify"; see [1] for details. Your change is doc-related, so in general it should not cause the build to fail.
>
> [1] https://flink.apache.org/contributing/contribute-code.html <
> https://flink.apache.org/contributing/contribute-code.html>
> > On Sep 26, 2019, at 9:56 PM, 高飞龙  wrote:
> >
> > Hi, when I submitted a PR to the community, the build failed (
> https://github.com/apache/flink/pull/9749#issuecomment-534149758)
> >
> >
> > What should I do? Is there a script I can run locally to test the build before submitting a PR?
> >
> >
> >
> >
> >
> > --
> >
> >
> >
> > gaofeilong198...@163.com
>
>


Reply: Re: How to verify your code yourself before submitting it to the community

2019-09-27 Thread gaofeilong198...@163.com
Dian Fu, thanks for your reply.

I've put the log of the failed build here: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-14115?filter=myopenissues

I still can't tell the cause from it. I now have a few questions:
1. Running build_docs.sh -i under the docs directory locally works fine. Can I open
a new PR and resubmit my changes? If not, what should I do on top of the current PR?
2. My PR contains only one commit. Why did flinkbot build 3 commits, of which one failed
and two succeeded? See: https://github.com/apache/flink/pull/9749#issuecomment-534149758
3. My change only touches documentation; how could it cause the avro module to fail?



gaofeilong198...@163.com
 
From: Dian Fu
Sent: 2019-09-26 22:29
To: user-zh
Subject: Re: How to verify your code yourself before submitting it to the community
1) If the build fails, check the failure reason; if it is unrelated to this PR, you can re-trigger Travis with "@flinkbot run travis".
2) Locally you can verify with "mvn clean verify"; see [1] for details. Your change is doc-related, so in general it should not cause the build to fail.
 
[1] https://flink.apache.org/contributing/contribute-code.html 

> On Sep 26, 2019, at 9:56 PM, 高飞龙  wrote:
> 
> Hi, when I submitted a PR to the community, the build failed (https://github.com/apache/flink/pull/9749#issuecomment-534149758)
> 
> 
> What should I do? Is there a script I can run locally to test the build before submitting a PR?
> 
> 
> 
> 
> 
> --
> 
> 
> 
> gaofeilong198...@163.com
 


flink kafka consumer misses some messages

2019-09-27 Thread zenglong chen
I have a test topic and manually add messages to it; Flink consumes the topic and prints the output, but some of the added messages are missing. Where could the problem be? Has anyone run into something similar?


Re: Subscribe to the mailing list

2019-09-27 Thread Wesley Peng
Hello

You should not post across multiple groups.
One thread to one group is more graceful.


杨利君 wrote on Thu, Sep 26, 2019 at 3:42 PM:

> Subscribe to the Flink community mailing list


Re: Flink- Heap Space running out

2019-09-27 Thread Nishant Gupta
Apologies, correction made to the previous email.

Hi Fabian and Mike

*flink-conf.yaml  [In a 3 node cluster having 120 GB memory each and 3 TB
hard disk ]*
jobmanager.heap.size: 50120m
taskmanager.heap.size: 50120m

*With Idle state retention having below configuration  (Same heap space
issue)  *
*execution:*
  planner: old
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 100
  parallelism: 3
  max-parallelism: 128

*  min-idle-state-retention: 30  max-idle-state-retention: 60  *

*With time-windowed join (records get missed out or duplicated depending on
the time interval at which I push bad IPs)*
*SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip
AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '20' MINUTE AND
K.k_proctime + INTERVAL '5' MINUTE*

*I have tried Temporal functions - It is working fine*

I was really hoping to make it work with idle state retention and the time-windowed join.
Could you please check the configuration and query?
Please let me know if any other details are required.
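
In case it helps to rule out the SQL client settings, the idle state retention can also be
set programmatically. A sketch against the 1.9 Table API, following the query-configuration
page Fabian linked; the table names come from the query above and the retention values are
only illustrative:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// obtain the query configuration and set idle state retention:
// state for a key is cleaned up once it has been idle for 30 to 60 minutes
StreamQueryConfig queryConfig = tableEnv.queryConfig();
queryConfig.withIdleStateRetentionTime(Time.minutes(30), Time.minutes(60));

Table result = tableEnv.sqlQuery(
        "SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip");
tableEnv.toRetractStream(result, Row.class, queryConfig).print();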

On Fri, Sep 27, 2019 at 12:41 PM Nishant Gupta 
wrote:

>
> Hi Fabian and Mike
>
> *flink-conf.yaml  [In a 3 node cluster having 120 GB memory each and 3 TB
> hard disk ]*
> jobmanager.heap.size: 50120m
> taskmanager.heap.size: 50120m
>
> *With Idle state retention having below configuration  (Same heap space
> issue)  *
> *execution:*
>   planner: old
>   type: streaming
>   time-characteristic: event-time
>   periodic-watermarks-interval: 200
>   result-mode: table
>   max-table-result-rows: 100
>   parallelism: 3
>   max-parallelism: 128
>
> *  min-idle-state-retention: 30  max-idle-state-retention: 60  *
>
> *With time-windowed join (Same heap space issue)*
> *SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip
> AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '20' MINUTE AND
> K.k_proctime + INTERVAL '5' MINUTE*
>
> *I have tried Temporal functions - It is working fine*
>
> I was really wishing to make it work with idle state and time window join.
> Could you please check the configuration and query.
> Please let me know if any other details are required
>
>
> On Thu, Sep 26, 2019 at 8:46 PM Fabian Hueske  wrote:
>
>> Hi,
>>
>> I don't think that the memory configuration is the issue.
>> The problem is the join query. The join does not have any temporal
>> boundaries.
>> Therefore, both tables are completely stored in memory and never released.
>>
>> You can configure a memory eviction strategy via idle state retention [1]
>> but you should make sure that this is really what you want.
>> Alternatively, try a time-windowed join or a join with a temporal table
>> function.
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>
>> Am Do., 26. Sept. 2019 um 17:08 Uhr schrieb miki haiat <
>> miko5...@gmail.com>:
>>
>>> You can configure the task manager memory in the config.yaml file.
>>> What is the current configuration?
>>>
>>> On Thu, Sep 26, 2019, 17:14 Nishant Gupta 
>>> wrote:
>>>
  am running a query to join a stream and a table as below. It is
 running out of heap space. Even though it has enough heap space in flink
 cluster (60GB * 3)

 Is there an eviction strategy needed for this query ?

 *SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON
 sourceKafka.CC=DefaulterTable.CC;  *

 Thanks

 Nishant

>>>


Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

2019-09-27 Thread Oliwer Kostera
Hi all,


I'm using ProcessWindowFunction in a keyed stream with the following definition:

final SingleOutputStreamOperator processWindowFunctionStream =
        keyedStream
                .window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
                .process(new CustomProcessWindowFunction())
                .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
                .name("Process window function");


My checkpointing configuration is set to use RocksDB state backend with 
incremental checkpointing and EXACTLY_ONCE mode.

At runtime I noticed that even though data ingestion is static (same keys and message 
frequency), the state size of the process window operator keeps increasing. I tried to 
reproduce it with a minimal similar setup here: 
https://github.com/loliver1234/flink-process-window-function and was able 
to do so.

Testing conditions:

  *   RabbitMQ source with Exactly-once guarantee and 65k prefetch count
  *   RabbitMQ sink to collect messages
  *   Simple ProcessWindowFunction that only passes messages through
  *   Stream time characteristic set to TimeCharacteristic.ProcessingTime

Testing scenario:

  *   Start the Flink job and check the initial state size - State Size: 127 KB
  *   Start sending messages, 1000 messages with the same unique keys every 1 s (they do not 
fall into the defined 100 ms session gap, so each message should create a new window)
  *   The state of the process window operator keeps increasing - after 1 million 
messages the state ended up around 2 MB
  *   Stop sending messages and wait until the Rabbit queue is fully consumed and a 
few checkpoints go by
  *   Expected the state size to decrease back to the base value, but it stayed at 2 MB
  *   Continued to send messages with the same keys, and the state kept its increasing 
trend.

What I checked:

  *   Registration and deregistration of timers set for time windows - each 
registration matched its deregistration
  *   Checked that in fact there are no window merges
  *   Tried custom Trigger disabling window merges and setting onProcessingTime 
trigger to TriggerResult.FIRE_AND_PURGE - same state behavior

Tested with:

  *   Local Flink Mini Cluster running from IDE
  *   Flink ha standalone cluster  run in docker

On our staging environment we noticed that the state for that operator keeps 
increasing indefinitely, after some months reaching 1.5 GB for 100k unique 
keys.

With best regards

Oliwer



Flink ColumnStats

2019-09-27 Thread Flavio Pompermaier
Hi all,
I've seen that there was recently an ongoing effort around Flink ColumnStats,
but I can't find a Flink job that computes Flink table stats; I found only
code that converts them from the Hive catalog.
Is there any Flink utility I can call to compute them on a Table?

We've tried to implement a specific job at
https://github.com/okkam-it/flink-descriptive-stats/blob/master/src/main/java/jar/ProfileJob.java
that works on a DataSet, but we didn't get any feedback about it. Is there any
better way to achieve this?

Best,
Flavio


Re: Flink- Heap Space running out

2019-09-27 Thread Nishant Gupta
Hi Fabian and Mike

*flink-conf.yaml  [In a 3 node cluster having 120 GB memory each and 3 TB
hard disk ]*
jobmanager.heap.size: 50120m
taskmanager.heap.size: 50120m

*With Idle state retention having below configuration  (Same heap space
issue)  *
*execution:*
  planner: old
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 100
  parallelism: 3
  max-parallelism: 128

*  min-idle-state-retention: 30  max-idle-state-retention: 60  *

*With time-windowed join (Same heap space issue)*
*SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip
AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '20' MINUTE AND
K.k_proctime + INTERVAL '5' MINUTE*

*I have tried Temporal functions - It is working fine*

I was really hoping to make it work with idle state retention and the time-windowed join.
Could you please check the configuration and query?
Please let me know if any other details are required.
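
For reference, the temporal table function approach Fabian mentions (reported above as
working) is registered roughly like this in the 1.9 Java Table API; the table and column
names are taken from the query above:

// register BadIP as a temporal table function, versioned by b_proctime and keyed on bad_ip
Table badIp = tableEnv.sqlQuery("SELECT bad_ip, b_proctime FROM BadIP");
TemporalTableFunction badIpHistory =
        badIp.createTemporalTableFunction("b_proctime", "bad_ip");
tableEnv.registerFunction("BadIpHistory", badIpHistory);

// join each KafkaSource row against the version of BadIP valid at its processing time
Table joined = tableEnv.sqlQuery(
        "SELECT K.* " +
        "FROM KafkaSource AS K, LATERAL TABLE (BadIpHistory(K.k_proctime)) AS B " +
        "WHERE K.sourceip = B.bad_ip");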


On Thu, Sep 26, 2019 at 8:46 PM Fabian Hueske  wrote:

> Hi,
>
> I don't think that the memory configuration is the issue.
> The problem is the join query. The join does not have any temporal
> boundaries.
> Therefore, both tables are completely stored in memory and never released.
>
> You can configure a memory eviction strategy via idle state retention [1]
> but you should make sure that this is really what you want.
> Alternatively, try a time-windowed join or a join with a temporal table
> function.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> Am Do., 26. Sept. 2019 um 17:08 Uhr schrieb miki haiat  >:
>
>> You can configure the task manager memory in the config.yaml file.
>> What is the current configuration?
>>
>> On Thu, Sep 26, 2019, 17:14 Nishant Gupta 
>> wrote:
>>
>>>  am running a query to join a stream and a table as below. It is running
>>> out of heap space. Even though it has enough heap space in flink cluster
>>> (60GB * 3)
>>>
>>> Is there an eviction strategy needed for this query ?
>>>
>>> *SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON
>>> sourceKafka.CC=DefaulterTable.CC;  *
>>>
>>> Thanks
>>>
>>> Nishant
>>>
>>


ClassNotFoundException when submitting a job

2019-09-27 Thread 163
Hi Guys,

Flink version is 1.9.0 and built against HDP.

I got the following exception when submitting a job that uses a Hadoop input format to read 
a sequence file in HDFS.

Thanks for your help!

Qi


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not submit 
job (JobID: 78025b8d9cfd49d2f94190fb11849033)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:244)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at 
org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:280)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal 
server error., (JobManagerRunner.java:152)
at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: 
org/apache/flink/hadoop2/shaded/com/google/re2j/PatternSyntaxException
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:270)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:907)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
at 
org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
at 
org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
at 
org.apache.flink.runtime.scheduler.LegacyScheduler.(LegacyScheduler.java:176)
at 
org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
at 
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:265)
at 

Re: Reading Key-Value from Kafka | Eviction policy.

2019-09-27 Thread srikanth flink
Hi Miki,

What are those several ways? Could you help me with references?

Use case:

We have a continuous credit card transaction stream flowing into a Kafka
topic, along with a set of credit card defaulters in a .csv file (which
gets updated every day).


Thanks

Srikanth


On Fri, Sep 27, 2019 at 11:11 AM miki haiat  wrote:

> I'm sure there is several ways to implement it. Can you elaborate more on
> your use case ?
>
> On Fri, Sep 27, 2019, 08:37 srikanth flink  wrote:
>
>> Hi,
>>
>> My data source is Kafka, all these days have been reading the values from
>> Kafka stream to a table. The table just grows and runs into a heap issue.
>>
>> Came across the eviction policy that works on only keys, right?
>>
>> Have researched to configure the environment file(Flink SLQ) to read both
>> key and value, so as the eviction works on the keys and older data is
>> cleared. I found nothing in the docs, so far.
>>
>> Could someone help with that?
>> If there's no support for reading key and value, can someone help me to
>> assign a key to the table I'm building from stream?
>>
>> Thanks
>> Srikanth
>>
>


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-27 Thread Vijay Bhaskar
I don't think HA will help to recover from a cluster crash; for that we
should take periodic savepoints, right? Please correct me in case I am wrong.

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar 
wrote:

> Suppose my cluster got crashed and need to bring up the entire cluster
> back? Does HA still helps to run the cluster from latest save point?
>
> Regards
> Bhaskar
>
> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester 
> wrote:
>
>> thanks to everyone for all the replies.
>>
>> i think the original concern here with "just" relying on the HA option is
>> that there are some disaster recovery and data center migration use cases
>> where the continuity of the job managers is difficult to preserve. but
>> those are admittedly very edgy use cases. i think it's definitely worth
>> reviewing the SLAs with our site reliability engineers to see how likely it
>> would be to completely lose all job managers under an HA configuration.
>> that small a risk might be acceptable/preferable to a one-off solution.
>>
>> @Aleksander, would love to learn more about Zookeeper-less HA. i think i
>> spotted a thread somewhere between Till and someone (perhaps you) about
>> that. feel free to DM me.
>>
>> thanks again to everyone!
>>
>> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang  wrote:
>>
>>> Hi, Aleksandar
>>>
>>> Savepoint option in standalone job cluster is optional. If you want to
>>> always recover
>>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>>> could use the
>>> high-availability configuration. Make sure the cluster-id is not
>>> changed, i think the job
>>> could recover both at exceptionally crash and restart by expectation.
>>>
>>> @Aleksandar Mastilovic , we are also have
>>> an zookeeper-less high-availability implementation[1].
>>> Maybe we could have some discussion and contribute this useful feature
>>> to the community.
>>>
>>> [1].
>>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>>
>>> Best,
>>> Yang
>>>
>>> Aleksandar Mastilovic  于2019年9月26日周四
>>> 上午4:11写道:
>>>
 Would you guys (Flink devs) be interested in our solution for
 zookeeper-less HA? I could ask the managers how they feel about
 open-sourcing the improvement.

 On Sep 25, 2019, at 11:49 AM, Yun Tang  wrote:

 As Aleksandar said, k8s with HA configuration could solve your problem.
 There already have some discussion about how to implement such HA in k8s if
 we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
 Currently, you might only have to choose zookeeper as high-availability
 service.

 [1] https://issues.apache.org/jira/browse/FLINK-11105
 [2] https://issues.apache.org/jira/browse/FLINK-12884

 Best
 Yun Tang
 --
 *From:* Aleksandar Mastilovic 
 *Sent:* Thursday, September 26, 2019 1:57
 *To:* Sean Hester 
 *Cc:* Hao Sun ; Yuval Itzchakov ;
 user 
 *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes

 Can’t you simply use JobManager in HA mode? It would pick up where it
 left off if you don’t provide a Savepoint.

 On Sep 25, 2019, at 6:07 AM, Sean Hester 
 wrote:

 thanks for all replies! i'll definitely take a look at the Flink k8s
 Operator project.

 i'll try to restate the issue to clarify. this issue is specific to
 starting a job from a savepoint in job-cluster mode. in these cases the Job
 Manager container is configured to run a single Flink job at start-up. the
 savepoint needs to be provided as an argument to the entrypoint. the Flink
 documentation for this approach is here:


 https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint

 the issue is that taking this approach means that the job will *always*
  start from the savepoint provided as the start argument in the
 Kubernetes YAML. this includes unplanned restarts of the job manager, but
 we'd really prefer any *unplanned* restarts resume for the most recent
 checkpoint instead of restarting from the configured savepoint. so in a
 sense we want the savepoint argument to be transient, only being used
 during the initial deployment, but this runs counter to the design of
 Kubernetes which always wants to restore a deployment to the "goal state"
 as defined in the YAML.

 i hope this helps. if you want more details please let me know, and
 thanks again for your time.


 On Tue, Sep 24, 2019 at 1:09 PM Hao Sun  wrote:

 I think I overlooked it. Good point. I am using Redis to save the path
 to my savepoint, I might be able to set a TTL to avoid such issue.

 Hao Sun


 On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov 
 wrote:

 Hi Hao,

 I think he's exactly talking about the usecase where the JM/TM restart
 and they 

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-27 Thread Vijay Bhaskar
Suppose my cluster crashed and I need to bring the entire cluster back
up. Does HA still help to run the cluster from the latest savepoint?

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester 
wrote:

> thanks to everyone for all the replies.
>
> i think the original concern here with "just" relying on the HA option is
> that there are some disaster recovery and data center migration use cases
> where the continuity of the job managers is difficult to preserve. but
> those are admittedly very edgy use cases. i think it's definitely worth
> reviewing the SLAs with our site reliability engineers to see how likely it
> would be to completely lose all job managers under an HA configuration.
> that small a risk might be acceptable/preferable to a one-off solution.
>
> @Aleksander, would love to learn more about Zookeeper-less HA. i think i
> spotted a thread somewhere between Till and someone (perhaps you) about
> that. feel free to DM me.
>
> thanks again to everyone!
>
> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang  wrote:
>
>> Hi, Aleksandar
>>
>> Savepoint option in standalone job cluster is optional. If you want to
>> always recover
>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>> could use the
>> high-availability configuration. Make sure the cluster-id is not changed,
>> i think the job
>> could recover both at exceptionally crash and restart by expectation.
>>
>> @Aleksandar Mastilovic , we are also have
>> an zookeeper-less high-availability implementation[1].
>> Maybe we could have some discussion and contribute this useful feature to
>> the community.
>>
>> [1].
>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>
>> Best,
>> Yang
>>
>> Aleksandar Mastilovic  于2019年9月26日周四
>> 上午4:11写道:
>>
>>> Would you guys (Flink devs) be interested in our solution for
>>> zookeeper-less HA? I could ask the managers how they feel about
>>> open-sourcing the improvement.
>>>
>>> On Sep 25, 2019, at 11:49 AM, Yun Tang  wrote:
>>>
>>> As Aleksandar said, k8s with HA configuration could solve your problem.
>>> There already have some discussion about how to implement such HA in k8s if
>>> we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
>>> Currently, you might only have to choose zookeeper as high-availability
>>> service.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Aleksandar Mastilovic 
>>> *Sent:* Thursday, September 26, 2019 1:57
>>> *To:* Sean Hester 
>>> *Cc:* Hao Sun ; Yuval Itzchakov ;
>>> user 
>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>>
>>> Can’t you simply use JobManager in HA mode? It would pick up where it
>>> left off if you don’t provide a Savepoint.
>>>
>>> On Sep 25, 2019, at 6:07 AM, Sean Hester 
>>> wrote:
>>>
>>> thanks for all replies! i'll definitely take a look at the Flink k8s
>>> Operator project.
>>>
>>> i'll try to restate the issue to clarify. this issue is specific to
>>> starting a job from a savepoint in job-cluster mode. in these cases the Job
>>> Manager container is configured to run a single Flink job at start-up. the
>>> savepoint needs to be provided as an argument to the entrypoint. the Flink
>>> documentation for this approach is here:
>>>
>>>
>>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>>
>>> the issue is that taking this approach means that the job will *always* 
>>> start
>>> from the savepoint provided as the start argument in the Kubernetes YAML.
>>> this includes unplanned restarts of the job manager, but we'd really prefer
>>> any *unplanned* restarts resume for the most recent checkpoint instead
>>> of restarting from the configured savepoint. so in a sense we want the
>>> savepoint argument to be transient, only being used during the initial
>>> deployment, but this runs counter to the design of Kubernetes which always
>>> wants to restore a deployment to the "goal state" as defined in the YAML.
>>>
>>> i hope this helps. if you want more details please let me know, and
>>> thanks again for your time.
>>>
>>>
>>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun  wrote:
>>>
>>> I think I overlooked it. Good point. I am using Redis to save the path
>>> to my savepoint, I might be able to set a TTL to avoid such issue.
>>>
>>> Hao Sun
>>>
>>>
>>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov 
>>> wrote:
>>>
>>> Hi Hao,
>>>
>>> I think he's exactly talking about the usecase where the JM/TM restart
>>> and they come back up from the latest savepoint which might be stale by
>>> that time.
>>>
>>> On Tue, 24 Sep 2019, 19:24 Hao Sun,  wrote:
>>>
>>> We always make a savepoint before we shutdown the job-cluster. So the
>>> savepoint is always the latest. When we fix a bug or change the job graph,
>>> it can resume well.
>>> We only use checkpoints for 

Example DDL and DML for joining a MySQL dimension table with Flink SQL

2019-09-27 Thread yelun
Hi all,

Is there a demo with example DDL and DML for joining a MySQL dimension table using Flink SQL that I could refer to? Thanks a lot.