Migrate custom partitioner from Flink 1.7 to Flink 1.9
I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink 1.9. The original partitioner implemented the `selectChannels` method of the `StreamPartitioner` interface like this:

```java
// Original: working for Flink 1.7
@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate,
                            int numberOfOutputChannels) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        // send to all channels
        int[] channels = new int[numberOfOutputChannels];
        for (int i = 0; i < numberOfOutputChannels; ++i) {
            channels[i] = i;
        }
        return channels;
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfOutputChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
    }
    return returnChannels;
}
```

I am not sure how to migrate this to Flink 1.9, since the `StreamPartitioner` interface has changed, as illustrated below:

```java
// New: required by Flink 1.9
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        /* It is illegal to call this method for broadcast channel selectors,
           and this method can remain unimplemented in that case
           (for example by throwing UnsupportedOperationException). */
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
    }
    return returnChannels[0];
}
```

Note that `selectChannels` has been replaced with `selectChannel`, so it is no longer possible to return multiple output channels, as was done above for broadcast elements.
As a matter of fact, `selectChannel` should not be invoked at all for that case. Any thoughts on how to tackle this?
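Since `selectChannel` may return only one channel, per-record broadcasting has to move out of the partitioner. One possible workaround (a plain-Java sketch under assumptions, not Flink API — the `Tagged` record type and the fan-out step are illustrative) is to replicate a "broadcast" record upstream into one copy per output channel, each tagged with an explicit target channel, and let the 1.9-style selector simply honour that tag:

```java
import java.util.ArrayList;
import java.util.List;

public class BroadcastFanOutSketch {

    /** A record carrying an explicit target channel (illustrative, not Flink API). */
    static class Tagged {
        final String payload;
        final int channel;
        Tagged(String payload, int channel) { this.payload = payload; this.channel = channel; }
    }

    /** Upstream step: replicate a broadcast record once per output channel. */
    static List<Tagged> fanOut(String payload, int numberOfChannels) {
        List<Tagged> copies = new ArrayList<>();
        for (int i = 0; i < numberOfChannels; i++) {
            copies.add(new Tagged(payload, i));
        }
        return copies;
    }

    /** The 1.9-style selector: exactly one channel per record, no broadcast branch. */
    static int selectChannel(Tagged record, int numberOfChannels) {
        return record.channel % numberOfChannels;
    }

    public static void main(String[] args) {
        for (Tagged t : fanOut("config-update", 4)) {
            System.out.println(selectChannel(t, 4) + " <- " + t.payload);
        }
    }
}
```

The trade-off is that the fan-out multiplies the record volume by the channel count, so this only fits occasional broadcast records such as control messages.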
Re: Impact of -yn and -ys on Flink 1.9 batch jobs
Hi faaron zheng,

As Kurt said, we strongly recommend moving to 1.10; the release branch has already been cut. A rule of thumb for TM sizing: a TM with 10 slots and 10 GB of memory gets roughly 4 GB of JVM heap, 1 GB of network buffers, and 5 GB of managed memory (i.e. about 500 MB of managed memory per slot).

Best, Jingsong Lee

-- From: Kurt Young Send Time: Thu, Dec 26, 2019 14:07 To: user-zh Subject: Re: Impact of -yn and -ys on Flink 1.9 batch jobs

You can also try the latest 1.10 release: in that version the SQL operators no longer request a fixed, hard-coded amount of memory, but instead adapt to however much managed memory the slot can provide. Best, Kurt

On Thu, Dec 26, 2019 at 1:36 PM Xintong Song wrote:
> How much memory a slot needs is job-specific and varies a lot between jobs.
> A slot's resource requirement is the sum of the requirements of all its operators, so if you know which operators your job uses, you can derive it from the per-operator requirements. The default operator requirements are documented in [1] (the five "table.exec.resource.*" options); you can also tune those options to change how much memory the operators use.
> If you are not familiar with the operators your job uses, the simpler approach is to just submit the job and search the logs for "Request slot with profile" to see the slots' resource requirements.
> Thank you~
> Xintong Song
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options
> On Thu, Dec 26, 2019 at 11:36 AM faaron zheng wrote:
> > Thanks for the reply. I checked: with -ys 10 the hash join requests 256 MB per slot, while my TM managed memory is 2 GB, i.e. about 200 MB per slot on average, so the job could not be scheduled.
> > One more question: before submitting a batch job, how do I determine how much memory a single slot should get? Is there a general method or rule of thumb? faaron zheng (faaronzh...@gmail.com)
> > On Dec 25, 2019 at 11:30, Xintong Song wrote:
> > Hi faaron, in Flink 1.9 the -yn option has no effect; it has been removed in later versions. With your settings, TM memory stays at 30 GB while the number of slots per TM (-ys) goes from 5 to 10, which halves the average memory per slot. The SQL batch operators in Flink 1.9 have fixed managed-memory requirements, so this change most likely left a single slot's managed memory unable to satisfy the operators' needs. Thank you~ Xintong Song
> > On Wed, Dec 25, 2019 at 11:09 AM faaron zheng wrote:
> > > Running TPC-DS query1: `flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g -ytm 30g` works fine, but `flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm 60g -ytm 30g` fails during the hash join with "No pooled slot available and request to ResourceManager for new slot failed". I don't understand the connection; any pointers? faaron zheng (faaronzh...@gmail.com)
Re: Flink 1.9 SQL Kafka Connector, JSON format: how to deal with non-JSON messages?
Hi LakeShen, I'm sorry, there is no such configuration for the JSON format currently. I think it makes sense to add a configuration like the 'format.ignore-parse-errors' in the CSV format. I created FLINK-15396 [1] to track this. Best, Jark

[1]: https://issues.apache.org/jira/browse/FLINK-15396

On Thu, 26 Dec 2019 at 11:44, LakeShen wrote:
> Hi community, when I write a Flink DDL statement like this:
> CREATE TABLE kafka_src (
>   id varchar,
>   a varchar,
>   b TIMESTAMP,
>   c TIMESTAMP
> ) with (
>   ...
>   'format.type' = 'json',
>   'format.property-version' = '1',
>   'format.derive-schema' = 'true',
>   'update-mode' = 'append'
> );
> If a message is not in JSON format, there is an error in the log. My question is: how can I deal with messages that are not valid JSON? My thought is that I could catch the exception in the JsonRowDeserializationSchema deserialize() method; is there any parameter to do this? Thanks for your reply.
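Until something like 'format.ignore-parse-errors' exists for the JSON format, the usual workaround is exactly what the question suggests: wrap the deserialization step and drop records that fail to parse instead of failing the job. A minimal plain-Java sketch of that pattern (the `Deserializer` interface here is a stand-in, not the actual `JsonRowDeserializationSchema` API):

```java
import java.io.IOException;
import java.util.Optional;

public class SkipInvalidSketch {

    /** Stand-in for a deserializer that may throw on malformed input. */
    interface Deserializer<T> {
        T deserialize(byte[] message) throws IOException;
    }

    /** Wraps a deserializer so parse failures yield empty instead of failing the pipeline. */
    static <T> Optional<T> deserializeOrSkip(Deserializer<T> inner, byte[] message) {
        try {
            return Optional.ofNullable(inner.deserialize(message));
        } catch (IOException e) {
            // malformed record: log and drop it here instead of propagating the exception
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // toy "strict" deserializer: rejects anything that does not look like a JSON object
        Deserializer<String> strict = bytes -> {
            String s = new String(bytes);
            if (!s.startsWith("{")) throw new IOException("not JSON: " + s);
            return s;
        };
        System.out.println(deserializeOrSkip(strict, "{\"id\":1}".getBytes()).isPresent()); // true
        System.out.println(deserializeOrSkip(strict, "garbage".getBytes()).isPresent());    // false
    }
}
```

In a real job the same try/catch would live in a custom `DeserializationSchema` that delegates to the JSON schema and returns null (which the Kafka consumer treats as "skip") on failure.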
Re: Impact of -yn and -ys on Flink 1.9 batch jobs
You can also try the latest 1.10 release: in that version the SQL operators no longer request a fixed, hard-coded amount of memory, but instead adapt to however much managed memory the slot can provide.

Best, Kurt

On Thu, Dec 26, 2019 at 1:36 PM Xintong Song wrote:
> How much memory a slot needs is job-specific and varies a lot between jobs.
> A slot's resource requirement is the sum of the requirements of all its operators, so if you know which operators your job uses, you can derive it from the per-operator requirements. The default operator requirements are documented in [1] (the five "table.exec.resource.*" options); you can also tune those options to change how much memory the operators use.
> If you are not familiar with the operators your job uses, the simpler approach is to just submit the job and search the logs for "Request slot with profile" to see the slots' resource requirements.
> Thank you~
> Xintong Song
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options
> On Thu, Dec 26, 2019 at 11:36 AM faaron zheng wrote:
> > Thanks for the reply. I checked: with -ys 10 the hash join requests 256 MB per slot, while my TM managed memory is 2 GB, i.e. about 200 MB per slot on average, so the job could not be scheduled.
> > One more question: before submitting a batch job, how do I determine how much memory a single slot should get? Is there a general method or rule of thumb? faaron zheng (faaronzh...@gmail.com)
> > On Dec 25, 2019 at 11:30, Xintong Song wrote:
> > Hi faaron, in Flink 1.9 the -yn option has no effect; it has been removed in later versions. With your settings, TM memory stays at 30 GB while the number of slots per TM (-ys) goes from 5 to 10, which halves the average memory per slot. The SQL batch operators in Flink 1.9 have fixed managed-memory requirements, so this change most likely left a single slot's managed memory unable to satisfy the operators' needs. Thank you~ Xintong Song
> > On Wed, Dec 25, 2019 at 11:09 AM faaron zheng wrote:
> > > Running TPC-DS query1: `flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g -ytm 30g` works fine, but `flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm 60g -ytm 30g` fails during the hash join with "No pooled slot available and request to ResourceManager for new slot failed". I don't understand the connection; any pointers? faaron zheng (faaronzh...@gmail.com)
Re: testing - syncing data timeline
Let's say you keep your #1, which does the hourly counting, and emit its results to the merged new #2. The new #2 would first keep all hourly results in state, and also track whether it has already received all 24 results belonging to the same day. Once you have received all 24 counts for the same day, you can start your logic. You can also decide what kind of data you want to keep in state after that.

Best, Kurt

On Thu, Dec 26, 2019 at 1:14 PM Avi Levi wrote:
> Not sure that I can see how it is simpler. #2 is a time window per day; it emits the highest hour for that day. #4 is not a time window; it keeps a history of several days. If I want to put the logic of #2 there, I will need to manage it with timers, correct?
> On Thu, Dec 26, 2019 at 6:28 AM Kurt Young wrote:
>> *This Message originated outside your organization.*
>> --
>> Hi,
>> You can merge the logic of #2 into #4, it will be much simpler.
>> Best,
>> Kurt
>> On Wed, Dec 25, 2019 at 7:36 PM Avi Levi wrote:
>>> Hi,
>>> I have the following pipeline:
>>> 1. single hour window that counts the number of records
>>> 2. single day window that accepts the aggregated data from #1 and emits the highest hour count of that day
>>> 3. union #1 + #2
>>> 4. logic operator that accepts the data from #3, keeps a ListState of #2, and applies some logic on #1 based on that state (e.g. comparing a single hour against the history of the max hours of the last X days), then emits the result
>>> The timestampsAndWatermarks assigner uses BoundedOutOfOrdernessTimestampExtractor (event time) and I allow a lateness of 3 hours.
>>> The problem is that when I unit test the whole pipeline, the data from #1 reaches #4 before the latter accepts the data from #3, hence it doesn't have any state yet (the state is always empty when the stream from #1 arrives).
>>> My source in the tests is a collection that represents the records.
>>> Is there any way I can solve this?
>>> [image: Screen Shot 2019-12-25 at 13.04.17.png] >>> I appreciate any help you can provide >>> Cheers >>> Avi >>> >>> >>>
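Kurt's suggestion above can be sketched without Flink: buffer the hourly counts per day, and only run the daily logic once all 24 hours for that day have arrived. A plain-Java illustration (in a real job the map would be keyed `MapState`, and the names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class DailyBuffer {

    // day -> (hour -> count); in Flink this would live in keyed state
    private final Map<String, Map<Integer, Long>> byDay = new HashMap<>();

    /** Returns the max hourly count for the day once all 24 hours are in; null while waiting. */
    Long add(String day, int hour, long count) {
        Map<Integer, Long> hours = byDay.computeIfAbsent(day, d -> new HashMap<>());
        hours.put(hour, count);
        if (hours.size() < 24) {
            return null; // still waiting for the remaining hours of this day
        }
        byDay.remove(day); // decide here what, if anything, to keep in state afterwards
        return hours.values().stream().mapToLong(Long::longValue).max().getAsLong();
    }

    public static void main(String[] args) {
        DailyBuffer buffer = new DailyBuffer();
        Long result = null;
        for (int h = 0; h < 24; h++) {
            result = buffer.add("2019-12-25", h, h == 13 ? 1000 : 10);
        }
        System.out.println(result); // 1000, emitted only once hour 23 arrives
    }
}
```

This also sidesteps the test-ordering problem in the original question: #4 no longer depends on #2's output racing ahead of #1, because the "highest hour of the day" is derived inside the operator once the day is complete.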
Re: Impact of -yn and -ys on Flink 1.9 batch jobs
How much memory a slot needs is job-specific and varies a lot between jobs.

A slot's resource requirement is the sum of the requirements of all its operators, so if you know which operators your job uses, you can derive it from the per-operator requirements. The default operator requirements are documented in [1] (the five "table.exec.resource.*" options); you can also tune those options to change how much memory the operators use.

If you are not familiar with the operators your job uses, the simpler approach is to just submit the job and search the logs for "Request slot with profile" to see the slots' resource requirements.

Thank you~

Xintong Song

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options

On Thu, Dec 26, 2019 at 11:36 AM faaron zheng wrote:
> Thanks for the reply. I checked: with -ys 10 the hash join requests 256 MB per slot, while my TM managed memory is 2 GB, i.e. about 200 MB per slot on average, so the job could not be scheduled.
> One more question: before submitting a batch job, how do I determine how much memory a single slot should get? Is there a general method or rule of thumb? faaron zheng (faaronzh...@gmail.com)
> On Dec 25, 2019 at 11:30, Xintong Song wrote:
> Hi faaron, in Flink 1.9 the -yn option has no effect; it has been removed in later versions. With your settings, TM memory stays at 30 GB while the number of slots per TM (-ys) goes from 5 to 10, which halves the average memory per slot. The SQL batch operators in Flink 1.9 have fixed managed-memory requirements, so this change most likely left a single slot's managed memory unable to satisfy the operators' needs. Thank you~ Xintong Song
> On Wed, Dec 25, 2019 at 11:09 AM faaron zheng wrote:
> > Running TPC-DS query1: `flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g -ytm 30g` works fine, but `flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm 60g -ytm 30g` fails during the hash join with "No pooled slot available and request to ResourceManager for new slot failed". I don't understand the connection; any pointers? faaron zheng (faaronzh...@gmail.com)
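The arithmetic behind this thread is simple enough to illustrate (numbers taken from the reporter's setup; this is an average, not a general sizing formula): halving the per-slot managed memory is what made the hash join's 256 MB request unschedulable.

```java
public class SlotMemoryEstimate {

    /** Average managed memory per slot for a given TM managed memory and slot count. */
    static long managedPerSlotMb(long tmManagedMb, int slotsPerTm) {
        return tmManagedMb / slotsPerTm;
    }

    public static void main(String[] args) {
        long hashJoinRequestMb = 256; // what the hash join requested per the reporter's logs
        // -ys 5: ~409 MB per slot, enough for the 256 MB request
        System.out.println(managedPerSlotMb(2048, 5) >= hashJoinRequestMb);
        // -ys 10: ~204 MB per slot, the request can no longer be satisfied
        System.out.println(managedPerSlotMb(2048, 10) >= hashJoinRequestMb);
    }
}
```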
Re: testing - syncing data timeline
Not sure that I can see how it is simpler. #2 is a time window per day; it emits the highest hour for that day. #4 is not a time window; it keeps a history of several days. If I want to put the logic of #2 there, I will need to manage it with timers, correct?

On Thu, Dec 26, 2019 at 6:28 AM Kurt Young wrote:
> *This Message originated outside your organization.*
> --
> Hi,
> You can merge the logic of #2 into #4, it will be much simpler.
> Best,
> Kurt
> On Wed, Dec 25, 2019 at 7:36 PM Avi Levi wrote:
>> Hi,
>> I have the following pipeline:
>> 1. single hour window that counts the number of records
>> 2. single day window that accepts the aggregated data from #1 and emits the highest hour count of that day
>> 3. union #1 + #2
>> 4. logic operator that accepts the data from #3, keeps a ListState of #2, and applies some logic on #1 based on that state (e.g. comparing a single hour against the history of the max hours of the last X days), then emits the result
>> The timestampsAndWatermarks assigner uses BoundedOutOfOrdernessTimestampExtractor (event time) and I allow a lateness of 3 hours.
>> The problem is that when I unit test the whole pipeline, the data from #1 reaches #4 before the latter accepts the data from #3, hence it doesn't have any state yet (the state is always empty when the stream from #1 arrives).
>> My source in the tests is a collection that represents the records.
>> Is there any way I can solve this?
>> [image: Screen Shot 2019-12-25 at 13.04.17.png]
>> I appreciate any help you can provide
>> Cheers
>> Avi
Re: Aggregating Movie Rental information in a DynamoDB table using DynamoDB streams and Flink
Hi Joe,

Your requirement is effectively exactly-once delivery to an external sink, and I think your option with TwoPhaseCommitSinkFunction is the right way to go. Unfortunately I am not very familiar with this part, so I cannot give you specific suggestions for using it, especially for your concern about storing checkpoint ids. After the holiday, people with rich experience in this area can give you more professional ideas, I guess. :)

For now you can refer to the simple implementation in TwoPhaseCommitSinkFunctionTest#ContentDumpSinkFunction and the complex one in FlinkKafkaProducer for more insights. In addition, the StreamingFileSink also implements exactly-once semantics for a sink, so you might refer to it as well.

Best, Zhijiang

-- From: Joe Hansen Send Time: 2019 Dec. 26 (Thu.) 01:42 To: user Subject: Aggregating Movie Rental information in a DynamoDB table using DynamoDB streams and Flink

Happy Holidays everyone!

tl;dr: I need to aggregate movie rental information that is being stored in one DynamoDB table and keep a running total of the aggregation in another table. How do I ensure exactly-once aggregation?

I currently store movie rental information in a DynamoDB table named MovieRentals: {movie_title, rental_period_in_days, order_date, rent_amount}. We have millions of movie rentals happening on any given day. Our web application needs to display the aggregated rental amount for any given movie title.

I am planning to use Flink to aggregate rental amounts by movie_title on the MovieRentals DynamoDB stream and store the aggregated rental amounts in another DynamoDB table named RentalAmountsByMovie: {movie_title, total_rental_amount}.

How do I ensure that the RentalAmountsByMovie amounts are accurate, i.e. how do I prevent the results of any checkpoint from updating the RentalAmountsByMovie records more than once?

1) Do I need to store checkpoint ids in the RentalAmountsByMovie table and do conditional updates to handle the scenario described above?

2) I could possibly implement a TwoPhaseCommitSinkFunction that talks to DynamoDB. However, according to the Flink documentation the commit function can be called more than once and hence needs to be idempotent, so even this solution requires checkpoint ids to be stored in the target store.

3) Another pattern seems to be storing the time-window aggregation results in the RentalAmountsByMovie table, with the webapp computing the running total on the fly. I don't like this solution for its latency implications for the webapp.

4) Maybe I can use Flink's Queryable State feature. However, that feature seems to be in beta: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/queryable_state.html

I imagine this is a very common aggregation use case. How do folks usually handle **updating aggregated results in Flink external sinks**? I appreciate any pointers. Happy to provide more details if needed. Thanks!
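Options 1 and 2 both reduce to the same idempotency trick: a conditional write that records the last applied checkpoint id per row, so a replayed commit becomes a no-op. A plain-Java sketch of the pattern (an in-memory stand-in for a DynamoDB conditional `UpdateItem`; all names here are illustrative, not AWS SDK code):

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentCommitSketch {

    static class Row {
        long total;
        long lastCheckpointId = -1;
    }

    private final Map<String, Row> table = new HashMap<>();

    /** Applies the delta only if this checkpoint has not already been applied to the row. */
    boolean commit(String movie, long delta, long checkpointId) {
        Row row = table.computeIfAbsent(movie, m -> new Row());
        if (checkpointId <= row.lastCheckpointId) {
            return false; // replayed commit after recovery: skip, keeping the sink idempotent
        }
        row.total += delta;
        row.lastCheckpointId = checkpointId;
        return true;
    }

    long total(String movie) {
        return table.get(movie).total;
    }

    public static void main(String[] args) {
        IdempotentCommitSketch sink = new IdempotentCommitSketch();
        sink.commit("Heat", 30, 1);
        sink.commit("Heat", 30, 1); // duplicate commit of checkpoint 1: ignored
        sink.commit("Heat", 12, 2);
        System.out.println(sink.total("Heat")); // 42, not 72
    }
}
```

In DynamoDB the check-and-update would be a single conditional expression (condition on the stored checkpoint id being smaller) so the read-modify-write stays atomic.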
Re: testing - syncing data timeline
Hi,

You can merge the logic of #2 into #4, it will be much simpler.

Best,
Kurt

On Wed, Dec 25, 2019 at 7:36 PM Avi Levi wrote:
> Hi,
> I have the following pipeline:
> 1. single hour window that counts the number of records
> 2. single day window that accepts the aggregated data from #1 and emits the highest hour count of that day
> 3. union #1 + #2
> 4. logic operator that accepts the data from #3, keeps a ListState of #2, and applies some logic on #1 based on that state (e.g. comparing a single hour against the history of the max hours of the last X days), then emits the result
> The timestampsAndWatermarks assigner uses BoundedOutOfOrdernessTimestampExtractor (event time) and I allow a lateness of 3 hours.
> The problem is that when I unit test the whole pipeline, the data from #1 reaches #4 before the latter accepts the data from #3, hence it doesn't have any state yet (the state is always empty when the stream from #1 arrives).
> My source in the tests is a collection that represents the records.
> Is there any way I can solve this?
> [image: Screen Shot 2019-12-25 at 13.04.17.png]
> I appreciate any help you can provide
> Cheers
> Avi
Different source parallelism leads to a job producing no output
hi all:

I have recently run into a puzzling problem. Two jobs run the same SQL statement with different sources, both with parallelism 8: one source is Kafka, the other is RabbitMQ. After loading the same data into both Kafka and RabbitMQ, the RabbitMQ job produces output, but the Kafka job has produced none across several runs. Since the SQL involves a window, I considered the effect of multiple Kafka partitions on read order, so I loaded all the data into a single Kafka partition and restarted the job, but there was still no output. The only remaining difference between the two jobs is that the RabbitMQ job's source operator has parallelism 1, so I set the Kafka job's source parallelism to 1 as well, and after that it produced output. Changing the source parallelism should only change how data is exchanged between the source and the other operators; can that affect the final result? Has anyone hit the same problem?

Flink version: 1.9.1. SQL:

select count(ps_comment) col1, ceil(stddev_pop(ps_availqty)) col2,
  tumble_start(over_time, interval '72' hour) col3,
  tumble_end(over_time, interval '72' hour) col4, ps_date
from cirrostream_kafka_ck_source_03_8x3
where ps_availqty <= 489 and ps_supplycost > 998 and ps_comment not like '%ff%'
  and ps_partkey <= 3751122 or ps_suppkey = 723
group by ps_date, ps_availqty, tumble(over_time, interval '72' hour)
having min(ps_partkey) not in (3525711, 3738707, 3740245)

zhaorui_9...@163.com
Flink 1.9 SQL Kafka Connector, JSON format: how to deal with non-JSON messages?
Hi community, when I write a Flink DDL statement like this:

```sql
CREATE TABLE kafka_src (
  id varchar,
  a varchar,
  b TIMESTAMP,
  c TIMESTAMP
) with (
  ...
  'format.type' = 'json',
  'format.property-version' = '1',
  'format.derive-schema' = 'true',
  'update-mode' = 'append'
);
```

If a message is not in JSON format, there is an error in the log. My question is: how can I deal with messages that are not valid JSON? My thought is that I could catch the exception in the JsonRowDeserializationSchema deserialize() method; is there any parameter to do this? Thanks for your reply.
Re: Problems implementing end-to-end exactly-once from Kafka to MySQL with Flink
Could you try using idempotent writes to achieve the end-to-end consistency?

Best wishes, 沈磊

On Wed, Dec 25, 2019 at 16:09, 卢伟楠 wrote:
> Hi all:
> While implementing end-to-end exactly-once from Kafka to MySQL, I recently ran into these two problems:
> 1: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Communications link failure during commit(). Transaction resolution unknown.
> 2: org.apache.flink.streaming.runtime.tasks.TimerException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> I have put together a minimal demo that reproduces the problem; any advice is appreciated:
> git clone https://github.com/lusecond/flink_help --depth=1
> While testing, I noticed that the four overridden methods of TwoPhaseCommitSinkFunction (beginTransaction, preCommit, commit, abort) run on different threads. I suspected the thread switching broke the JDBC transaction commit, but tests have already ruled that out.
Re: Impact of -yn and -ys on Flink 1.9 batch jobs
Thanks for the reply. I checked: with -ys 10 the hash join requests 256 MB per slot, while my TM managed memory is 2 GB, i.e. about 200 MB per slot on average, so the job could not be scheduled. One more question: before submitting a batch job, how do I determine how much memory a single slot should get? Is there a general method or rule of thumb?

faaron zheng (faaronzh...@gmail.com)

On Dec 25, 2019 at 11:30, Xintong Song wrote:
> Hi faaron, in Flink 1.9 the -yn option has no effect; it has been removed in later versions. With your settings, TM memory stays at 30 GB while the number of slots per TM (-ys) goes from 5 to 10, which halves the average memory per slot. The SQL batch operators in Flink 1.9 have fixed managed-memory requirements, so this change most likely left a single slot's managed memory unable to satisfy the operators' needs. Thank you~ Xintong Song
> On Wed, Dec 25, 2019 at 11:09 AM faaron zheng wrote:
> > Running TPC-DS query1: `flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g -ytm 30g` works fine, but `flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm 60g -ytm 30g` fails during the hash join with "No pooled slot available and request to ResourceManager for new slot failed". I don't understand the connection; any pointers? faaron zheng (faaronzh...@gmail.com)
Re: Rewind offset to a previous position and ensure certainty.
If I understand correctly, different partitions of Kafka are consumed by different source tasks with different watermark progress, and the Flink framework aligns these watermarks by only forwarding the smallest watermark among them. Events from slow partitions are therefore not discarded, because the downstream operator only ever sees the watermark based on the slowest partition at the moment. You can refer to [1] for details.

As for rewinding a partition's offset, I guess it only happens on failure recovery or when you manually restart the job. Either way, all tasks in the topology are restarted and previously received watermarks are cleared, so events would not be discarded in this case either. Only if you could rewind some source tasks to previous positions while keeping the downstream tasks running might the issue you describe occur, but Flink does not support such an operation at the moment. :)

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html

Best, Zhijiang

-- From: 邢瑞斌 Send Time: 2019 Dec. 25 (Wed.) 20:27 To: user-zh; user Subject: Rewind offset to a previous position and ensure certainty.

Hi,

I'm trying to use Kafka as an event store, and I want to create several partitions to improve read/write throughput. Occasionally I need to rewind the offset to a previous position for recomputation. Since order isn't guaranteed among partitions in Kafka, does this mean that Flink won't produce the same results as before when rewinding, even if it uses event time? For example, if the consumer for one partition progresses extremely fast and raises the watermark, events from other partitions are discarded. Is there any way to prevent this from happening?

Thanks in advance!

Ruibin
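The alignment Zhijiang describes can be shown in a few lines: a downstream operator's effective watermark is the minimum of the latest watermark seen on each input channel, so a slow partition holds the operator's event-time clock back rather than getting its events discarded. A toy illustration (plain Java, not the actual Flink internals):

```java
import java.util.Arrays;

public class WatermarkAlignment {

    /** Latest watermark received per input channel; Long.MIN_VALUE means none yet. */
    private final long[] channelWatermarks;

    WatermarkAlignment(int numberOfChannels) {
        channelWatermarks = new long[numberOfChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the aligned (minimum) watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        WatermarkAlignment op = new WatermarkAlignment(2);
        op.onWatermark(0, 10_000);          // fast partition races ahead
        long wm = op.onWatermark(1, 2_000); // slow partition holds the clock back
        System.out.println(wm); // 2000: events between 2000 and 10000 are not yet late
    }
}
```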
Re: Impact of -yn and -ys on Flink 1.9 batch jobs
Thanks for the reply. I checked: with -ys 10 the hash join requests 256 MB per slot, while my TM managed memory is 2 GB, i.e. about 200 MB per slot on average, so the job could not be scheduled. One more question: before submitting a batch job, how do I determine how much memory a single slot should get? Is there a general method or rule of thumb?

faaron zheng (faaronzh...@gmail.com)

On Dec 25, 2019 at 11:30, Xintong Song wrote:
> Hi faaron, in Flink 1.9 the -yn option has no effect; it has been removed in later versions. With your settings, TM memory stays at 30 GB while the number of slots per TM (-ys) goes from 5 to 10, which halves the average memory per slot. The SQL batch operators in Flink 1.9 have fixed managed-memory requirements, so this change most likely left a single slot's managed memory unable to satisfy the operators' needs. Thank you~ Xintong Song
> On Wed, Dec 25, 2019 at 11:09 AM faaron zheng wrote:
> > Running TPC-DS query1: `flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g -ytm 30g` works fine, but `flink run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm 60g -ytm 30g` fails during the hash join with "No pooled slot available and request to ResourceManager for new slot failed". I don't understand the connection; any pointers? faaron zheng (faaronzh...@gmail.com)
Re: Flink history server showing no completed jobs
Your flink-conf.yaml needs these settings:

historyserver.web.port: 8082
historyserver.web.address: 0.0.0.0
historyserver.archive.fs.refresh-interval: 1
historyserver.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
jobmanager.archive.fs.dir: hdfs://127.0.0.1:8020/flink/v1.1/completed-jobs/
# seconds after which completed jobs are handed over to the history server
jobstore.expiration-time: 14400

jobmanager.archive.fs.dir and historyserver.archive.fs.dir can simply point to the same location. Then start the server with bin/historyserver.sh start and open ip:8082. Note that you need to run a job and wait for jobstore.expiration-time before any data shows up.

From: 起子 Sent: Dec 25, 2019 15:57 To: user-zh Subject: About the Flink history server having no completed jobs

Hi all: I started the Flink history server, but it contains no completed jobs. My configuration, the resulting page, and HDFS are as follows (screenshots omitted). Any guidance would be appreciated.

Dept / Data Platform; Alias / 起子; Mobile: 159 8810 1848; WeChat: 159 8810 1848; Email: q...@dian.so; Address: 5#705, 998 Wenyi West Road, Yuhang District, Hangzhou, Zhejiang
Re: flink dimension table join
Hi 李现,

In practice it is indeed very hard to do a full join between a stream and a dimension table: a full join would require a lot of state, and later migration would be difficult. Could you give an example of the approach you mentioned?

-- Original message --
From: 李现 stormallin2...@gmail.com
To: user-zh user...@flink.apache.org
Sent: Thu, Dec 26, 2019 08:44
Subject: Re: flink dimension table join

The stream should not be unbounded; shouldn't there be a window period, with data outside the window handled offline?

xin Destiny nj18652727...@gmail.com wrote on Wed, Dec 25, 2019 18:13:
> Hi lucas.wu:
> Personally, I would store the join key and the corresponding stream records in MapState. Each time the dimension-table cache is updated, look the key up in the MapState; if a matching KV exists, emit the freshly joined record downstream. This can use a lot of state memory, though, so pay attention to state cleanup.
> lucas.wu lucas...@xiaoying.com wrote on Wed, Dec 25, 2019 17:13:
> > hi all:
> > When a Kafka stream table is joined with an HBase dimension table in Flink, and the dimension table changes later, how can previously joined data be updated?
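xin Destiny's MapState idea can be sketched without Flink: keep the already-seen stream records indexed by join key, and when the dimension table delivers an update for a key, re-emit the refreshed join result for the buffered records (names are illustrative; in a real job the map would be keyed `MapState` with a cleanup/TTL policy):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DimUpdateRejoin {

    // join key -> stream records that have already been joined once
    private final Map<String, List<String>> buffered = new HashMap<>();
    final List<String> emitted = new ArrayList<>();

    /** Stream side: join with the current dimension value and remember the record. */
    void onStreamRecord(String joinKey, String record, String dimValue) {
        buffered.computeIfAbsent(joinKey, k -> new ArrayList<>()).add(record);
        emitted.add(record + "|" + dimValue);
    }

    /** Dimension update: re-emit the join result for every buffered record with that key. */
    void onDimUpdate(String joinKey, String newDimValue) {
        for (String record : buffered.getOrDefault(joinKey, Collections.emptyList())) {
            emitted.add(record + "|" + newDimValue);
        }
    }

    public static void main(String[] args) {
        DimUpdateRejoin join = new DimUpdateRejoin();
        join.onStreamRecord("k1", "order-1", "city=HZ");
        join.onDimUpdate("k1", "city=SH"); // dimension changed: downstream gets a correction
        System.out.println(join.emitted);  // [order-1|city=HZ, order-1|city=SH]
    }
}
```

As noted in the thread, the downside is unbounded state growth; bounding the buffer by time (a retention window, with older data reconciled offline) is what 李现's reply suggests.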
Re: Rewind offset to a previous position and ensure certainty.
Hi Ruibin, are you looking for a way to generate watermarks per Kafka partition? Flink provides Kafka-partition-aware watermark generation. [1] Best, Vino

[1]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition

邢瑞斌 wrote on Wed, Dec 25, 2019 at 20:27:
> Hi,
> I'm trying to use Kafka as an event store, and I want to create several partitions to improve read/write throughput. Occasionally I need to rewind the offset to a previous position for recomputation. Since order isn't guaranteed among partitions in Kafka, does this mean that Flink won't produce the same results as before when rewinding, even if it uses event time? For example, if the consumer for one partition progresses extremely fast and raises the watermark, events from other partitions are discarded. Is there any way to prevent this from happening?
> Thanks in advance!
> Ruibin
Re: flink dimension table join
You can use Guava to cache the dimension-table data in the JVM, and set an expiry time for the cached entries.

叶贤勋 (yxx_c...@163.com)

On Dec 26, 2019 at 08:44, 李现 wrote:
> The stream should not be unbounded; shouldn't there be a window period, with data outside the window handled offline?
> xin Destiny wrote on Wed, Dec 25, 2019 18:13:
> > Hi lucas.wu:
> > Personally, I would store the join key and the corresponding stream records in MapState. Each time the dimension-table cache is updated, look the key up in the MapState; if a matching KV exists, emit the freshly joined record downstream. This can use a lot of state memory, though, so pay attention to state cleanup.
> > lucas.wu wrote on Wed, Dec 25, 2019 17:13:
> > > hi all:
> > > When a Kafka stream table is joined with an HBase dimension table in Flink, and the dimension table changes later, how can previously joined data be updated?
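The suggested cache-with-expiry approach (what Guava's `CacheBuilder.expireAfterWrite` provides) can be sketched in plain Java without the Guava dependency; the loader function stands in for the HBase lookup, and the clock is passed explicitly to keep the sketch deterministic:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class DimTableCache<K, V> {

    private static class Entry<V> {
        final V value;
        final long loadedAt;
        Entry(V value, long loadedAt) { this.value = value; this.loadedAt = loadedAt; }
    }

    private final Map<K, Entry<V>> cache = new HashMap<>();
    private final Function<K, V> loader; // stand-in for the dimension-table (e.g. HBase) lookup
    private final long ttlMillis;

    DimTableCache(Function<K, V> loader, long ttlMillis) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
    }

    V get(K key, long nowMillis) {
        Entry<V> e = cache.get(key);
        if (e == null || nowMillis - e.loadedAt >= ttlMillis) {
            // missing or expired: reload from the dimension table
            e = new Entry<>(loader.apply(key), nowMillis);
            cache.put(key, e);
        }
        return e.value;
    }

    public static void main(String[] args) {
        int[] lookups = {0};
        DimTableCache<String, String> cache =
            new DimTableCache<>(k -> { lookups[0]++; return k + "-v" + lookups[0]; }, 1000);
        System.out.println(cache.get("user42", 0));    // user42-v1 (loaded)
        System.out.println(cache.get("user42", 500));  // user42-v1 (served from cache)
        System.out.println(cache.get("user42", 1500)); // user42-v2 (expired, reloaded)
    }
}
```

The TTL bounds how stale a joined dimension value can be; records joined within the TTL window still see the old value, which is the trade-off relative to the MapState re-emit approach discussed earlier in the thread.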
Re: flink dimension table join
The stream should not be unbounded; shouldn't there be a window period, with data outside the window handled offline?

xin Destiny wrote on Wed, Dec 25, 2019 18:13:
> Hi lucas.wu:
> Personally, I would store the join key and the corresponding stream records in MapState. Each time the dimension-table cache is updated, look the key up in the MapState; if a matching KV exists, emit the freshly joined record downstream. This can use a lot of state memory, though, so pay attention to state cleanup.
> lucas.wu wrote on Wed, Dec 25, 2019 17:13:
> > hi all:
> > When a Kafka stream table is joined with an HBase dimension table in Flink, and the dimension table changes later, how can previously joined data be updated?
Aggregating Movie Rental information in a DynamoDB table using DynamoDB streams and Flink
Happy Holidays everyone!

tl;dr: I need to aggregate movie rental information that is being stored in one DynamoDB table and keep a running total of the aggregation in another table. How do I ensure exactly-once aggregation?

I currently store movie rental information in a DynamoDB table named MovieRentals: {movie_title, rental_period_in_days, order_date, rent_amount}. We have millions of movie rentals happening on any given day. Our web application needs to display the aggregated rental amount for any given movie title.

I am planning to use Flink to aggregate rental amounts by movie_title on the MovieRentals DynamoDB stream and store the aggregated rental amounts in another DynamoDB table named RentalAmountsByMovie: {movie_title, total_rental_amount}.

How do I ensure that the RentalAmountsByMovie amounts are accurate, i.e. how do I prevent the results of any checkpoint from updating the RentalAmountsByMovie records more than once?

1) Do I need to store checkpoint ids in the RentalAmountsByMovie table and do conditional updates to handle the scenario described above?

2) I could possibly implement a TwoPhaseCommitSinkFunction that talks to DynamoDB. However, according to the Flink documentation the commit function can be called more than once and hence needs to be idempotent, so even this solution requires checkpoint ids to be stored in the target store.

3) Another pattern seems to be storing the time-window aggregation results in the RentalAmountsByMovie table, with the webapp computing the running total on the fly. I don't like this solution for its latency implications for the webapp.

4) Maybe I can use Flink's Queryable State feature. However, that feature seems to be in beta: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/queryable_state.html

I imagine this is a very common aggregation use case. How do folks usually handle **updating aggregated results in Flink external sinks**? I appreciate any pointers.
Happy to provide more details if needed. Thanks!
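[Editor's note] Option 1 above can be sketched concretely: store the last applied checkpoint id alongside each aggregate row and apply a delta only when the incoming checkpoint id is newer, which is exactly what a DynamoDB conditional update (ConditionExpression on a version attribute) gives you. The sketch below is hypothetical and simulates the conditional write with an in-memory map rather than the AWS SDK:

```java
import java.util.HashMap;
import java.util.Map;

// Idempotent aggregate update: each row remembers the last checkpoint id that
// touched it, so replaying a checkpoint's output after a failure is a no-op.
public class IdempotentRentalSink {
    static class Row { double total; long lastCheckpoint; }

    // movie_title -> (total_rental_amount, last_applied_checkpoint_id)
    private final Map<String, Row> table = new HashMap<>();

    // Mimics a DynamoDB conditional update:
    // "SET total = total + :delta IF last_checkpoint < :cp"
    public boolean applyDelta(String movieTitle, double delta, long checkpointId) {
        Row row = table.computeIfAbsent(movieTitle, k -> new Row());
        if (checkpointId <= row.lastCheckpoint) {
            return false; // already applied: replayed commit is ignored
        }
        row.total += delta;
        row.lastCheckpoint = checkpointId;
        return true;
    }

    public double total(String movieTitle) {
        Row row = table.get(movieTitle);
        return row == null ? 0.0 : row.total;
    }

    public static void main(String[] args) {
        IdempotentRentalSink sink = new IdempotentRentalSink();
        sink.applyDelta("Heat", 5.0, 1);
        sink.applyDelta("Heat", 3.0, 2);
        sink.applyDelta("Heat", 3.0, 2); // replayed commit: no double counting
        System.out.println(sink.total("Heat")); // prints 8.0
    }
}
```

With the real table, the same guard becomes a `ConditionExpression` such as `last_checkpoint < :cp`, so a retried commit from the same checkpoint fails the condition instead of double-counting.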
Re: Apache Flink - Flink Metrics collection using Prometheus on EMR from streaming mode
Thanks Vino and Rafi for your references.

Regarding the push gateway recommendation for batch jobs - I am following this reference: https://prometheus.io/docs/practices/pushing/. The scenario I have is that we start Flink apps on EMR whenever we need them. Sometimes a task manager gets killed and then restarted on another node. In order to keep up with registering new task/job managers and de-registering the stopped/removed ones, I wanted to see if there is any service discovery integration for Flink apps.

Thanks again for your help and let me know if you have any additional pointers.

On Wednesday, December 25, 2019, 03:39:31 AM EST, Rafi Aroch wrote:

Hi,

Take a look here: https://github.com/eastcirclek/flink-service-discovery
I used it successfully quite a while ago, so things might have changed since.

Thanks,
Rafi

On Wed, Dec 25, 2019, 05:54 vino yang wrote:

Hi Mans,

IMO, the mechanism of the metrics reporter does not depend on any deployment mode.

>> is there any Prometheus configuration or service discovery option available that will dynamically pick up the metrics from the Flink job and task managers running in the cluster?

Can you share more information about your scenario?

>> I believe for a batch job I can configure flink config to use the Prometheus gateway configuration but I think this is not recommended for a streaming job.

What does this mean? Why is the Prometheus gateway configuration for a Flink batch job not recommended for a streaming job?

Best,
Vino

M Singh wrote on Tue, Dec 24, 2019 at 4:02 PM:

Hi:

I wanted to find out the best way of collecting Flink metrics using Prometheus in a streaming application on EMR/Hadoop.

Since the Flink streaming jobs could be running on any node - is there any Prometheus configuration or service discovery option available that will dynamically pick up the metrics from the Flink job and task managers running in the cluster?

I believe for a batch job I can configure flink config to use the Prometheus gateway configuration but I think this is not recommended for a streaming job.

Please let me know if you have any advice.

Thanks
Mans
Re: The assigned slot bae00218c818157649eb9e3c533b86af_11 was removed
The TaskManager crashed. Check whether consecutive checkpoint failures caused an OOM, or whether you are running large-window computations over a large dataset; high data volume can also cause this problem.

Xintong Song wrote on Wed, Dec 25, 2019 at 10:28 AM:

> This is probably not the root cause. "Slot was removed" is usually caused by a TaskManager going down; you need to check the corresponding TaskManager logs to find out why it crashed.
>
> Thank you~
>
> Xintong Song
>
> On Tue, Dec 24, 2019 at 10:06 PM hiliuxg <736742...@qq.com> wrote:
>
> > Occasionally I find that an already-assigned slot is suddenly removed, causing the job to restart, and I cannot tell what causes it. There is no CPU spike and no full GC. The exception is as follows:
> >
> > org.apache.flink.util.FlinkException: The assigned slot bae00218c818157649eb9e3c533b86af_11 was removed.
> >     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
> >     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
> >     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
> >     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
> >     at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:847)
> >     at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1161)
> >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
> >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
> >     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> >     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> >     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> >     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> >     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> >     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> >     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> >     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Rewind offset to a previous position and ensure certainty.
Hi,

I'm trying to use Kafka as an event store, and I want to create several partitions to improve read/write throughput. Occasionally I need to rewind the offset to a previous position for recomputation. Since order isn't guaranteed among partitions in Kafka, does this mean that Flink won't produce the same results as before when rewinding, even if it uses event time? For example, if the consumer for one partition progresses extremely fast and raises the watermark, events from the other partitions would be discarded as late. Is there any way to prevent this from happening?

Thanks in advance!
Ruibin
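[Editor's note] Flink's Kafka source supports per-partition watermark assignment (calling assignTimestampsAndWatermarks on the FlinkKafkaConsumer itself rather than on the resulting stream); the watermark emitted downstream is then the minimum across all partitions, which is what prevents one fast partition from causing late drops during a rewind. A simplified, hypothetical model of that merging logic:

```java
import java.util.HashMap;
import java.util.Map;

// Model of per-partition watermarking: the source tracks the max event
// timestamp seen on each partition and emits the MINIMUM across partitions,
// so event time cannot advance past data still buffered in slow partitions.
public class PerPartitionWatermarks {
    private final Map<Integer, Long> perPartition = new HashMap<>();

    public PerPartitionWatermarks(int numPartitions) {
        for (int p = 0; p < numPartitions; p++) {
            perPartition.put(p, Long.MIN_VALUE);
        }
    }

    // Record the latest event timestamp seen on a partition.
    public void onEvent(int partition, long eventTimestamp) {
        perPartition.merge(partition, eventTimestamp, Math::max);
    }

    // The watermark the source would emit downstream.
    public long currentWatermark() {
        return perPartition.values().stream()
                .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        PerPartitionWatermarks wm = new PerPartitionWatermarks(2);
        wm.onEvent(0, 10_000L); // partition 0 races ahead during the rewind
        wm.onEvent(0, 99_000L);
        wm.onEvent(1, 12_000L); // partition 1 lags behind
        // event time is held back by the slowest partition:
        System.out.println(wm.currentWatermark()); // prints 12000
    }
}
```

Under this scheme the replay is deterministic with respect to windowing as long as each partition's internal order is stable, since no window can fire before every partition has passed its end timestamp.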
testing - syncing data timeline
Hi,

I have the following pipeline:

1. a single hour window that counts the number of records
2. a single day window that accepts the aggregated data from #1 and emits the highest hourly count of that day
3. union of #1 + #2
4. a logic operator that accepts the data from #3, keeps a ListState of #2, and applies some logic to #1 based on that state (e.g. comparing a single hour against the history of the max hours over the last X days), then emits the result

The timestamps-and-watermarks assigner uses BoundedOutOfOrdernessTimestampExtractor (event time), and I allow a lateness of 3 hours.

The problem is that when I try to unit test the whole pipeline, the data from #1 reaches #4 before the latter receives the data from #2 (via #3), hence it doesn't have any state yet (the state is always empty when the stream from #1 arrives). My source in the tests is a collection that represents the records.

Is there any way I can solve this?

(screenshot of the pipeline attached)

I appreciate any help you can provide.

Cheers
Avi
Re: Flink dimension-table join
Hi lucas.wu:

Personally, I think you can store the join key and the corresponding stream records in MapState. Every time the dimension-table cache is updated, look the key up in the MapState; if a matching entry exists, emit the newly joined records downstream. However, this state can take up a lot of memory, so you need to pay attention to state cleanup.

lucas.wu wrote on Wed, Dec 25, 2019 at 5:13 PM:
> hi all:
> When joining a Kafka stream table with an HBase dimension table in Flink, if the dimension table changes later, how can the previously joined records be updated?
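[Editor's note] The idea in this reply can be sketched as follows - a hypothetical, in-memory stand-in for Flink's MapState that caches stream records by join key and re-emits joined results whenever the dimension table changes:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: cache stream records by join key (stand-in for Flink MapState).
// When a dimension row is updated, look up the cached records for that key
// and re-emit the re-joined results downstream.
public class DimensionJoinCache {
    private final Map<String, List<String>> recordsByKey = new HashMap<>(); // join key -> stream records
    private final List<String> emitted = new ArrayList<>();                 // downstream output

    // Normal path: join an incoming stream record against the current dim value.
    public void onStreamRecord(String joinKey, String record, String dimValue) {
        recordsByKey.computeIfAbsent(joinKey, k -> new ArrayList<>()).add(record);
        emitted.add(record + "|" + dimValue);
    }

    // Called when the dimension-table cache is refreshed for a key:
    // re-emit every previously joined record with the updated dimension value.
    public void onDimensionUpdate(String joinKey, String newDimValue) {
        for (String record : recordsByKey.getOrDefault(joinKey, Collections.emptyList())) {
            emitted.add(record + "|" + newDimValue);
        }
    }

    public List<String> output() { return emitted; }

    public static void main(String[] args) {
        DimensionJoinCache join = new DimensionJoinCache();
        join.onStreamRecord("user1", "click", "cityA");
        join.onDimensionUpdate("user1", "cityB"); // dim change re-emits the join
        System.out.println(join.output()); // prints [click|cityA, click|cityB]
    }
}
```

As the reply notes, the cached records grow without bound, so in a real job this would need state TTL or explicit cleanup once records can no longer be affected by dimension updates.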
Re: question: jvm heap size per task?
Understood, thank you for the quick response!

Thanks,
Li

Xintong Song wrote on Wed, Dec 25, 2019 at 5:05 PM:

> Hi Li,
>
> It is true that currently all the task managers have the same heap size, and it's fixed ever since they are started. Unfortunately, your needs cannot be satisfied at the moment.
>
> The heap size of a task manager cannot be changed once it is started, because Flink task managers run in JVMs and the JVM does not support resizing after start.
>
> However, there is an ongoing effort towards your needs. The community is working on fine-grained resource management, which in general allows specifying per-task/operator resource requirements and allocating task manager resources accordingly. You can refer to FLIP-53 [1] and FLIP-56 [2] for more details. Another related effort is the pluggable slot manager [3], which allows pluggable resource scheduling strategies, such as launching task managers with customized resources according to the tasks' requirements.
>
> Thank you~
>
> Xintong Song
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> [3] https://issues.apache.org/jira/browse/FLINK-14106
>
> On Wed, Dec 25, 2019 at 4:18 PM Li Zhao wrote:
>
>> Hi,
>>
>> Greetings, hope this is the proper place to ask questions; apologies if not.
>>
>> We have a shared Flink cluster running with Docker and want to set a different heap size per task (some tasks require a larger heap, while most tasks only need a little). Is this feasible?
>>
>> I've gone through [1], [2] and [3]. My current understanding is that all task managers have the same heap size, which is set by `taskmanager.heap.size` and is fixed ever since the task manager is started, and all tasks running in that task manager share that heap.
>>
>> Am I understanding it correctly? And are there any approaches to our needs?
>>
>> Thanks in advance!
>>
>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/cli.html
>> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html
>> [3]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>>
>> Thanks,
>> Li
Flink dimension-table join
hi all:
When joining a Kafka stream table with an HBase dimension table in Flink, if the dimension table changes later, how can the previously joined records be updated?
Re: question: jvm heap size per task?
Hi Li,

It is true that currently all the task managers have the same heap size, and it's fixed ever since they are started. Unfortunately, your needs cannot be satisfied at the moment.

The heap size of a task manager cannot be changed once it is started, because Flink task managers run in JVMs and the JVM does not support resizing after start.

However, there is an ongoing effort towards your needs. The community is working on fine-grained resource management, which in general allows specifying per-task/operator resource requirements and allocating task manager resources accordingly. You can refer to FLIP-53 [1] and FLIP-56 [2] for more details. Another related effort is the pluggable slot manager [3], which allows pluggable resource scheduling strategies, such as launching task managers with customized resources according to the tasks' requirements.

Thank you~

Xintong Song

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
[3] https://issues.apache.org/jira/browse/FLINK-14106

On Wed, Dec 25, 2019 at 4:18 PM Li Zhao wrote:

> Hi,
>
> Greetings, hope this is the proper place to ask questions; apologies if not.
>
> We have a shared Flink cluster running with Docker and want to set a different heap size per task (some tasks require a larger heap, while most tasks only need a little). Is this feasible?
>
> I've gone through [1], [2] and [3]. My current understanding is that all task managers have the same heap size, which is set by `taskmanager.heap.size` and is fixed ever since the task manager is started, and all tasks running in that task manager share that heap.
>
> Am I understanding it correctly? And are there any approaches to our needs?
>
> Thanks in advance!
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/cli.html
> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html
> [3]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>
> Thanks,
> Li
Re: Apache Flink - Flink Metrics collection using Prometheus on EMR from streaming mode
Hi,

Take a look here: https://github.com/eastcirclek/flink-service-discovery
I used it successfully quite a while ago, so things might have changed since.

Thanks,
Rafi

On Wed, Dec 25, 2019, 05:54 vino yang wrote:

> Hi Mans,
>
> IMO, the mechanism of the metrics reporter does not depend on any deployment mode.
>
> >> is there any Prometheus configuration or service discovery option available that will dynamically pick up the metrics from the Flink job and task managers running in the cluster?
>
> Can you share more information about your scenario?
>
> >> I believe for a batch job I can configure flink config to use the Prometheus gateway configuration but I think this is not recommended for a streaming job.
>
> What does this mean? Why is the Prometheus gateway configuration for a Flink batch job not recommended for a streaming job?
>
> Best,
> Vino
>
> M Singh wrote on Tue, Dec 24, 2019 at 4:02 PM:
>
>> Hi:
>>
>> I wanted to find out the best way of collecting Flink metrics using Prometheus in a streaming application on EMR/Hadoop.
>>
>> Since the Flink streaming jobs could be running on any node - is there any Prometheus configuration or service discovery option available that will dynamically pick up the metrics from the Flink job and task managers running in the cluster?
>>
>> I believe for a batch job I can configure flink config to use the Prometheus gateway configuration but I think this is not recommended for a streaming job.
>>
>> Please let me know if you have any advice.
>>
>> Thanks
>>
>> Mans
question: jvm heap size per task?
Hi,

Greetings, hope this is the proper place to ask questions; apologies if not.

We have a shared Flink cluster running with Docker and want to set a different heap size per task (some tasks require a larger heap, while most tasks only need a little). Is this feasible?

I've gone through [1], [2] and [3]. My current understanding is that all task managers have the same heap size, which is set by `taskmanager.heap.size` and is fixed ever since the task manager is started, and all tasks running in that task manager share that heap.

Am I understanding it correctly? And are there any approaches to our needs?

Thanks in advance!

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/cli.html
[2]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html
[3]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors

Thanks,
Li
Problems implementing Kafka-to-MySQL End-To-End Exactly-Once in Flink
Hi all:

While implementing end-to-end exactly-once from Kafka to MySQL, I recently ran into the following two problems:

1: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Communications link failure during commit(). Transaction resolution unknown.

2: org.apache.flink.streaming.runtime.tasks.TimerException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

I have put together a minimal demo that reproduces the problems; any guidance is appreciated:
git clone https://github.com/lusecond/flink_help --depth=1

During testing I found that the four overridden methods of TwoPhaseCommitSinkFunction - beginTransaction, preCommit, commit, abort - run on different threads. I suspected that thread switching was breaking the JDBC transaction commit, but related tests have ruled this out.
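[Editor's note] For readers unfamiliar with the lifecycle being debugged here: TwoPhaseCommitSinkFunction buffers writes per transaction, pre-commits on checkpoint, and only commits once the checkpoint completes. The sketch below is a hypothetical in-memory model (no JDBC, no Flink runtime) whose point is that each transaction object carries its own buffer - in the real sink, its own JDBC connection - so it should not matter which thread later calls commit() or abort() on it:

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of the TwoPhaseCommitSinkFunction lifecycle. The
// "database" only sees rows once commit() runs; abort() discards them.
public class TwoPhaseCommitSketch {
    // One transaction = one buffer (in the real sink: one JDBC connection).
    static class Txn { final List<String> buffer = new ArrayList<>(); }

    private final List<String> committedRows = new ArrayList<>();

    public Txn beginTransaction() { return new Txn(); }

    // Called per record between checkpoints.
    public void invoke(Txn txn, String row) { txn.buffer.add(row); }

    // Called on checkpoint: flush the batch, but do NOT commit it yet.
    public void preCommit(Txn txn) { /* no-op in this model */ }

    // Called once the checkpoint completes (possibly on another thread,
    // possibly more than once - real implementations must be idempotent).
    public void commit(Txn txn) { committedRows.addAll(txn.buffer); }

    // Called on failure/recovery for uncommitted transactions.
    public void abort(Txn txn) { txn.buffer.clear(); }

    public List<String> committed() { return committedRows; }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        Txn t1 = sink.beginTransaction();
        sink.invoke(t1, "row1");
        sink.preCommit(t1);
        Txn t2 = sink.beginTransaction(); // next txn opens before t1 commits
        sink.commit(t1);
        sink.invoke(t2, "row2");
        sink.abort(t2);                   // failure: buffered rows discarded
        System.out.println(sink.committed()); // prints [row1]
    }
}
```

Note that the "Transaction resolution unknown" error above happens when the connection drops mid-commit, which is exactly the window two-phase commit cannot close on its own; the commit must be retryable/idempotent on recovery.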
Flink history server shows no completed jobs
Hi all:

I started Flink's history server, but it shows no completed jobs.

The configuration is as follows: (screenshot not preserved)
The resulting web UI is as follows: (screenshot not preserved)
HDFS is as follows: (screenshot not preserved)

Any guidance would be appreciated.

Department / Data Platform
Alias / Qizi
Mobile: 159 8810 1848
WeChat: 159 8810 1848
Email: q...@dian.so
Address: 5#705, 998 Wenyi West Road, Yuhang District, Hangzhou, Zhejiang