Migrate custom partitioner from Flink 1.7 to Flink 1.9

2019-12-25 Thread Salva Alcántara
I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink
1.9. The original partitioner implemented the `selectChannels` method within
the `StreamPartitioner` interface like this:

```java
// Original: working for Flink 1.7
//@Override
public int[] selectChannels(
        SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate,
        int numberOfOutputChannels) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        // send to all channels
        int[] channels = new int[numberOfOutputChannels];
        for (int i = 0; i < numberOfOutputChannels; ++i) {
            channels[i] = i;
        }
        return channels;
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfOutputChannels);
    } else {
        returnChannels[0] =
            partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
    }
    return returnChannels;
}
```

I am not sure how to migrate this to Flink 1.9, since the
`StreamPartitioner` interface has changed as illustrated below:


```java
// New: required by Flink 1.9
@Override
public int selectChannel(
        SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        /*
         * It is illegal to call this method for broadcast channel
         * selectors and this method can remain not implemented
         * in that case (for example by throwing
         * UnsupportedOperationException).
         */
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfChannels);
    } else {
        returnChannels[0] =
            partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
    }
    //return returnChannels;
    return returnChannels[0];
}
```

Note that `selectChannels` has been replaced with `selectChannel`, so it is
no longer possible to return multiple output channels as was done above for
the case of broadcast elements. As a matter of fact, `selectChannel` should
not be invoked at all for that case. Any thoughts on how to tackle this?
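
For reference, the 1.9 contract I am coding against looks roughly like this (paraphrased from the sources, so possibly not verbatim). The channel count now arrives via `setup` instead of being passed to the selection method, and broadcast is a property of the selector rather than a per-record decision:

```java
// Paraphrased sketch of the Flink 1.9 ChannelSelector contract (possibly not verbatim).
import org.apache.flink.core.io.IOReadableWritable;

public interface ChannelSelector<T extends IOReadableWritable> {

    // Called once, providing the total number of output channels.
    void setup(int numberOfChannels);

    // Returns the single channel the given record is written to.
    int selectChannel(T record);

    // True if every record goes to every channel; selectChannel is then never called.
    boolean isBroadcast();
}
```

If that is accurate, a per-record broadcast decision simply no longer fits the contract, which is exactly where I am stuck.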





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Effect of -yn and -ys on Flink 1.9 batch jobs

2019-12-25 Thread JingsongLee
Hi faaron zheng,

As Kurt said, we strongly recommend using 1.10; the release branch has already been cut.

A rule of thumb for TMs: with 10 slots per TM and 10 GB of TM memory, use 4 GB of JVM heap, 1 GB of network buffers and 5 GB of managed memory (i.e. about 500 MB of managed memory per slot).

Best,
Jingsong Lee


--
From:Kurt Young 
Send Time: Thursday, Dec 26, 2019, 14:07
To:user-zh 
Subject: Re: Effect of -yn and -ys on Flink 1.9 batch jobs

You could also try the latest 1.10 version. In that version the SQL operators no longer request a fixed, hard-coded amount of memory;
instead they adapt to however much managed memory the slot can provide.

Best,
Kurt


On Thu, Dec 26, 2019 at 1:36 PM Xintong Song  wrote:

> How much memory a slot needs depends on the specific job; it can vary a lot from job to job.
>
> A slot's resource requirement is the sum of the requirements of all its operators. If you know which operators your job uses, you can derive it from the per-operator requirements.
> The default operator requirements are listed in [1]; there are five "table.exec.resource.*" options there, and you can also tune them to change how much memory the operators use.
>
> If you are not that familiar with the operators the job uses, the simplest approach is to just submit the job and search the logs for "Request slot with
> profile", which shows the slot resource requirements.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options
>
> On Thu, Dec 26, 2019 at 11:36 AM faaron zheng 
> wrote:
>
> > Thanks for the reply. I checked: with -ys set to 10, the hash join requests 256m of slot memory, while my TM managed
> > memory is 2g, i.e. about 200m per slot, so the job could not be scheduled.
> > One more question: for a batch job, how do I determine before submission how much memory a single slot should get?
> > Is there a general method or rule of thumb? faaron zheng Email:
> > faaronzh...@gmail.com (Signature customized by NetEase Mail Master) On Dec 26, 2019 at 11:23, faaron zheng wrote:
> > Thanks for the reply. I checked: with -ys set to 10, the hash join requests 256m of slot memory, while my TM managed
> > memory is 2g, i.e. about 200m per slot, so the job could not be scheduled.
> > One more question: for a batch job, how do I determine before submission how much memory a single slot should get?
> > Is there a general method or rule of thumb? faaron zheng Email:
> > faaronzh...@gmail.com (Signature customized by NetEase Mail Master) On Dec 25, 2019 at 11:30, Xintong Song wrote:
> > Hi faaron, in Flink 1.9 the -yn parameter should have no effect; it has been removed in later versions. With your parameters,
> > keeping each TM at 30G of memory while raising the number of slots per TM (-ys) from 5 to 10 means each slot gets on
> > average half the memory it had before. Flink 1.9's SQL batch operators have fixed requirements on flink managed memory,
> > so this change very likely means a single slot's managed memory can no longer satisfy the operators' resource needs.
> > Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09
> > AM faaron zheng  wrote:
> > > Running TPC-DS query1: flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g -ytm 30g runs fine, but
> > > flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm 60g -ytm 30g fails during the hash join with
> > > "No pooled slot available and request to ResourceManager for new slot failed". I cannot see the connection;
> > > any advice is appreciated. faaron zheng Email: faaronzh...@gmail.com (Signature customized by NetEase Mail Master)
>


Re: Flink 1.9 SQL Kafka Connector,Json format,how to deal with not json message?

2019-12-25 Thread Jark Wu
Hi LakeShen,

I'm sorry, there is no such configuration for the json format currently.
I think it makes sense to add such a configuration, similar to
'format.ignore-parse-errors' in the csv format.
I created FLINK-15396[1] to track this.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-15396

On Thu, 26 Dec 2019 at 11:44, LakeShen  wrote:

> Hi community,when I write the flink ddl sql like this:
>
> CREATE TABLE kafka_src (
>   id varchar,
>   a varchar,
>   b TIMESTAMP,
>   c TIMESTAMP
> )
>   with (
>...
> 'format.type' = 'json',
> 'format.property-version' = '1',
> 'format.derive-schema' = 'true',
> 'update-mode' = 'append'
> );
>
> If the message is not the json format ,there is a error in the log。
> My question is that how to deal with the message which it not json format?
> My thought is that I can catch the exception
> in JsonRowDeserializationSchema deserialize() method,is there any
> parameters to do this?
> Thanks your replay.
>
>



Re: Effect of -yn and -ys on Flink 1.9 batch jobs

2019-12-25 Thread Kurt Young
You could also try the latest 1.10 version. In that version the SQL operators no longer request a fixed, hard-coded amount of memory;
instead they adapt to however much managed memory the slot can provide.

Best,
Kurt


On Thu, Dec 26, 2019 at 1:36 PM Xintong Song  wrote:

> How much memory a slot needs depends on the specific job; it can vary a lot from job to job.
>
> A slot's resource requirement is the sum of the requirements of all its operators. If you know which operators your job uses, you can derive it from the per-operator requirements.
> The default operator requirements are listed in [1]; there are five "table.exec.resource.*" options there, and you can also tune them to change how much memory the operators use.
>
> If you are not that familiar with the operators the job uses, the simplest approach is to just submit the job and search the logs for "Request slot with
> profile", which shows the slot resource requirements.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options
>
> On Thu, Dec 26, 2019 at 11:36 AM faaron zheng 
> wrote:
>
> > Thanks for the reply. I checked: with -ys set to 10, the hash join requests 256m of slot memory, while my TM managed
> > memory is 2g, i.e. about 200m per slot, so the job could not be scheduled.
> > One more question: for a batch job, how do I determine before submission how much memory a single slot should get?
> > Is there a general method or rule of thumb? faaron zheng Email:
> > faaronzh...@gmail.com (Signature customized by NetEase Mail Master) On Dec 26, 2019 at 11:23, faaron zheng wrote:
> > Thanks for the reply. I checked: with -ys set to 10, the hash join requests 256m of slot memory, while my TM managed
> > memory is 2g, i.e. about 200m per slot, so the job could not be scheduled.
> > One more question: for a batch job, how do I determine before submission how much memory a single slot should get?
> > Is there a general method or rule of thumb? faaron zheng Email:
> > faaronzh...@gmail.com (Signature customized by NetEase Mail Master) On Dec 25, 2019 at 11:30, Xintong Song wrote:
> > Hi faaron, in Flink 1.9 the -yn parameter should have no effect; it has been removed in later versions. With your parameters,
> > keeping each TM at 30G of memory while raising the number of slots per TM (-ys) from 5 to 10 means each slot gets on
> > average half the memory it had before. Flink 1.9's SQL batch operators have fixed requirements on flink managed memory,
> > so this change very likely means a single slot's managed memory can no longer satisfy the operators' resource needs.
> > Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09
> > AM faaron zheng  wrote:
> > > Running TPC-DS query1: flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g -ytm 30g runs fine, but
> > > flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm 60g -ytm 30g fails during the hash join with
> > > "No pooled slot available and request to ResourceManager for new slot failed". I cannot see the connection;
> > > any advice is appreciated. faaron zheng Email: faaronzh...@gmail.com (Signature customized by NetEase Mail Master)
>


Re: testing - syncing data timeline

2019-12-25 Thread Kurt Young
Let's say you keep your #1, which does the hourly counting, and emit its
results to the merged new #2. The new #2 would first keep all hourly results
in state, and also keep track of whether it has already received all 24
results belonging to the same day. Once you have received all 24 counts for
the same day, you can run your logic. You can also decide what kind of data
you want to keep in state after that.
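
Roughly something like the sketch below (HourlyCount and DailyResult are just placeholders for your own types):

```java
// Rough sketch only; HourlyCount and DailyResult are placeholders for your own types.
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by day; buffers the hourly counts and fires the daily logic
// once all 24 of them have arrived.
public class DailyMaxFunction
        extends KeyedProcessFunction<String, HourlyCount, DailyResult> {

    private transient MapState<Integer, Long> hourlyCounts; // hour of day -> count

    @Override
    public void open(Configuration parameters) {
        hourlyCounts = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("hourlyCounts", Types.INT, Types.LONG));
    }

    @Override
    public void processElement(HourlyCount in, Context ctx, Collector<DailyResult> out)
            throws Exception {
        hourlyCounts.put(in.hourOfDay, in.count);

        int received = 0;
        long max = Long.MIN_VALUE;
        for (Map.Entry<Integer, Long> e : hourlyCounts.entries()) {
            received++;
            max = Math.max(max, e.getValue());
        }
        if (received == 24) {
            // All hourly results of this day have arrived: run the daily logic.
            out.collect(new DailyResult(in.day, max));
            // Decide here what you still want to keep in state for later days.
            hourlyCounts.clear();
        }
    }
}
```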

Best,
Kurt


On Thu, Dec 26, 2019 at 1:14 PM Avi Levi  wrote:

> not sure that I can see how it is simpler. #2 is time window per day it
> emits the highest hour for that day. #4 is not a time window it keeps
> history of several days . if I want to put the logic of #2 I will need to
> manage it with timers, correct ?
>
> On Thu, Dec 26, 2019 at 6:28 AM Kurt Young  wrote:
>
>> *This Message originated outside your organization.*
>> --
>> Hi,
>>
>> You can merge the logic of #2 into #4, it will be much simpler.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Dec 25, 2019 at 7:36 PM Avi Levi  wrote:
>>
>>>  Hi ,
>>>
>>> I have the following pipeline :
>>> 1. single hour window that counts the number of records
>>> 2. single day window that accepts the aggregated data from #1 and emits
>>> the highest hour count of that day
>>> 3. union #1 + #2
>>> 4. Logic operator that accepts the data from #3 and keep a listState of
>>> #2 and apply some logic on #1 based on that state (e.g comparing a single
>>> hour the history of the max hours at the last X days ) and emits the result
>>>
>>> the timestamsAndWaterMarks is
>>> using BoundedOutOfOrdernessTimestampExtractor (event-time)  and I allow
>>> lateness of 3 hours
>>>
>>>  the problem is that when I try to do unit tests of all the pipeline,
>>> the data from #1 rich #4 before the latter accepts the data from #3 hence
>>> it doesn't have any state yet (state is always empty when the stream from
>>> #1 arrives ).
>>> My source in the tests is a collection that represents the records.
>>>  is there anyway I can solve this ?
>>> [image: Screen Shot 2019-12-25 at 13.04.17.png]
>>> I appreciate any help you can provide
>>> Cheers
>>> Avi
>>>
>>>
>>>


Re: Effect of -yn and -ys on Flink 1.9 batch jobs

2019-12-25 Thread Xintong Song
How much memory a slot needs depends on the specific job; it can vary a lot from job to job.

A slot's resource requirement is the sum of the requirements of all its operators. If you know which operators your job uses, you can derive it from the per-operator requirements.
The default operator requirements are listed in [1]; there are five "table.exec.resource.*" options there, and you can also tune them to change how much memory the operators use.

If you are not that familiar with the operators the job uses, the simplest approach is to just submit the job and search the logs for "Request slot with
profile", which shows the slot resource requirements.

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options

On Thu, Dec 26, 2019 at 11:36 AM faaron zheng  wrote:

> Thanks for the reply. I checked: with -ys set to 10, the hash join requests 256m of slot memory, while my TM managed
> memory is 2g, i.e. about 200m per slot, so the job could not be scheduled.
> One more question: for a batch job, how do I determine before submission how much memory a single slot should get?
> Is there a general method or rule of thumb? faaron zheng Email:
> faaronzh...@gmail.com (Signature customized by NetEase Mail Master) On Dec 26, 2019 at 11:23, faaron zheng wrote:
> Thanks for the reply. I checked: with -ys set to 10, the hash join requests 256m of slot memory, while my TM managed
> memory is 2g, i.e. about 200m per slot, so the job could not be scheduled.
> One more question: for a batch job, how do I determine before submission how much memory a single slot should get?
> Is there a general method or rule of thumb? faaron zheng Email:
> faaronzh...@gmail.com (Signature customized by NetEase Mail Master) On Dec 25, 2019 at 11:30, Xintong Song wrote:
> Hi faaron, in Flink 1.9 the -yn parameter should have no effect; it has been removed in later versions. With your parameters,
> keeping each TM at 30G of memory while raising the number of slots per TM (-ys) from 5 to 10 means each slot gets on
> average half the memory it had before. Flink 1.9's SQL batch operators have fixed requirements on flink managed memory,
> so this change very likely means a single slot's managed memory can no longer satisfy the operators' resource needs.
> Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09 AM faaron zheng  wrote:
> > Running TPC-DS query1: flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g -ytm 30g runs fine, but
> > flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm 60g -ytm 30g fails during the hash join with
> > "No pooled slot available and request to ResourceManager for new slot failed". I cannot see the connection;
> > any advice is appreciated. faaron zheng Email: faaronzh...@gmail.com (Signature customized by NetEase Mail Master)


Re: testing - syncing data timeline

2019-12-25 Thread Avi Levi
I'm not sure I see how it is simpler. #2 is a per-day time window that
emits the highest hourly count for that day. #4 is not a time window; it keeps
a history of several days. If I want to move the logic of #2 into it, I will
need to manage it with timers, correct?

On Thu, Dec 26, 2019 at 6:28 AM Kurt Young  wrote:

> *This Message originated outside your organization.*
> --
> Hi,
>
> You can merge the logic of #2 into #4, it will be much simpler.
>
> Best,
> Kurt
>
>
> On Wed, Dec 25, 2019 at 7:36 PM Avi Levi  wrote:
>
>>  Hi ,
>>
>> I have the following pipeline :
>> 1. single hour window that counts the number of records
>> 2. single day window that accepts the aggregated data from #1 and emits
>> the highest hour count of that day
>> 3. union #1 + #2
>> 4. Logic operator that accepts the data from #3 and keep a listState of
>> #2 and apply some logic on #1 based on that state (e.g comparing a single
>> hour the history of the max hours at the last X days ) and emits the result
>>
>> the timestamsAndWaterMarks is
>> using BoundedOutOfOrdernessTimestampExtractor (event-time)  and I allow
>> lateness of 3 hours
>>
>>  the problem is that when I try to do unit tests of all the pipeline, the
>> data from #1 rich #4 before the latter accepts the data from #3 hence it
>> doesn't have any state yet (state is always empty when the stream from #1
>> arrives ).
>> My source in the tests is a collection that represents the records.
>>  is there anyway I can solve this ?
>> [image: Screen Shot 2019-12-25 at 13.04.17.png]
>> I appreciate any help you can provide
>> Cheers
>> Avi
>>
>>
>>


Re: Aggregating Movie Rental information in a DynamoDB table using DynamoDB streams and Flink

2019-12-25 Thread Zhijiang
Hi Joe,

Your requirement is effective exactly-once for an external sink. I think your
option 2 with TwoPhaseCommitSinkFunction is the right way to go.
Unfortunately I am not quite familiar with this part, so I can not give you
specific suggestions for using it, especially for your concern about storing the
checkpoint id.
After the holiday, some guys with rich experience with it can provide you more
professional ideas I guess. :)

ATM you can refer to the simple implementation
TwoPhaseCommitSinkFunctionTest#ContentDumpSinkFunction and the more complex
FlinkKafkaProducer for more insights.
In addition, the StreamingFileSink also implements exactly-once for a sink.
You might also refer to it to get some insights if possible.
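
Just to make the shape concrete, a very rough skeleton (not a working sink; RentalEvent is a placeholder type and the DynamoDB calls are omitted) might look like the following. Note that commit() may be re-invoked after a failure, so whatever it writes must be idempotent, e.g. a conditional update guarded by a stored transaction/checkpoint id.

```java
// Very rough skeleton only, not a working sink. RentalEvent is a placeholder
// type, and the DynamoDB calls are left out.
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class DynamoDbTwoPhaseSink
        extends TwoPhaseCommitSinkFunction<RentalEvent, DynamoDbTwoPhaseSink.PendingWrites, Void> {

    /** The "transaction": per-checkpoint deltas buffered until commit. */
    public static class PendingWrites implements Serializable {
        public final Map<String, Long> amountByTitle = new HashMap<>();
    }

    public DynamoDbTwoPhaseSink() {
        super(new KryoSerializer<>(PendingWrites.class, new ExecutionConfig()),
                VoidSerializer.INSTANCE);
    }

    @Override
    protected PendingWrites beginTransaction() {
        return new PendingWrites();
    }

    @Override
    protected void invoke(PendingWrites txn, RentalEvent value, Context context) {
        txn.amountByTitle.merge(value.movieTitle, value.rentAmount, Long::sum);
    }

    @Override
    protected void preCommit(PendingWrites txn) {
        // Nothing to flush here; the buffered deltas travel with the transaction state.
    }

    @Override
    protected void commit(PendingWrites txn) {
        // Push the buffered deltas to DynamoDB with an idempotent (conditional) update.
        // commit() may be called more than once for the same transaction.
        // dynamoClient.update(...);  // placeholder
    }

    @Override
    protected void abort(PendingWrites txn) {
        // Buffered deltas are simply dropped.
    }
}
```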

Best,
Zhijiang




--
From:Joe Hansen 
Send Time:2019 Dec. 26 (Thu.) 01:42
To:user 
Subject:Aggregating Movie Rental information in a DynamoDB table using DynamoDB 
streams and Flink

Happy Holidays everyone!

tl;dr: I need to aggregate movie rental information that is being
stored in one DynamoDB table and store running total of the
aggregation in another table. How do I ensure exactly-once
aggregation.

I currently store movie rental information in a DynamoDB table named
MovieRentals: {movie_title, rental_period_in_days, order_date,
rent_amount}

We have millions of movie rentals happening on any given day.  Our web
application needs to display the aggregated rental amount for any
given movie title.

I am planning to use Flink to aggregate rental amounts by movie_title
on the MovieRental DynamoDB stream and store the aggregated rental
amounts in another DynamoDB table named RentalAmountsByMovie:
{movie_title, total_rental_amount}

How do I ensure that RentalAmountsByMovie amounts are accurate. i.e.
How do I prevent results from any checkpoint from not updating the
RentalAmountsByMovie table records more than once?

1) Do I need to store checkpoint ids in the RentalAmountsByMovie table
and do conditional updates to handle the scenario described above?
2) I can possibly implement TwoPhaseCommitSinkFunction that talks to
DynamoDB. However, according to Flink documentation the commit
function can be called more than once and hence needs to be
idempotent. So even this solution requires checkpoint-ids to be stored
on the target store.
3) Another pattern seems to be storing the time-window aggregation
results in the RentalAmountsByMovie table. And the webapp will have to
compute the running total on the fly. I don't like this solution for
its latency implications to the webapp.
4) May be I can use Flink's Queryable state feature. However, that
feature seems to be in Beta:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/queryable_state.html

I imagine this is a very common aggregation use case. How do folks
usually handle **updating aggregated results in Flink external
sinks**?

I appreciate any pointers. Happy to provide more details if needed.

Thanks!



Re: testing - syncing data timeline

2019-12-25 Thread Kurt Young
Hi,

You can merge the logic of #2 into #4, it will be much simpler.

Best,
Kurt


On Wed, Dec 25, 2019 at 7:36 PM Avi Levi  wrote:

>  Hi ,
>
> I have the following pipeline :
> 1. single hour window that counts the number of records
> 2. single day window that accepts the aggregated data from #1 and emits
> the highest hour count of that day
> 3. union #1 + #2
> 4. Logic operator that accepts the data from #3 and keep a listState of #2
> and apply some logic on #1 based on that state (e.g comparing a single hour
> the history of the max hours at the last X days ) and emits the result
>
> the timestamsAndWaterMarks is
> using BoundedOutOfOrdernessTimestampExtractor (event-time)  and I allow
> lateness of 3 hours
>
>  the problem is that when I try to do unit tests of all the pipeline, the
> data from #1 rich #4 before the latter accepts the data from #3 hence it
> doesn't have any state yet (state is always empty when the stream from #1
> arrives ).
> My source in the tests is a collection that represents the records.
>  is there anyway I can solve this ?
> [image: Screen Shot 2019-12-25 at 13.04.17.png]
> I appreciate any help you can provide
> Cheers
> Avi
>
>
>


Different source parallelism causes a job to produce no output

2019-12-25 Thread zhaorui_9...@163.com
hi all:

I have recently run into a very puzzling problem. Two jobs run the same SQL statement against different sources, both with parallelism 8; one source is Kafka and the other is RabbitMQ. After loading the same data into Kafka and RabbitMQ, the RabbitMQ job writes results out, while the Kafka job produces no output even after several runs. Since the SQL involves windows, I considered the effect of multiple Kafka partitions on read order, but after loading all the data into a single Kafka partition and restarting the job there was still no output. The only remaining difference between the two jobs is that the RabbitMQ job's source operator has parallelism 1, so I set the Kafka job's source parallelism to 1 as well, and then the job did produce output. Changing the source parallelism should only change how data is passed between the source and the other operators; can such a change affect the final result? Has anyone run into the same problem?
Flink version: 1.9.1
sql:select count(ps_comment) col1,ceil(stddev_pop(ps_availqty)) col2,
   tumble_start(over_time,interval '72' hour) col3,
   tumble_end(over_time,interval '72' hour) col4,
   ps_date
from cirrostream_kafka_ck_source_03_8x3
where ps_availqty <= 489
  and ps_supplycost > 998
  and ps_comment not like '%ff%'
  and ps_partkey <= 3751122
   or ps_suppkey = 723
group by ps_date,ps_availqty,tumble(over_time,interval '72' hour)
having min(ps_partkey) not in (3525711,3738707,3740245)



zhaorui_9...@163.com


Flink 1.9 SQL Kafka Connector,Json format,how to deal with not json message?

2019-12-25 Thread LakeShen
Hi community,when I write the flink ddl sql like this:

CREATE TABLE kafka_src (
  id varchar,
  a varchar,
  b TIMESTAMP,
  c TIMESTAMP
)
  with (
   ...
'format.type' = 'json',
'format.property-version' = '1',
'format.derive-schema' = 'true',
'update-mode' = 'append'
);

If a message is not in JSON format, there is an error in the log.
My question is: how do I deal with messages that are not in JSON format?
My thought is that I could catch the exception
in the JsonRowDeserializationSchema deserialize() method; is there any
parameter to do this?
Thanks for your reply.
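
For what it is worth, the workaround I have in mind (only on the DataStream side, not through the pure DDL path) is a wrapper roughly like the one below; as far as I know, the Kafka consumer skips records for which deserialize() returns null.

```java
// Rough sketch of the idea; not tested. Wraps JsonRowDeserializationSchema and
// swallows parse errors by returning null, which (as far as I know) makes the
// Kafka consumer skip the record.
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;

public class TolerantJsonDeserializationSchema implements DeserializationSchema<Row> {

    private final JsonRowDeserializationSchema inner;

    public TolerantJsonDeserializationSchema(JsonRowDeserializationSchema inner) {
        this.inner = inner;
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        try {
            return inner.deserialize(message);
        } catch (Exception e) {
            return null; // drop messages that are not valid JSON
        }
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return inner.getProducedType();
    }
}
```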



Re: Problems implementing Kafka-to-MySQL end-to-end exactly-once with Flink

2019-12-25 Thread LakeShen
Could you try using idempotent writes to achieve end-to-end consistency?

Best wishes,
沈磊

卢伟楠 wrote on Wed, Dec 25, 2019 at 4:09 PM:

> Hi everyone:
>
> While implementing Kafka-to-MySQL end-to-end exactly-once I have run into the following two problems:
> 1: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException:
> Communications link failure during commit(). Transaction resolution unknown.
> 2: org.apache.flink.streaming.runtime.tasks.TimerException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>
> I have put together a minimal demo that reproduces the problem; any guidance is appreciated:
> git clone https://github.com/lusecond/flink_help --depth=1
>
>
> While testing I noticed that the four overridden methods of TwoPhaseCommitSinkFunction (beginTransaction, preCommit, commit, abort)
> each run on a different thread. I suspected the thread switching was breaking the JDBC transaction commit, but related tests have ruled that out.


Re: Effect of -yn and -ys on Flink 1.9 batch jobs

2019-12-25 Thread faaron zheng
Thanks for the reply. I checked: with -ys set to 10, the hash join requests 256m of slot memory, while my TM managed
memory is 2g, i.e. about 200m per slot, so the job could not be scheduled.
One more question: for a batch job, how do I determine before submission how much memory a single slot should get?
Is there a general method or rule of thumb? faaron zheng Email: faaronzh...@gmail.com (Signature customized by NetEase Mail Master)

On Dec 26, 2019 at 11:23, faaron zheng wrote:
Thanks for the reply. I checked: with -ys set to 10, the hash join requests 256m of slot memory, while my TM managed
memory is 2g, i.e. about 200m per slot, so the job could not be scheduled.
One more question: for a batch job, how do I determine before submission how much memory a single slot should get?
Is there a general method or rule of thumb? faaron zheng Email: faaronzh...@gmail.com (Signature customized by NetEase Mail Master)

On Dec 25, 2019 at 11:30, Xintong Song wrote:
Hi faaron, in Flink 1.9 the -yn parameter should have no effect; it has been removed in later versions. With your parameters,
keeping each TM at 30G of memory while raising the number of slots per TM (-ys) from 5 to 10 means each slot gets on average
half the memory it had before. Flink 1.9's SQL batch operators have fixed requirements on flink managed memory, so this change
very likely means a single slot's managed memory can no longer satisfy the operators' resource needs. Thank you~ Xintong Song

On Wed, Dec 25, 2019 at 11:09 AM faaron zheng wrote:
Running TPC-DS query1: flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g -ytm 30g runs fine, but
flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm 60g -ytm 30g fails during the hash join with
"No pooled slot available and request to ResourceManager for new slot failed". I cannot see the connection;
any advice is appreciated. faaron zheng Email: faaronzh...@gmail.com (Signature customized by NetEase Mail Master)

Re: Rewind offset to a previous position and ensure certainty.

2019-12-25 Thread Zhijiang
If I understood correctly, different partitions of Kafka would be emitted by 
different source tasks with different watermark progress.  And the Flink 
framework would align the different watermarks to only output the smallest 
watermark among them, so the events from slow partitions would not be discarded 
because the downstream operator would only see the watermark based on the slow 
partition atm. You can refer to [1] for some details.

As for rewinding the offset of a partition, I guess that only happens in the
failure recovery case or when you manually restart the job. Either way, all the topology
tasks would be restarted and previously received watermarks are cleared,
so events would not be discarded in this case either. Only if you could rewind
some source tasks to previous positions while keeping other downstream tasks
running might you hit the issue you are concerned about, but Flink can not support
such an operation/function atm. :)

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html

Best,
Zhijiang
--
From:邢瑞斌 
Send Time:2019 Dec. 25 (Wed.) 20:27
To:user-zh ; user 
Subject:Rewind offset to a previous position and ensure certainty.

Hi,

I'm trying to use Kafka as an event store and I want to create several 
partitions to improve read/write throughput. Occasionally I need to rewind 
offset to a previous position for recomputing. Since order isn't guaranteed 
among partitions in Kafka, does this mean that Flink won't produce the same 
results as before when rewind even if it uses event time? For example, consumer 
for a partition progresses extremely fast and raises watermark, so events from 
other partitions are discarded. Is there any ways to prevent this from 
happening?

Thanks in advance!

Ruibin




Re: Effect of -yn and -ys on Flink 1.9 batch jobs

2019-12-25 Thread faaron zheng
Thanks for the reply. I checked: with -ys set to 10, the hash join requests 256m of slot memory, while my TM managed
memory is 2g, i.e. about 200m per slot, so the job could not be scheduled.
One more question: for a batch job, how do I determine before submission how much memory a single slot should get?
Is there a general method or rule of thumb? faaron zheng Email: faaronzh...@gmail.com (Signature customized by NetEase Mail Master)

On Dec 25, 2019 at 11:30, Xintong Song wrote:
Hi faaron, in Flink 1.9 the -yn parameter should have no effect; it has been removed in later versions. With your parameters,
keeping each TM at 30G of memory while raising the number of slots per TM (-ys) from 5 to 10 means each slot gets on average
half the memory it had before. Flink 1.9's SQL batch operators have fixed requirements on flink managed memory, so this change
very likely means a single slot's managed memory can no longer satisfy the operators' resource needs. Thank you~ Xintong Song

On Wed, Dec 25, 2019 at 11:09 AM faaron zheng wrote:
Running TPC-DS query1: flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g -ytm 30g runs fine, but
flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm 60g -ytm 30g fails during the hash join with
"No pooled slot available and request to ResourceManager for new slot failed". I cannot see the connection;
any advice is appreciated. faaron zheng Email: faaronzh...@gmail.com (Signature customized by NetEase Mail Master)

Re: Flink history server shows no completed jobs

2019-12-25 Thread pengchenglin

The following settings are needed in flink-conf.yaml:
historyserver.web.port: 8082
historyserver.web.address: 0.0.0.0
historyserver.archive.fs.refresh-interval: 1
historyserver.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
jobmanager.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
# after this many seconds, completed jobs are handed over to the history server
jobstore.expiration-time: 14400
jobmanager.archive.fs.dir just needs to match historyserver.archive.fs.dir.
Then start the history server with bin/historyserver.sh start.
Visit ip:8082; you need to run a job and wait for the jobstore.expiration-time interval before any data shows up.
 
From: 起子
Sent: 2019-12-25 15:57
To: user-zh
Subject: Flink history server shows no completed jobs
Hi all:
I started the Flink history server, but it contains no completed jobs.
My configuration is as follows:

The resulting UI looks like this:
The HDFS contents are as follows:
Could someone please advise?
 Department / Data Platform
 Alias / 起子
 Mobile: 159 8810 1848
 WeChat: 159 8810 1848
 Email: q...@dian.so
 Address: 5#705, 998 Wenyi West Road, Yuhang District, Hangzhou, Zhejiang



Re: flink dimension table join

2019-12-25 Thread lucas.wu
Hi 李现,
It is indeed hard in practice to do a full join against the stream; keeping everything would require a huge amount of state, and later migration would be very difficult. Could you give an example of the approach you mentioned?


Original message
From: 李现 stormallin2...@gmail.com
To: user-zh user...@flink.apache.org
Sent: Thursday, Dec 26, 2019, 08:44
Subject: Re: flink dimension table join


The stream size should not be unlimited; presumably there is a window period, and data outside the window is handled offline? xin Destiny
nj18652727...@gmail.com wrote on Wed, Dec 25, 2019 at 18:13: Hi lucas.wu:
Personally, I think you could store the join key and the corresponding stream data in MapState; whenever the dimension-table cache is updated, look it up in the MapState and, if a matching KV exists, emit the newly joined data downstream.
However, the state will then take a lot of memory, so you need to pay attention to state cleanup. lucas.wu lucas...@xiaoying.com
wrote on Wed, Dec 25, 2019 at 5:13 PM: hi all:
When a Kafka stream table is joined with an HBase dimension table in Flink, if the dimension table changes afterwards, how can the previously joined data be updated?

Re: Rewind offset to a previous position and ensure certainty.

2019-12-25 Thread vino yang
Hi Ruibin,

Are you asking how to generate watermarks per Kafka partition?
Flink provides Kafka-partition-aware watermark generation. [1]
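
A minimal sketch of the wiring (MyEvent and the passed-in schema are placeholders): the key point is that the extractor is assigned on the Kafka consumer itself rather than on the resulting stream, which enables per-partition watermark tracking.

```java
// Sketch only: MyEvent is a placeholder event type with a placeholder
// getEventTime() accessor; the deserialization schema is passed in.
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PerPartitionWatermarksExample {

    public static DataStream<MyEvent> source(
            StreamExecutionEnvironment env, Properties props, DeserializationSchema<MyEvent> schema) {

        FlinkKafkaConsumer<MyEvent> consumer =
                new FlinkKafkaConsumer<>("my-topic", schema, props);

        // Assigning the extractor on the consumer (not on the resulting stream)
        // makes the source track watermarks per Kafka partition before merging them.
        consumer.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.minutes(10)) {
                    @Override
                    public long extractTimestamp(MyEvent element) {
                        return element.getEventTime(); // placeholder accessor
                    }
                });

        return env.addSource(consumer);
    }
}
```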

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition

邢瑞斌 wrote on Wed, Dec 25, 2019 at 8:27 PM:

> Hi,
>
> I'm trying to use Kafka as an event store and I want to create several
> partitions to improve read/write throughput. Occasionally I need to rewind
> offset to a previous position for recomputing. Since order isn't guaranteed
> among partitions in Kafka, does this mean that Flink won't produce the same
> results as before when rewind even if it uses event time? For example,
> consumer for a partition progresses extremely fast and raises watermark, so
> events from other partitions are discarded. Is there any ways to prevent
> this from happening?
>
> Thanks in advance!
>
> Ruibin
>



Re: flink dimension table join

2019-12-25 Thread 叶贤勋
可以使用guava实现维表数据缓存在jvm,可以设置缓存数据有效期


叶贤勋
yxx_c...@163.com
(Signature customized by NetEase Mail Master)


On Dec 26, 2019 at 08:44, 李现 wrote:
The stream size should not be unlimited; presumably there is a window period, and data outside the window is handled offline?

xin Destiny wrote on Wed, Dec 25, 2019 at 18:13:

Hi lucas.wu:

Personally, I think you could store the join key and the corresponding stream data in MapState; whenever the dimension-table cache is updated, look it up in the MapState and, if a matching KV exists, emit the newly joined data downstream.
However, the state will then take a lot of memory, so you need to pay attention to state cleanup.

lucas.wu wrote on Wed, Dec 25, 2019 at 5:13 PM:

hi all:
When a Kafka stream table is joined with an HBase dimension table in Flink, if the dimension table changes afterwards, how can the previously joined data be updated?



Re: flink dimension table join

2019-12-25 Thread 李现
The stream size should not be unlimited; presumably there is a window period, and data outside the window is handled offline?

xin Destiny wrote on Wed, Dec 25, 2019 at 18:13:

> Hi lucas.wu:
>
> Personally, I think you could store the join key and the corresponding stream data in MapState; whenever the dimension-table cache is updated, look it up in the MapState and, if a matching KV exists, emit the newly joined data downstream.
> However, the state will then take a lot of memory, so you need to pay attention to state cleanup.
>
> lucas.wu wrote on Wed, Dec 25, 2019 at 5:13 PM:
>
> > hi all:
> > When a Kafka stream table is joined with an HBase dimension table in Flink, if the dimension table changes afterwards, how can the previously joined data be updated?
>


Aggregating Movie Rental information in a DynamoDB table using DynamoDB streams and Flink

2019-12-25 Thread Joe Hansen
Happy Holidays everyone!

tl;dr: I need to aggregate movie rental information that is being
stored in one DynamoDB table and store a running total of the
aggregation in another table. How do I ensure exactly-once
aggregation?

I currently store movie rental information in a DynamoDB table named
MovieRentals: {movie_title, rental_period_in_days, order_date,
rent_amount}

We have millions of movie rentals happening on any given day.  Our web
application needs to display the aggregated rental amount for any
given movie title.

I am planning to use Flink to aggregate rental amounts by movie_title
on the MovieRental DynamoDB stream and store the aggregated rental
amounts in another DynamoDB table named RentalAmountsByMovie:
{movie_title, total_rental_amount}

How do I ensure that RentalAmountsByMovie amounts are accurate. i.e.
How do I prevent results from any checkpoint from not updating the
RentalAmountsByMovie table records more than once?

1) Do I need to store checkpoint ids in the RentalAmountsByMovie table
and do conditional updates to handle the scenario described above? (See the sketch after this list.)
2) I can possibly implement TwoPhaseCommitSinkFunction that talks to
DynamoDB. However, according to Flink documentation the commit
function can be called more than once and hence needs to be
idempotent. So even this solution requires checkpoint-ids to be stored
on the target store.
3) Another pattern seems to be storing the time-window aggregation
results in the RentalAmountsByMovie table. And the webapp will have to
compute the running total on the fly. I don't like this solution for
its latency implications to the webapp.
4) May be I can use Flink's Queryable state feature. However, that
feature seems to be in Beta:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/queryable_state.html
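
For concreteness, option (1) above in my head looks roughly like the following (AWS SDK v1; the table and attribute names are made up). The condition makes replaying the same checkpoint's delta a no-op:

```java
// Rough sketch of option (1): apply a per-checkpoint delta with a conditional
// update so that replays of the same checkpoint do not double-count.
// Table/attribute names and the surrounding wiring are made up.
import java.util.HashMap;
import java.util.Map;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;

public class RentalAmountWriter {

    private final AmazonDynamoDB client;

    public RentalAmountWriter(AmazonDynamoDB client) {
        this.client = client;
    }

    public void applyDelta(String movieTitle, long delta, long checkpointId) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("movie_title", new AttributeValue().withS(movieTitle));

        Map<String, AttributeValue> values = new HashMap<>();
        values.put(":inc", new AttributeValue().withN(Long.toString(delta)));
        values.put(":cp", new AttributeValue().withN(Long.toString(checkpointId)));

        UpdateItemRequest request = new UpdateItemRequest()
                .withTableName("RentalAmountsByMovie")
                .withKey(key)
                .withUpdateExpression("SET last_checkpoint_id = :cp ADD total_rental_amount :inc")
                .withConditionExpression(
                        "attribute_not_exists(last_checkpoint_id) OR last_checkpoint_id < :cp")
                .withExpressionAttributeValues(values);

        try {
            client.updateItem(request);
        } catch (ConditionalCheckFailedException e) {
            // This checkpoint's delta was already applied; safe to ignore on replay.
        }
    }
}
```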

I imagine this is a very common aggregation use case. How do folks
usually handle **updating aggregated results in Flink external
sinks**?

I appreciate any pointers. Happy to provide more details if needed.

Thanks!


Re: Apache Flink - Flink Metrics collection using Prometheus on EMR from streaming mode

2019-12-25 Thread M Singh
 Thanks Vino and Rafi for your references.
Regarding push gateway recommendations for batch - I am following this 
reference (https://prometheus.io/docs/practices/pushing/).
The scenario that I have is that we start Flink Apps on EMR whenever we need 
them. Sometimes the task manager gets killed and then restarted on another 
node.  In order to keep up with registering new task/job managers and 
de-registering the stopped/removed ones, I wanted to see if there is any 
service discovery integration with Flink apps.  
Thanks again for your help and let me know if you have any additional pointers.
On Wednesday, December 25, 2019, 03:39:31 AM EST, Rafi Aroch wrote:

Hi,
Take a look here: https://github.com/eastcirclek/flink-service-discovery
I used it successfully quite a while ago, so things might have changed since.
Thanks, Rafi 
On Wed, Dec 25, 2019, 05:54 vino yang  wrote:

Hi Mans,
IMO, the mechanism of metrics reporter does not depend on any deployment mode.
>> is there any Prometheus configuration or service discovery option available 
>>that will dynamically pick up the metrics from the Filnk job and task 
>>managers running in cluster ?
Can you share more information about your scene?
>> I believe for a batch job I can configure flink config to use Prometheus 
>>gateway configuration but I think this is not recommended for a streaming job.
What does this mean? Why the Prometheus gateway configuration for Flink batch 
job is not recommended for a streaming job?
Best,Vino
M Singh wrote on Tue, Dec 24, 2019 at 4:02 PM:

Hi:
I wanted to find out what's the best way of collecting Flink metrics using 
Prometheus in a streaming application on EMR/Hadoop.
Since the Flink streaming jobs could be running on any node - is there any 
Prometheus configuration or service discovery option available that will 
dynamically pick up the metrics from the Filnk job and task managers running in 
cluster ?  
I believe for a batch job I can configure flink config to use Prometheus 
gateway configuration but I think this is not recommended for a streaming job.
Please let me know if you have any advice.
Thanks
Mans

  

Re: The assigned slot bae00218c818157649eb9e3c533b86af_11 was removed

2019-12-25 Thread jingjing bai
The TM went down. Check whether consecutive checkpoint failures led to an OOM, or whether you are doing large-window computations over a large dataset; a high data volume can also cause this problem.

Xintong Song wrote on Wed, Dec 25, 2019 at 10:28 AM:

> This is probably not the root cause. A slot being removed is usually caused by the TM going down; you need to check the corresponding TM logs to see why it went down.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Dec 24, 2019 at 10:06 PM hiliuxg <736742...@qq.com> wrote:
>
> > Occasionally an already-assigned slot suddenly gets removed, which causes the job to restart, and I cannot tell why. There is no CPU spike and no full GC. The exception is as follows:
> >
> > org.apache.flink.util.FlinkException: The assigned slot
> > bae00218c818157649eb9e3c533b86af_11 was removed.
> > at
> >
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
> > at
> >
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
> > at
> >
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
> > at
> >
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
> > at
> >
> org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:847)
> > at
> >
> org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1161)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
> > at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> > at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> > at
> >
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> > at
> > akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> > at
> > akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> > at
> > akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> > at
> > akka.actor.ActorCell.invoke(ActorCell.scala:495)
> > at
> > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> > at
> akka.dispatch.Mailbox.run(Mailbox.scala:224)
> > at
> > akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> > at
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>


Rewind offset to a previous position and ensure certainty.

2019-12-25 Thread 邢瑞斌
Hi,

I'm trying to use Kafka as an event store and I want to create several
partitions to improve read/write throughput. Occasionally I need to rewind
offset to a previous position for recomputing. Since order isn't guaranteed
among partitions in Kafka, does this mean that Flink won't produce the same
results as before when rewind even if it uses event time? For example,
consumer for a partition progresses extremely fast and raises watermark, so
events from other partitions are discarded. Is there any ways to prevent
this from happening?

Thanks in advance!

Ruibin



testing - syncing data timeline

2019-12-25 Thread Avi Levi
 Hi ,

I have the following pipeline :
1. single hour window that counts the number of records
2. single day window that accepts the aggregated data from #1 and emits the
highest hour count of that day
3. union #1 + #2
4. Logic operator that accepts the data from #3 and keep a listState of #2
and apply some logic on #1 based on that state (e.g comparing a single hour
the history of the max hours at the last X days ) and emits the result

the timestamsAndWaterMarks is using BoundedOutOfOrdernessTimestampExtractor
(event-time)  and I allow lateness of 3 hours

 the problem is that when I try to unit test the whole pipeline, the
data from #1 reaches #4 before the latter receives the data from #3, hence it
doesn't have any state yet (the state is always empty when the stream from #1
arrives).
My source in the tests is a collection that represents the records.
 Is there any way I can solve this?
[image: Screen Shot 2019-12-25 at 13.04.17.png]
I appreciate any help you can provide
Cheers
Avi


Re: flink dimension table join

2019-12-25 Thread xin Destiny
Hi lucas.wu:
Personally, I think you could store the join key and the corresponding stream data in MapState; whenever the dimension-table cache is updated, look it up in the MapState and, if a matching KV exists, emit the newly joined data downstream.
However, the state will then take a lot of memory, so you need to pay attention to state cleanup.

lucas.wu wrote on Wed, Dec 25, 2019 at 5:13 PM:

> hi all:
> When a Kafka stream table is joined with an HBase dimension table in Flink, if the dimension table changes afterwards, how can the previously joined data be updated?


Re: question: jvm heap size per task?

2019-12-25 Thread Li Zhao
Understood, thank you for the quick response!

Thanks,
Li

Xintong Song  于2019年12月25日周三 下午5:05写道:

> Hi Li,
>
> It is true that currently all the task managers have the same heap size,
> and it's fixed ever since started. Unfortunately, your needs cannot be
> satisfied at the moment.
>
> Heap size of task managers cannot be changed once started, because flink
> task managers run in JVMs and JVM does not support resizing after started.
>
> However, there is an ongoing approach towards your needs. The community is
> working on fine-grained resource management, which in general allows
> specify per task/operator resource requirements and allocate task manager
> resources accordingly. You can refers to FLIP-53 [1] and FLIP-56 [2] for
> more details. Another related effort is pluggable slot manager [3], which
> allows having pluggable resource scheduling strategies such as launch task
> managers with customized resources according to the tasks' requirements.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> [3] https://issues.apache.org/jira/browse/FLINK-14106
>
> On Wed, Dec 25, 2019 at 4:18 PM Li Zhao  wrote:
>
>> Hi,
>>
>> Greetings, hope this is the proper place to ask questions, apologize if
>> not.
>>
>> We have a shared flink cluster running with docker, want to set different
>> heap size per task(some tasks require larger heap size, while most tasks
>> only need a little), is it feasible?
>>
>> I've gone through [1], [2] and [3], my current understanding is all task
>> managers have the same heap size which is set by `taskmanager.heap.size`
>> and is fixed ever since the task manager is started, and all tasks running
>> in that task manager will share that heap size.
>>
>> Am I understanding it correctly? And any approaches to our needs?
>>
>> Thanks in advance!
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/cli.html
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html
>> [3]:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>>
>> Thanks,
>> Li
>>
>


flink dimension table join

2019-12-25 Thread lucas.wu
hi all:
When a Kafka stream table is joined with an HBase dimension table in Flink, if the dimension table changes afterwards, how can the previously joined data be updated?

Re: question: jvm heap size per task?

2019-12-25 Thread Xintong Song
Hi Li,

It is true that currently all the task managers have the same heap size,
and it's fixed ever since started. Unfortunately, your needs cannot be
satisfied at the moment.

Heap size of task managers cannot be changed once started, because flink
task managers run in JVMs and JVM does not support resizing after started.

However, there is ongoing work towards your needs. The community is
working on fine-grained resource management, which in general allows
specifying per-task/operator resource requirements and allocating task manager
resources accordingly. You can refer to FLIP-53 [1] and FLIP-56 [2] for
more details. Another related effort is the pluggable slot manager [3], which
allows pluggable resource scheduling strategies, such as launching task
managers with customized resources according to the tasks' requirements.

Thank you~

Xintong Song


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
[3] https://issues.apache.org/jira/browse/FLINK-14106

On Wed, Dec 25, 2019 at 4:18 PM Li Zhao  wrote:

> Hi,
>
> Greetings, hope this is the proper place to ask questions, apologize if
> not.
>
> We have a shared flink cluster running with docker, want to set different
> heap size per task(some tasks require larger heap size, while most tasks
> only need a little), is it feasible?
>
> I've gone through [1], [2] and [3], my current understanding is all task
> managers have the same heap size which is set by `taskmanager.heap.size`
> and is fixed ever since the task manager is started, and all tasks running
> in that task manager will share that heap size.
>
> Am I understanding it correctly? And any approaches to our needs?
>
> Thanks in advance!
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/cli.html
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html
> [3]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>
> Thanks,
> Li
>


Re: Apache Flink - Flink Metrics collection using Prometheus on EMR from streaming mode

2019-12-25 Thread Rafi Aroch
Hi,

Take a look here: https://github.com/eastcirclek/flink-service-discovery

I used it successfully quite a while ago, so things might have changed
since.

Thanks,
Rafi

On Wed, Dec 25, 2019, 05:54 vino yang  wrote:

> Hi Mans,
>
> IMO, the mechanism of metrics reporter does not depend on any deployment
> mode.
>
> >> is there any Prometheus configuration or service discovery option
> available that will dynamically pick up the metrics from the Filnk job and
> task managers running in cluster ?
>
> Can you share more information about your scene?
>
> >> I believe for a batch job I can configure flink config to use
> Prometheus gateway configuration but I think this is not recommended for a
> streaming job.
>
> What does this mean? Why the Prometheus gateway configuration for Flink
> batch job is not recommended for a streaming job?
>
> Best,
> Vino
>
> M Singh wrote on Tue, Dec 24, 2019 at 4:02 PM:
>
>> Hi:
>>
>> I wanted to find out what's the best way of collecting Flink metrics
>> using Prometheus in a streaming application on EMR/Hadoop.
>>
>> Since the Flink streaming jobs could be running on any node - is there
>> any Prometheus configuration or service discovery option available that
>> will dynamically pick up the metrics from the Filnk job and task managers
>> running in cluster ?
>>
>> I believe for a batch job I can configure flink config to use Prometheus
>> gateway configuration but I think this is not recommended for a streaming
>> job.
>>
>> Please let me know if you have any advice.
>>
>> Thanks
>>
>> Mans
>>
>


question: jvm heap size per task?

2019-12-25 Thread Li Zhao
Hi,

Greetings, hope this is the proper place to ask questions, apologize if not.

We have a shared flink cluster running with docker, want to set different
heap size per task(some tasks require larger heap size, while most tasks
only need a little), is it feasible?

I've gone through [1], [2] and [3], my current understanding is all task
managers have the same heap size which is set by `taskmanager.heap.size`
and is fixed ever since the task manager is started, and all tasks running
in that task manager will share that heap size.

Am I understanding it correctly? And any approaches to our needs?

Thanks in advance!

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/cli.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html
[3]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors

Thanks,
Li


Problems implementing Kafka-to-MySQL end-to-end exactly-once with Flink

2019-12-25 Thread 卢伟楠
Hi everyone:

While implementing Kafka-to-MySQL end-to-end exactly-once I have run into the following two problems:
1: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: 
Communications link failure during commit(). Transaction resolution unknown.
2: org.apache.flink.streaming.runtime.tasks.TimerException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator

I have put together a minimal demo that reproduces the problem; any guidance is appreciated:
git clone https://github.com/lusecond/flink_help --depth=1

While testing I noticed that the four overridden methods of TwoPhaseCommitSinkFunction (beginTransaction, preCommit, commit, abort)
each run on a different thread. I suspected the thread switching was breaking the JDBC transaction commit, but related tests have ruled that out.

Flink history server shows no completed jobs

2019-12-25 Thread 起子
Hi all:
I started the Flink history server, but it contains no completed jobs.
My configuration is as follows:

The resulting UI looks like this:

The HDFS contents are as follows:

Could someone please advise?

 Department / Data Platform
 Alias / 起子
 Mobile: 159 8810 1848
 WeChat: 159 8810 1848
 Email: q...@dian.so
 Address: 5#705, 998 Wenyi West Road, Yuhang District, Hangzhou, Zhejiang