Re: Write Streaming data to S3 in Parquet files

2021-09-27 Thread Guowei Ma
Hi, Harshvardhan

I think you could use one of the `ParquetAvroWriters.forXxx` factory methods
(for example `ParquetAvroWriters.forGenericRecord`) from `ParquetAvroWriters.java` [1],
and you can find more classes like this in the package
`org.apache.flink.formats.parquet`. A rough end-to-end sketch follows below the link.

[1]
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java
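
For illustration, here is a minimal sketch of the whole pipeline. It is only a starting
point, not a drop-in implementation: the one-column Avro schema, the bucket
`s3://my-bucket/parquet-output` and the checkpoint interval are assumptions, and it
presumes Flink 1.13's `toDataStream` plus the `flink-s3-fs-hadoop` plugin on the
plugins path.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ParquetS3SinkSketch {

    // Placeholder Avro schema; use the schema that matches your table columns.
    private static final String SCHEMA_JSON =
            "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bulk formats roll files on checkpoints, so checkpointing must be enabled.
        env.enableCheckpointing(60_000);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.sqlQuery("SELECT * FROM test");
        DataStream<Row> rows = tableEnv.toDataStream(table);

        // Convert Row -> GenericRecord (the Avro Schema is not serializable,
        // so it is parsed from a string in open()).
        DataStream<GenericRecord> records = rows
                .map(new RowToGenericRecord(SCHEMA_JSON))
                .returns(new GenericRecordAvroTypeInfo(new Schema.Parser().parse(SCHEMA_JSON)));

        FileSink<GenericRecord> sink = FileSink
                .forBulkFormat(
                        new Path("s3://my-bucket/parquet-output"),   // placeholder bucket/path
                        ParquetAvroWriters.forGenericRecord(new Schema.Parser().parse(SCHEMA_JSON)))
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH")) // one folder per hour
                .build();

        records.sinkTo(sink);
        env.execute("parquet-to-s3");
    }

    private static final class RowToGenericRecord extends RichMapFunction<Row, GenericRecord> {
        private final String schemaJson;
        private transient Schema schema;

        RowToGenericRecord(String schemaJson) {
            this.schemaJson = schemaJson;
        }

        @Override
        public void open(Configuration parameters) {
            schema = new Schema.Parser().parse(schemaJson);
        }

        @Override
        public GenericRecord map(Row row) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("id", row.getField("id")); // map each column you need
            return record;
        }
    }
}

The sink only finalizes Parquet files on checkpoints, and the `DateTimeBucketAssigner`
above produces one folder per hour, which is usually what is meant by hourly
partitioning on S3.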

Best,
Guowei


On Mon, Sep 27, 2021 at 2:36 AM Harshvardhan Shinde <
harshvardhan.shi...@oyorooms.com> wrote:

> Hi,
>
> Thanks for the response.
>
> How can this streaming data be written to S3, and where is the path to be given?
> Also I see that the FileSink takes GenericRecord, so how can the
> DataStream be converted to a GenericRecord?
>
> Please bear with me if my questions don't make any sense.
>
> On Sun, Sep 26, 2021 at 9:12 AM Guowei Ma  wrote:
>
>> Hi, Harshvardhan
>>
>> I think CaiZhi is right.
>> I only have a small addition. Because I see that you want to convert
>> Table to DataStream, you can look at FileSink (ParquetWriterFactory)[1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/file_sink/#bulk-encoded-formats
>>
>> Best,
>> Guowei
>>
>>
>> On Sun, Sep 26, 2021 at 10:31 AM Caizhi Weng 
>> wrote:
>>
>>> Hi!
>>>
>>> Try the PARTITIONED BY clause. See
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/formats/parquet/
>>>
Harshvardhan Shinde  wrote on Fri, Sep 24, 2021
>>> at 5:52 PM:
>>>
 Hi,
 I wanted to know if we can write streaming data to S3 in parquet format
 with partitioning.
 Here's what I want to achieve:
 I have a kafka table which gets updated with the data from kafka topic
 and I'm using select statement to get the data into a Table and converting
 into a stream as:

 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 Table table = tableEnv.sqlQuery("Select * from test");
 DataStream stream = tableEnv.toDataStream(table);

 Now I want to write this stream to S3 in parquet files with hourly
 partitions.

 Here are my questions:
 1. Is this possible?
 2. If yes, how it can be achieved or link to appropriate documentation.

 Thanks and Regards,
 Harshvardhan


>
> --
> Thanks and Regards,
> Harshvardhan
> Data Platform
>


Re: How to add a Flink connector to Stateful Functions

2021-09-27 Thread Christian Krudewig (Corporate Development)
Hello Roman,

Well, if that's the way to do it, I can manage to maintain a fork of the
statefun repo with these tiny changes. But my question is first whether that is the
way it should be done, or whether there is another way to activate these connectors.

Best,

Christian

-----Original Message-----
From: Roman Khachatryan  
Sent: Tuesday, September 28, 2021 00:31
To: Christian Krudewig (Corporate Development) ; 
Igal Shilman 
Cc: user@flink.apache.org
Subject: Re: How to add a Flink connector to Stateful Functions

Hi,

> Does that mean that I need to build the stateful functions java application 
> and afterwards the docker image?
Yes, you have to rebuild the application after updating the pom, as well as its 
docker image.

Is your concern related to synchronizing local docker images with the official 
repo?
If so, wouldn't using a specific statefun image version solve this issue?

Regards,
Roman

On Mon, Sep 27, 2021 at 9:29 PM Christian Krudewig (Corporate
Development)  wrote:
>
> Hello everyone,
>
>
>
> Currently I’m busy setting up a pipeline with Stateful Functions using a 
> deployment of the standard docker image “apache/flink-statefun” to 
> kubernetes. It has been going smoothly so far and I love the whole toolset. 
> But now I want to add Egress modules for both Opensearch (= ElasticSearch 
> protocol) and ScyllaDB (= Cassandra protocol). The documentation at 
> https://ci.apache.org/projects/flink/flink-statefun-docs-master/docs/io-module/flink-connectors/
>  indicates that I can somehow simply plug in the standard Flink datastream 
> connectors for ElasticSearch and Cassandra. But I didn’t get how exactly.
>
> It says “include the dependency in your pom”. Does that mean that I need to 
> build the stateful functions java application and afterwards the docker 
> image? That would be a bit unfortunate in terms of long-term maintenance 
> effort, because I would need to keep my local checkout in sync with the 
> official repositories and rebuild every now and then. Maybe this can also be 
> added on top of the existing docker image by adding some jar file to some 
> magic plugin folder?
>
>
>
> Sorry, I hope this doesn’t sound captious. I just want to understand and do 
> it the right way. Maybe there is also some minimal example? I didn’t find any 
> in the playground nor on stackoverflow or the mailing lists.
>
>
>
> Thanks,
>
>
>
> Christian Krudewig
>
>
>
>




rpc invocation exceeds the maximum akka framesize

2021-09-27 Thread Deshpande, Omkar
Hello,

We run a lot of flink applications. Some of them sometimes run into this error 
on Job Manager-
The rpc invocation size exceeds the maximum akka framesize

After we increase the framesize the application starts working again.
What factors determine the akka framesize? We sometimes see applications run 
without this issue for months and then run into this error.
How can we determine the framesize before running into this error?

Thanks,
Omkar


Unsubscribe

2021-09-27 Thread Gauler Tan
Unsubscribe


Re: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate

2021-09-27 Thread Liu Join
Exception in thread "main" org.apache.flink.table.api.TableException: 
GroupWindowAggregate doesn't support consuming update and delete changes which 
is produced by node Deduplicate(keep=[FirstRow], key=[dnt, ordernum, t1, csq, 
num, type], order=[ROWTIME])
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:382)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:165)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:279)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:322)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:204)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
at 

Re: flink-1.12.5: Hive DDL defined with a comment, but the comment does not show up in the Hive table

2021-09-27 Thread Caizhi Weng
Hi!

This issue has already been raised in the community; you can track its
progress at https://issues.apache.org/jira/browse/FLINK-18958

kcz <573693...@qq.com.invalid> wrote on Mon, Sep 27, 2021 at 11:21 AM:

> Hive version: 3.1.0
> The DDL is as follows:
> create table test_hive(
> id int comment 'test comment'
> ) PARTITIONED BY (dt STRING) STORED AS orc TBLPROPERTIES (
>  'partition.time-extractor.kind'='custom',
>  'partition.time-extractor.timestamp-pattern'='$dt',
> 
> 'partition.time-extractor.class'='com.hycan.bigdata.utils.MyPartTimeExtractor',
>  'sink.partition-commit.trigger'='partition-time',
>  'sink.partition-commit.delay'='1 d',
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> );
>
> When I actually run `desc formatted test_hive` in Hive, I don't see the comment.


Re: Can Flink support dynamically adding jobs?

2021-09-27 Thread yunying


Got it, thanks!

cuibeiyu2672
cuibeiyu2...@163.com
On Sep 28, 2021 at 10:16, Caizhi Weng wrote:
Hi!

If you mean that the field is fixed but the values you need to filter out change each time, consider a dimension (lookup) table join: the dimension table stores the values to filter on, so each time you only need to update the dimension table.

If you mean that you need to select different fields each time, you can probably only do this with a UDTF, which accesses external resources (e.g. over the network) to decide which values of which fields need to be filtered at the moment.

yunying  wrote on Tue, Sep 28, 2021 at 9:47 AM:


Flink consumes a Kafka topic in which one field takes values a, b, c, d, ... The current requirement is to filter out the records where the field is a and do some follow-up processing. After I wrote and submitted that job, some time later I needed to filter out value b for exactly the same follow-up processing, so now I have to develop another job and submit it again, even though the data volume is small. Every submission costs resources, and later we may also need to filter c, d, e. Is there a good way to solve this?


Unsubscribe

2021-09-27 Thread 475916258
Unsubscribe

Re: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate

2021-09-27 Thread Caizhi Weng
Hi!

Which Flink version are you using? In earlier Flink versions, window aggregation can
only consume insert-only data; the latest Flink 1.14 supports this kind of query.

lzy139...@outlook.com  wrote on Mon, Sep 27, 2021 at 11:37 AM:

> After filtering the data with ROW_NUMBER, the windowed aggregation fails with an error.


Re: Can Flink support dynamically adding jobs?

2021-09-27 Thread Caizhi Weng
Hi!

If you mean that the field is fixed but the values you need to filter out change each time, consider a dimension (lookup) table join: the dimension table stores the values to filter on, so each time you only need to update the dimension table.

If you mean that you need to select different fields each time, you can probably only do this with a UDTF, which accesses external resources (e.g. over the network) to decide which values of which fields need to be filtered at the moment. (A rough sketch of the dimension-table variant follows below.)
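
As an illustration of the dimension-table variant, the sketch below keeps the allowed
filter values in a JDBC table and joins it as a lookup table. All connector options,
table names and columns are placeholders, and the JDBC driver/connector jars are
assumed to be on the classpath.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DynamicFilterByLookupTable {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Kafka source with a processing-time attribute for the lookup join.
        tEnv.executeSql(
            "CREATE TABLE events (" +
            "  category STRING," +
            "  payload  STRING," +
            "  proctime AS PROCTIME()" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'input-topic'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'json'" +
            ")");

        // Dimension table holding the categories that should currently pass the filter.
        // Updating this table (e.g. in MySQL) changes the filter without redeploying the job.
        tEnv.executeSql(
            "CREATE TABLE allowed_categories (" +
            "  category STRING" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:mysql://localhost:3306/config'," +
            "  'table-name' = 'allowed_categories'" +
            ")");

        // Lookup join: only events whose category is present in the dimension table survive.
        Table filtered = tEnv.sqlQuery(
            "SELECT e.category, e.payload " +
            "FROM events AS e " +
            "JOIN allowed_categories FOR SYSTEM_TIME AS OF e.proctime AS a " +
            "ON e.category = a.category");

        filtered.execute().print();
    }
}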

yunying  wrote on Tue, Sep 28, 2021 at 9:47 AM:

>
> Flink consumes a Kafka topic in which one field takes values a, b, c, d, ... The current requirement is to filter out the records where the field is a and do some follow-up processing. After I wrote and submitted that job, some time later I needed to filter out value b for exactly the same follow-up processing, so now I have to develop another job and submit it again, even though the data volume is small. Every submission costs resources, and later we may also need to filter c, d, e. Is there a good way to solve this?


Re: Unsubscribe

2021-09-27 Thread Leonard Xu
To unsubscribe from the user-zh@flink.apache.org mailing list, just send an email
with any content to user-zh-unsubscr...@flink.apache.org.


> On Sep 27, 2021, at 14:43, rzy1107  wrote:
> 
> Unsubscribe



Re: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate

2021-09-27 Thread Leonard Xu
Hi, could you paste the full error details in the email?


> On Sep 27, 2021, at 11:36, lzy139...@outlook.com wrote:
> 
> After filtering the data with ROW_NUMBER, the windowed aggregation fails with an error.



Can Flink support dynamically adding jobs?

2021-09-27 Thread yunying
Flink consumes a Kafka topic in which one field takes values a, b, c, d, ... The current requirement is to filter out the records where the field is a and do some follow-up processing. After I wrote and submitted that job, some time later I needed to filter out value b for exactly the same follow-up processing, so now I have to develop another job and submit it again, even though the data volume is small. Every submission costs resources, and later we may also need to filter c, d, e. Is there a good way to solve this?

Re: How to add a Flink connector to Stateful Functions

2021-09-27 Thread Roman Khachatryan
Hi,

> Does that mean that I need to build the stateful functions java application 
> and afterwards the docker image?
Yes, you have to rebuild the application after updating the pom, as
well as its docker image.

Is your concern related to synchronizing local docker images with the
official repo?
If so, wouldn't using a specific statefun image version solve this issue?
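
For reference, once the connector dependency is in the pom, it is bound as an egress in
an embedded module along the lines of the sketch below. This is only a sketch:
`MyMessage` and `buildOpensearchSink()` are placeholders, and the exact class names
should be checked against the StateFun version you deploy.

import java.util.Map;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class OpensearchEgressModule implements StatefulFunctionModule {

    // Identifier under which stateful functions address this egress.
    private static final EgressIdentifier<MyMessage> EGRESS_ID =
            new EgressIdentifier<>("com.example", "opensearch-egress", MyMessage.class);

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        // Any regular Flink SinkFunction (e.g. an ElasticsearchSink built with the
        // flink-connector-elasticsearch7 builder) can be wrapped as a StateFun egress.
        EgressSpec<MyMessage> spec = new SinkFunctionSpec<>(EGRESS_ID, buildOpensearchSink());
        binder.bindEgress(spec);
    }

    // Placeholder: build the real Opensearch (or Cassandra) sink here.
    private static SinkFunction<MyMessage> buildOpensearchSink() {
        return new SinkFunction<MyMessage>() {
            @Override
            public void invoke(MyMessage value, Context context) {
                // write 'value' to Opensearch
            }
        };
    }

    // Placeholder message type.
    public static class MyMessage {}
}

The module is typically discovered via a META-INF/services entry for
org.apache.flink.statefun.sdk.spi.StatefulFunctionModule, so the connector dependency,
this class and the service file all end up in the rebuilt application jar and image,
which is why the rebuild is needed.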

Regards,
Roman

On Mon, Sep 27, 2021 at 9:29 PM Christian Krudewig (Corporate
Development)  wrote:
>
> Hello everyone,
>
>
>
> Currently I’m busy setting up a pipeline with Stateful Functions using a 
> deployment of the standard docker image “apache/flink-statefun” to 
> kubernetes. It has been going smoothly so far and I love the whole toolset. 
> But now I want to add Egress modules for both Opensearch (= ElasticSearch 
> protocol) and ScyllaDB (= Cassandra protocol). The documentation at 
> https://ci.apache.org/projects/flink/flink-statefun-docs-master/docs/io-module/flink-connectors/
>  indicates that I can somehow simply plug in the standard Flink datastream 
> connectors for ElasticSearch and Cassandra. But I didn’t get how exactly.
>
> It says “include the dependency in your pom”. Does that mean that I need to 
> build the stateful functions java application and afterwards the docker 
> image? That would be a bit unfortunate in terms of long-term maintenance 
> effort, because I would need to keep my local checkout in sync with the 
> official repositories and rebuild every now and then. Maybe this can also be 
> added on top of the existing docker image by adding some jar file to some 
> magic plugin folder?
>
>
>
> Sorry, I hope this doesn’t sound captious. I just want to understand and do 
> it the right way. Maybe there is also some minimal example? I didn’t find any 
> in the playground nor on stackoverflow or the mailing lists.
>
>
>
> Thanks,
>
>
>
> Christian Krudewig
>
>
>
>


How to add a Flink connector to Stateful Functions

2021-09-27 Thread Christian Krudewig (Corporate Development)
Hello everyone,

 

Currently I'm busy setting up a pipeline with Stateful Functions using a
deployment of the standard docker image "apache/flink-statefun" to
kubernetes. It has been going smoothly so far and I love the whole toolset.
But now I want to add Egress modules for both Opensearch (= ElasticSearch
protocol) and ScyllaDB (= Cassandra protocol). The documentation at
https://ci.apache.org/projects/flink/flink-statefun-docs-master/docs/io-module/flink-connectors/
indicates that I can somehow simply plug in the
standard Flink datastream connectors for ElasticSearch and Cassandra. But I
didn't get how exactly.

It says "include the dependency in your pom". Does that mean that I need to
build the stateful functions java application and afterwards the docker
image? That would be a bit unfortunate in terms of long-term maintenance
effort, because I would need to keep my local checkout in sync with the
official repositories and rebuild every now and then. Maybe this can also be
added on top of the existing docker image by adding some jar file to some
magic plugin folder?

 

Sorry, I hope this doesn't sound captious. I just want to understand and do
it the right way. Maybe there is also some minimal example? I didn't find
any in the playground nor on stackoverflow or the mailing lists.

 

Thanks,

 

Christian Krudewig

 

 





Re: Potential bug when assuming roles from AWS EKS when using S3 as RocksDb checkpoint backend?

2021-09-27 Thread Rommel Holmes
Hi, Ingo

I was looking into the AWS dependencies, and according to
https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
the minimum required version to use the feature is 1.11.704.

So 1.11.788 should be sufficient? Can you point me to where it says that
1.11.951 is the minimum requirement?

Thank you.

Rommel

On Sun, Sep 26, 2021 at 12:34 AM Ingo Bürk  wrote:

> Hi Thomas,
>
> I haven't encountered that before, sorry. I assume you're still using
> Flink 1.12? The PR I linked to updated the AWS dependencies to the minimum
> required versions to use this feature, so I'm not sure just setting the
> credentials provider alone would be sufficient. The PR was currently only
> merged for the upcoming 1.14 release, but perhaps you could try the current
> release candidate to see if it works with that? If that works we could also
> think about backporting this change, we just initially didn't do that since
> upgrading those dependencies has a certain operational risk and we want to
> wait for user feedback first.
>
>
> Best
> Ingo
>
> On Sun, Sep 26, 2021 at 8:12 AM Thomas Wang  wrote:
>
>> Ingo,
>>
>> I dig into the Flink code a little bit. It looks like the key for
>> specifying the roleArn and roleSessionName are
>> fs.s3a.aws.credentials.provider:
>> com.amazonaws.auth.WebIdentityTokenCredentialsProvider
>> fs.s3a.aws.credentials.provider.role.arn: arn:aws:iam::...:role/...
>> fs.s3a.aws.credentials.provider.role.sessionName: ...
>>
>> However, for some reason, I'm still getting the same error. Please help!
>> Thanks.
>>
>> Thomas
>>
>>
>> On Sat, Sep 25, 2021 at 9:36 PM Thomas Wang  wrote:
>>
>>> Ingo,
>>>
>>> It looks like I'm now seeing "Caused by: java.lang.NullPointerException:
>>> You must specify a value for roleArn and roleSessionName". I assume I would
>>> also need to specify that through the configuration file. Could you suggest
>>> the key for this configuration? Thanks.
>>>
>>> Thomas
>>>
>>> On Sat, Sep 25, 2021 at 7:25 PM Thomas Wang  wrote:
>>>
 Thanks Ingo. Adding the following setting worked.

 fs.s3a.aws.credentials.provider:
 com.amazonaws.auth.WebIdentityTokenCredentialsProvider

 Thomas

 On Sat, Sep 25, 2021 at 1:12 PM Ingo Bürk  wrote:

> Hi Thomas,
>
> I think you might be looking for this:
> https://github.com/apache/flink/pull/16717
>
>
> Best
> Ingo
>
> On Sat, Sep 25, 2021, 20:46 Thomas Wang  wrote:
>
>> Hi,
>>
>> I'm using the official docker image:
>> apache/flink:1.12.1-scala_2.11-java11
>>
>> I'm trying to run a Flink job on an EKS cluster. The job is running
>> under a k8s service account that is tied to an IAM role. If I'm not using
>> s3 as RocksDB checkpoint backend, everything works just fine. However, 
>> when
>> I enabled s3 as RocksDB checkpoint backend, I got permission denied.
>>
>> The IAM role tied to the service account has the appropriate
>> permissions to s3. However the underlying role tied to the EKS node
>> doesn't. After debugging with AWS support, it looks like the request to 
>> s3
>> was made under the EKS node role, not the role tied to the service 
>> account.
>> Thus the permission denial.
>>
>> With the same Flink application, I'm also making requests to AWS
>> Secrets Manager to get some sensitive information and those requests were
>> made explicitly with AWS Java SDK 2.x bundled in the same application Jar
>> file. Those requests were made correctly with the IAM role tied to the
>> service account.
>>
>> Based on the info above, I suspect Flink may be using an older
>> version of the AWS SDK that doesn't support assuming an IAM role via an
>> IODC web identity token file. Please see AWS doc here:
>> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
>>
>> Could someone help me confirm this bug and maybe have it fixed some
>> time? Thanks.
>>
>> Thomas
>>
>

-- 
 Yours
 Rommel
*
  I  waited patiently for the LORD;
   he turned to me and heard my cry.
 He lifted me out of the slimy pit,
   out of the mud and mire;
he set my feet on a rock
   and gave me a firm place to stand. *


Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-27 Thread Javier Vegas
I am trying to start Flink 1.13.2 on Mesos following the instructions in
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/mesos/
and using Marathon to deploy a Docker image with both the Flink and my
binaries.

My entrypoint for the Docker image is:


/opt/flink/bin/mesos-appmaster.sh \

  -Djobmanager.rpc.address=$HOSTNAME \

  -Dmesos.resourcemanager.framework.user=flink \

  -Dmesos.master=10.0.18.246:5050 \

  -Dmesos.resourcemanager.tasks.cpus=6



When mesos-appmaster.sh starts, in the stderr I see this:


I0927 16:50:32.306691 801308 exec.cpp:164] Version: 1.7.3

I0927 16:50:32.310277 801345 exec.cpp:238] Executor registered on agent
f671d9ee-57f6-4f92-b1b2-3137676f6cdf-S6090

I0927 16:50:32.311120 801355 executor.cpp:130] Registered docker executor
on 10.0.20.177

I0927 16:50:32.311394 801345 executor.cpp:186] Starting task
tl_flink_prod.fb215c64-1fb2-11ec-9ce6-aaa2e9cb6ba0

WARNING: Your kernel does not support swap limit capabilities or the cgroup
is not mounted. Memory limited without swap.

WARNING: An illegal reflective access operation has occurred

WARNING: Illegal reflective access by
org.apache.hadoop.security.authentication.util.KerberosUtil
(file:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar) to method
sun.security.krb5.Config.getInstance()

WARNING: Please consider reporting this to the maintainers of
org.apache.hadoop.security.authentication.util.KerberosUtil

WARNING: Use --illegal-access=warn to enable warnings of further illegal
reflective access operations

WARNING: All illegal access operations will be denied in a future release

I0927 16:50:43.622053   237 sched.cpp:232] Version: 1.7.3

I0927 16:50:43.624439   328 sched.cpp:336] New master detected at
master@10.0.18.246:5050

I0927 16:50:43.624779   328 sched.cpp:356] No credentials provided.
Attempting to register without authentication


where the "New master detected" line is promising.

However, on the Flink UI I see only the jobmanager started, and there are
no task managers.  Getting into the Docker container, I see this in the log:

WARN  org.apache.flink.mesos.scheduler.ConnectionMonitor  - Unable to
connect to Mesos; still trying...


I have verified that from the container I can access the Mesos master at
10.0.18.246:5050.


Does any other port besides the web UI port 5050 need to be open for
mesos-appmaster to connect with the Mesos master?


In the appmaster log (attached) I see one exception that I don't know
whether it is related to the Mesos connection problem:


java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.

at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:448)

at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:419)

at org.apache.hadoop.util.Shell.(Shell.java:496)

at org.apache.hadoop.util.StringUtils.(StringUtils.java:79)

at
org.apache.hadoop.conf.Configuration.getBoolean(Configuration.java:1555)

at
org.apache.hadoop.security.SecurityUtil.getLogSlowLookupsEnabled(SecurityUtil.java:497)

at
org.apache.hadoop.security.SecurityUtil.(SecurityUtil.java:90)

at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:289)

at
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:277)

at
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:833)

at
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:803)

at
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:676)

at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)

at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
Source)

at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source)

at java.base/java.lang.reflect.Method.invoke(Unknown Source)

at
org.apache.flink.runtime.util.EnvironmentInformation.getHadoopUser(EnvironmentInformation.java:215)

at
org.apache.flink.runtime.util.EnvironmentInformation.logEnvironmentInfo(EnvironmentInformation.java:432)

at
org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:95)




I am not trying (yet) to run in high availability mode, so I am not sure whether
I need to have HADOOP_HOME set or not, but I don't see anything about
HADOOP_HOME in the Flink docs.



Any tips on how I can fix my Docker+Marathon+Mesos environment so Flink can
connect to my Mesos master?


Thanks,


Javier Vegas


flink--mesos-appmaster-6c49aa87e1d4.log
Description: Binary data


Re: could not stop with a Savepoint.

2021-09-27 Thread Roman Khachatryan
Hi,

The above exception may be caused by both savepoint timing out and job
termination timing out.
To distinguish between these two cases, could you please check the
status of the savepoint and the tasks in the Flink Web UI? IIUC, after
you get this exception on the client, the job is still running.
Could you also check if there are any exceptions in "Exceptions
history" or in the logs?

Regards,
Roman

On Mon, Sep 27, 2021 at 6:49 AM Marco Villalobos
 wrote:
>
> Today, I kept on receiving a timeout exception when stopping my job with a 
> savepoint.
> This happened with Flink version 1.12.2 running in EMR.
>
> I had to use the deprecated cancel with savepoint feature instead.
>
> In fact, stopping with a savepoint, creating a savepoint, and cancelling with 
> a savepoint all gave me the timeout exception.
>
> However, the cancel with savepoint started creating a savepoint on the 
> cluster.
>
> The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
> "5d6100984035db9541e9f08ecbd311bf".
> at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:585)
> at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1006)
> at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:573)
> at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1073)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
> Caused by: java.util.concurrent.TimeoutException
> at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:583)
> ... 9 more
>
>
>


Re: Flink Performance Issue

2021-09-27 Thread Arvid Heise
Hi Kamaal,

I did a quick test with a local Kafka in Docker. With parallelism 1, I can
process 20k messages of size 4 KB in about 1 min. So if you use a parallelism
of 15, I'd expect it to take below 10 s even with bigger data skew.

What I recommend is to start from scratch and just work with a
simple source -> sink. That should be much, much faster. If it is, you can then
add complexity until you find the bottleneck.

If not, I suspect your ObjectNodeJsonDeSerializerSchema is the issue.
For example, are you creating an ObjectMapper with each invocation? That's
a typical mistake; a sketch of the fix follows below.
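
A hedged sketch of the reuse pattern — the class name matches the one quoted later in
this thread, but the body is purely illustrative:

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class ObjectNodeJsonDeSerializerSchema implements KeyedDeserializationSchema<ObjectNode> {

    // Reuse one ObjectMapper per schema instance instead of calling "new ObjectMapper()"
    // inside deserialize(): the mapper is expensive to construct. It is transient and
    // lazily created because the schema object is serialized and shipped to the tasks.
    private transient ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        return (ObjectNode) mapper.readTree(message);
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeInformation.of(ObjectNode.class);
    }
}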

Best,

Arvid

On Mon, Sep 27, 2021 at 2:38 PM Mohammed Kamaal 
wrote:

> Hi Robert,
>
> I have removed all the business logic (keyBy and window) operator code and
> just had a source and sink to test it.
> The throughput is 20K messages in 2 minutes. It is a simple read from
> source (kafka topic) and write to sink (kafka topic). Don't you think 2
> minutes is also not a better throughput for a simple read/write
> application?. Each message is 4 KB.
>
> As I had mentioned in the previous email(s), I am using keyBy() and
> Window() to handle business logic. Do you think these operators would have
> a huge impact on the performance?. Or is it something to do with my Kafka
> cluster configuration or the older version of flink (1.8) that I am using
> in my application. Not sure if flink version 1.8 has a performance issue.
>
> Please let me know.
> Below is my kafka cluster configuration.
>
> auto.create.topics.enable=true
> log.retention.hours=24
> default.replication.factor=3
> min.insync.replicas=2
> num.io.threads=45
> num.network.threads=60
> num.partitions=45
> num.replica.fetchers=2
> unclean.leader.election.enable=true
> replica.lag.time.max.ms=3
> zookeeper.session.timeout.ms=18000
> log.retention.ms=17280
> log.cleanup.policy=delete
> group.max.session.timeout.ms=120
>
>
>
> Thanks
>
> On Wed, Sep 22, 2021 at 9:06 PM Robert Metzger 
> wrote:
>
>> Hi Kamaal,
>>
>> I would first suggest understanding the performance bottleneck, before
>> applying any optimizations.
>>
>> Idea 1: Are your CPUs fully utilized?
>> if yes, good, then scaling up will probably help
>> If not, then there's another inefficiency
>>
>> Idea 2: How fast can you get the data into your job, without any
>> processing?
>> You can measure this by submitting a simple Flink job that just reads the
>> data and writes it to a discarding sink. Either disable the operator
>> chaining to get metrics for the records per second, or add a custom mapper
>> in between that measures the throughput.
>> Ideally you see here that you can read all your data in a few seconds, if
>> not, then there's a problem getting your data in.
>>
>> Idea 3: is your IO fully utilized ?( if you are checkpointing to RocksDB,
>> the disk can dramatically slow you down)
>> Idea 4: Are you under high memory pressure, and your JVMs are spending
>> most of their cycles garbage collecting?
>>
>> My bet is you are not getting data into your cluster as fast as you think
>> (Idea 2)
>>
>>
>> On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal <
>> mohammed.kamaa...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> The throughput has decreased further after I removed all the
>>> rebalance(). The performance has decreased from 14 minutes for 20K messages
>>> to 20 minutes for 20K messages.
>>>
>>> Below are the tasks that the flink application is performing. I am using
>>> keyBy and Window operation. Do you think am I making any mistake here or
>>> the way I am performing the keyBy or Window operation needs to be
>>> corrected?.
>>>
>>> //Add Source
>>> StreamExecutionEnvironment streamenv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> initialStreamData = streamenv.addSource(new
>>> FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
>>> new *ObjectNodeJsonDeSerializerSchema()*,
>>> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>>>
>>> DataStream cgmStreamData = initialStreamData.keyBy(value ->
>>> value.findValue("PERSON_ID").asText())
>>> .flatMap(new *SgStreamingTask()*).setParallelism(Common.FORTY_FIVE);
>>>
>>> DataStream artfctOverlapStream = cgmStreamData.keyBy(new
>>> CGMKeySelector()).countWindow(2, 1)
>>> .apply(new *ArtifactOverlapProvider()*
>>> ).setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> DataStream streamWithSgRoc = artfctOverlapStream.keyBy(new
>>> CGMKeySelector()).countWindow(7, 1)
>>> .apply(new *SgRocProvider()*
>>> ).setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> DataStream cgmExcursionStream =
>>> streamWithSgRoc.keyBy(new CGMKeySelector())
>>> .countWindow(Common.THREE, Common.ONE).apply(new
>>> *CGMExcursionProviderStream()*
>>> ).setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> //Add Sink
>>> cgmExcursionStream.addSink(new FlinkKafkaProducer(
>>> topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new
>>> CGMDataCollectorSchema(),
>>> 

Re: How do I verify data is written to a JDBC sink?

2021-09-27 Thread Roman Khachatryan
Hi,

Do I understand correctly that long checkpointing times are caused by
slow queries to the database?
If so, async querying might resolve the issue on the Flink side, but the
unnecessary load on the DB will remain.

Instead, maybe you can use CDC to stream DB changes and send messages
to RabbitMQ when necessary [1][2]?

Another option is to implement a custom JDBC writing function (based on
the batching JDBC output format) that would send a message once the relevant write
succeeds (it doesn't have to be a sink function).
This can be achieved by overriding the JdbcBatchingOutputFormat.attemptFlush method [3]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/
[2] https://github.com/ververica/flink-cdc-connectors
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java//org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.html#attemptFlush--

Regards,
Roman

On Mon, Sep 27, 2021 at 7:44 AM Marco Villalobos
 wrote:
>
>
> In my Flink Job, I am using event time to process time-series data.
>
> Due to our business requirements, I need to verify that a specific subset of 
> data written to a JDBC sink has been written before I send an activemq 
> message to another component.
>
> My job flows like this:
>
> 1. Kafka Source
> 2. Split source message by flat map
> 3. Aggregate messages in a 15-minute window (keyed by the timestamp rounded up 
> to the nearest quarter, and by name; note that there are 120,000 names).
> 5. Insert, forward-fill, or back-fill time-series data (keyed by name, again 
> there are 120,000 names).  Forward fills are done through an event time 
> timer. Collect range of data processed in a side output.
> 6. In a window function determine when time-series (rounded up to nearest 
> quarter) aligned to the same quarter.
> 7. Verify that a subset of aligned time series are already written to the 
> database (keyed by name, and there are 120,000 of them) and collect an active 
> mq message when that happens.
>
> I could not find a good way to verify data was written to the database, so I 
> introduced a Keyed Process Function with a timer that creates a JDBC 
> connection and then polls the database to verify it has been written. If the 
> first attempt fails, it then uses a processing time timer to check a minute 
> later.  Please keep in mind that there are 12 keys, but only about 1000 
> records need this database verification.
>
> This approach caused checkpoint times to take 2-4 hours. Previously, 
> checkpoint times were only a few seconds.
>
> I am experimenting with using an RichAsyncFunction, and the R2DBC Postgres 
> async driver instead.  My R2DBC async code also has a 1 minute timer in it. 
> So far this async approach fails.
>
> I feel as though I am at a crossroads. These are my options:
>
> 1. Continue to tune checkpoints to work with my blocking JDBC calls used in a 
> Keyed Process function that polls for database writes.
>
> Or
>
> 2. Experiment more writing code that uses a rich async function that does the 
> verification.
>
> 3. Write the data that needs to be verified to another kafka queue and have 
> another Flink Job do the verification. It would only need to do that with 
> 1000 records approximately every 15 minutes.
>
> Does anybody else have other ideas I can use to solve this?


Re: Flink Performance Issue

2021-09-27 Thread Mohammed Kamaal
Hi Robert,

I have removed all the business logic (keyBy and window) operator code and just 
had a source and sink to test it.
The throughput is 20K messages in 2 minutes. It is a simple read from the source 
(Kafka topic) and write to the sink (Kafka topic). Don't you think 2 minutes is 
still rather poor throughput for a simple read/write application? Each message 
is 4 KB.

As I had mentioned in the previous email(s), I am using keyBy() and Window() to 
handle the business logic. Do you think these operators would have a huge impact on 
the performance? Or is it something to do with my Kafka cluster configuration 
or the older version of Flink (1.8) that I am using in my application? Not sure 
if Flink version 1.8 has a performance issue.

Please let me know.
Below is my kafka cluster configuration.

auto.create.topics.enable=true
log.retention.hours=24
default.replication.factor=3
min.insync.replicas=2
num.io.threads=45
num.network.threads=60
num.partitions=45
num.replica.fetchers=2
unclean.leader.election.enable=true
replica.lag.time.max.ms=3
zookeeper.session.timeout.ms=18000
log.retention.ms=17280
log.cleanup.policy=delete
group.max.session.timeout.ms=120



Thanks

> On Wed, Sep 22, 2021 at 9:06 PM Robert Metzger  wrote:
> Hi Kamaal,
> 
> I would first suggest understanding the performance bottleneck, before 
> applying any optimizations.
> 
> Idea 1: Are your CPUs fully utilized?
> if yes, good, then scaling up will probably help
> If not, then there's another inefficiency
> 
> Idea 2: How fast can you get the data into your job, without any processing?
> You can measure this by submitting a simple Flink job that just reads the 
> data and writes it to a discarding sink. Either disable the operator chaining 
> to get metrics for the records per second, or add a custom mapper in between 
> that measures the throughput.
> Ideally you see here that you can read all your data in a few seconds, if 
> not, then there's a problem getting your data in.
> 
> Idea 3: is your IO fully utilized ?( if you are checkpointing to RocksDB, the 
> disk can dramatically slow you down)
> Idea 4: Are you under high memory pressure, and your JVMs are spending most 
> of their cycles garbage collecting?
> 
> My bet is you are not getting data into your cluster as fast as you think 
> (Idea 2)
> 
> 
>> On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal 
>>  wrote:
>> Hi Arvid,
>> 
>> The throughput has decreased further after I removed all the rebalance(). 
>> The performance has decreased from 14 minutes for 20K messages to 20 minutes 
>> for 20K messages.
>> 
>> Below are the tasks that the flink application is performing. I am using 
>> keyBy and Window operation. Do you think am I making any mistake here or the 
>> way I am performing the keyBy or Window operation needs to be corrected?.
>> 
>> //Add Source
>> StreamExecutionEnvironment streamenv = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> initialStreamData = streamenv.addSource(new 
>> FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
>> new ObjectNodeJsonDeSerializerSchema(), 
>> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>> 
>> DataStream cgmStreamData = initialStreamData.keyBy(value -> 
>> value.findValue("PERSON_ID").asText())
>> .flatMap(new SgStreamingTask()).setParallelism(Common.FORTY_FIVE);
>> 
>> DataStream artfctOverlapStream = cgmStreamData.keyBy(new 
>> CGMKeySelector()).countWindow(2, 1)
>> .apply(new 
>> ArtifactOverlapProvider()).setParallelism(Common.FORTY_FIVE).rebalance();
>> 
>> DataStream streamWithSgRoc = artfctOverlapStream.keyBy(new 
>> CGMKeySelector()).countWindow(7, 1)
>> .apply(new SgRocProvider()).setParallelism(Common.FORTY_FIVE).rebalance();
>> 
>> DataStream cgmExcursionStream = streamWithSgRoc.keyBy(new 
>> CGMKeySelector())
>> .countWindow(Common.THREE, Common.ONE).apply(new 
>> CGMExcursionProviderStream()).setParallelism(Common.FORTY_FIVE).rebalance();
>> 
>> //Add Sink
>> cgmExcursionStream.addSink(new FlinkKafkaProducer(
>> topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new 
>> CGMDataCollectorSchema(),
>> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>> 
>> Implementation classes:-
>> 
>> //deserialize the json message received
>> ObjectNodeJsonDeSerializerSchema implements 
>> KeyedDeserializationSchema{
>> public ObjectNode deserialize(byte[] messageKey, byte[] message, String 
>> topic, int partition, long offset);
>> }
>> 
>> //Flapmap to check each message and apply validation
>> public class SgStreamingTask extends RichFlatMapFunction {
>> void flatMap(ObjectNode streamData, Collector out);
>> }
>> 
>> //persist three state variables and apply business logic
>> public class ArtifactOverlapProvider extends RichFlatMapFunction> Tuple2>
>> implements WindowFunction {
>> public void apply(String key, GlobalWindow window, Iterable values, 
>> Collector out);
>> }
>> 
>> //Apply business logic
>> public class SgRocProvider implements 

Re: flink job : TPS drops from 400 to 30 TPS

2021-09-27 Thread JING ZHANG
Hi,
About the CPU cost, there are several ways to check it:
1. The Flink built-in metric `Status.JVM.CPU.Load` [1]
2. Use the `top` command on the machine that runs a suspect
TaskManager
3. Use a flame graph to do deeper profiling of a JVM [2].
...

About the RPC response: I'm not an expert on HBase, so I'm not sure whether the HBase
cluster has metrics to trace each RPC response time.
You could also add a metric inside an extended HBase connector to trace the time cost
of each remote request (see the sketch after the links below).

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#cpu
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
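
As a rough illustration of the last point, here is a user-defined gauge that records
how long each lookup call takes; the HBase/external call itself is only a placeholder:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

public class HBaseLookupWithLatencyMetric extends RichMapFunction<String, String> {

    private transient volatile long lastLookupNanos;

    @Override
    public void open(Configuration parameters) {
        // Exposes the duration of the most recent remote call as a metric.
        getRuntimeContext().getMetricGroup()
                .gauge("lastLookupNanos", (Gauge<Long>) () -> lastLookupNanos);
    }

    @Override
    public String map(String rowKey) throws Exception {
        long start = System.nanoTime();
        String value = lookupInExternalSystem(rowKey); // placeholder for the HBase / external API call
        lastLookupNanos = System.nanoTime() - start;
        return value;
    }

    private String lookupInExternalSystem(String rowKey) {
        return rowKey; // stand-in; the real implementation queries HBase or the external DB
    }
}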

Hope it helps.

Best,
JING ZHANG

Ragini Manjaiah  wrote on Mon, Sep 27, 2021 at 5:25 PM:

> Please let me know how to check the RPC response time and CPU cost.
>
> On Mon, Sep 27, 2021 at 1:19 PM JING ZHANG  wrote:
>
>> Hi,
>> Since there is not enough information, you could first check the back
>> pressure status of the job [1], find the task which caused the back
>> pressure.
>> Then try to find out why the task processed data slowly, there are many
>> reasons, for example the following reasons:
>> (1) Does data skew exist, which means some tasks processed more input
>> data than the other tasks?
>> (2) Is the CPU cost very high?
>> (3) Does RPC response start to slow down?
>> (4) If you choose async mode lookup, the LookupJoin operator needs to
>> buffer some data into state. Which state backend do you use? Does the state
>> backend work fine?
>> ...
>>
>> Would you please provide more information about the job, for example back
>> pressure status, input data distribution, async mode or sync mode lookup.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/back_pressure/
>>
>> Best,
>> JING ZHANG
>>
>> Ragini Manjaiah  wrote on Mon, Sep 27, 2021 at 2:05 PM:
>>
>>> Hi ,
>>> I have a flink real time job which  processes user records via topic and
>>> also reading data from hbase acting as a look table . If the look table
>>> does not contain required metadata then it queries the external db via api
>>> . First 1to 2 hours it works fine without issues, later it drops down
>>> drastically to 30 TPS. What are the things I need to look into in such a
>>> situation? There are no exceptions caught . how to check the bottle neck
>>> area . can some throw some light on this.
>>>
>>>
>>> Thanks & Regards
>>> Ragini Manjaiah
>>>
>>>


Re: flink job : TPS drops from 400 to 30 TPS

2021-09-27 Thread Ragini Manjaiah
Please let me know how to check the RPC response time and CPU cost.

On Mon, Sep 27, 2021 at 1:19 PM JING ZHANG  wrote:

> Hi,
> Since there is not enough information, you could first check the back
> pressure status of the job [1], find the task which caused the back
> pressure.
> Then try to find out why the task processed data slowly, there are many
> reasons, for example the following reasons:
> (1) Does data skew exist, which means some tasks processed more input data
> than the other tasks?
> (2) Is the CPU cost very high?
> (3) Does RPC response start to slow down?
> (4) If you choose async mode lookup, the LookupJoin operator needs to
> buffer some data into state. Which state backend do you use? Does the state
> backend work fine?
> ...
>
> Would you please provide more information about the job, for example back
> pressure status, input data distribution, async mode or sync mode lookup.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/back_pressure/
>
> Best,
> JING ZHANG
>
> Ragini Manjaiah  wrote on Mon, Sep 27, 2021 at 2:05 PM:
>
>> Hi ,
>> I have a flink real time job which  processes user records via topic and
>> also reading data from hbase acting as a look table . If the look table
>> does not contain required metadata then it queries the external db via api
>> . First 1to 2 hours it works fine without issues, later it drops down
>> drastically to 30 TPS. What are the things I need to look into in such a
>> situation? There are no exceptions caught . how to check the bottle neck
>> area . can some throw some light on this.
>>
>>
>> Thanks & Regards
>> Ragini Manjaiah
>>
>>


Re: flink job : TPS drops from 400 to 30 TPS

2021-09-27 Thread JING ZHANG
Hi,
Since there is not enough information, you could first check the back
pressure status of the job [1], find the task which caused the back
pressure.
Then try to find out why the task processed data slowly, there are many
reasons, for example the following reasons:
(1) Does data skew exist, which means some tasks processed more input data
than the other tasks?
(2) Is the CPU cost very high?
(3) Does RPC response start to slow down?
(4) If you choose async mode lookup, the LookupJoin operator needs to
buffer some data into state. Which state backend do you use? Does the state
backend work fine?
...

Would you please provide more information about the job, for example back
pressure status, input data distribution, async mode or sync mode lookup.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/back_pressure/

Best,
JING ZHANG

Ragini Manjaiah  wrote on Mon, Sep 27, 2021 at 2:05 PM:

> Hi ,
> I have a flink real time job which  processes user records via topic and
> also reading data from hbase acting as a look table . If the look table
> does not contain required metadata then it queries the external db via api
> . First 1to 2 hours it works fine without issues, later it drops down
> drastically to 30 TPS. What are the things I need to look into in such a
> situation? There are no exceptions caught . how to check the bottle neck
> area . can some throw some light on this.
>
>
> Thanks & Regards
> Ragini Manjaiah
>
>


Re: flink 1.13.2: avg on an int field returns int instead of a floating-point type

2021-09-27 Thread Shuo Cheng
This is by-design behavior: avg is sum / count, and Flink currently infers the return
type from the input type (consistent with `sum` and `/`). If you want to keep
precision, consider casting the input to DOUBLE; a small sketch follows below.
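
A minimal Table API sketch of that workaround; the table and column names are made up:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class AvgPrecisionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'my_source' and 'score' are placeholders for an existing table with an INT column.
        // AVG(score)                 -> INT result, the fractional part is truncated
        // AVG(CAST(score AS DOUBLE)) -> DOUBLE result, precision is kept
        Table result = tEnv.sqlQuery(
                "SELECT AVG(CAST(score AS DOUBLE)) AS avg_score FROM my_source");
        result.execute().print();
    }
}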

On Mon, Sep 27, 2021 at 2:30 PM Asahi Lee <978466...@qq.com.invalid> wrote:

> Hi! I'm using Flink 1.13.2. When I compute the average of an int field with the avg
> function, the return type is int rather than a floating-point type such as double or decimal, so the computed value loses precision. Is this a bug?


Unsubscribe

2021-09-27 Thread rzy1107
Unsubscribe

flink 1.13.2: avg on an int field returns int instead of a floating-point type

2021-09-27 Thread Asahi Lee
Hi! I'm using Flink 1.13.2. When I compute the average of an int field with the avg
function, the return type is int rather than a floating-point type such as double or
decimal, so the computed value loses precision. Is this a bug?

flink job : TPS drops from 400 to 30 TPS

2021-09-27 Thread Ragini Manjaiah
Hi,
I have a Flink real-time job which processes user records from a topic and
also reads data from HBase acting as a lookup table. If the lookup table
does not contain the required metadata, it queries the external DB via an API.
For the first 1 to 2 hours it works fine without issues; later it drops
drastically to 30 TPS. What are the things I need to look into in such a
situation? There are no exceptions caught. How do I check where the bottleneck
is? Can someone throw some light on this?


Thanks & Regards
Ragini Manjaiah