[Flink SQL] Suggestion regarding the dimension-table join optimization rule

2021-01-08 Thread 张韩
Version: 1.12
Problem: for a dimension-table join to support event time, the dimension table needs a primary key and a time attribute. With both conditions satisfied, if the custom dimension table implements the LookupTableSource interface, the optimizer throws the following exception:
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are 
not enough rules to produce a node with desired properties: 
convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, 
MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], 
UpdateKindTraitDef=[NONE].
Missing conversion is FlinkLogicalTableSourceScan[convention: LOGICAL -> 
STREAM_PHYSICAL]
Suggestion: a dimension-table join against a LookupTableSource must use system (processing) time. Wouldn't it be better if the optimizer rule LogicalCorrelateToJoinFromTemporalTableRule reported this restriction explicitly?




 


Re: Re:Re: Re: Monitoring Flink jobs with Grafana: which metrics to use for alerting

2021-01-08 Thread 赵一旦
Can't you just use Flink's REST API? I run a standalone cluster and wrote a Python
script with an expected_jobs list; if a job from that list is missing from the cluster, it raises an alert.
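
A Java equivalent of that kind of REST check, as a rough sketch (the host, port and job names are assumptions; a standalone JobManager usually serves the REST API on port 8081, and GET /jobs/overview lists the jobs with their names and states):

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;

public class ExpectedJobsCheck {
    public static void main(String[] args) throws IOException, InterruptedException {
        // assumed list of jobs that are supposed to be running on the cluster
        List<String> expectedJobs = List.of("my-etl-job", "my-alerting-job");

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://jobmanager:8081/jobs/overview"))
                .build();
        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();

        // crude check: the overview JSON should contain the name of every expected job;
        // a stricter version would parse the JSON and also verify the job state
        for (String job : expectedJobs) {
            if (!body.contains("\"" + job + "\"")) {
                System.err.println("ALERT: job not found on the cluster: " + job);
            }
        }
    }
}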

Yun Tang wrote on Fri, Jan 8, 2021 at 10:53 AM:

> Since numRestarts is a cumulative value, you have to check whether the current value has increased compared to the previous one in order to tell whether a failover happened.
>
>
> Also, I wouldn't recommend using the YARN application state to judge the state of a Flink job: if the job has a restart strategy configured, the YARN application stays RUNNING even while the job keeps failing over, so the problem goes unnoticed.
>
> Best,
> Yun Tang
> 
> From: bradyMk 
> Sent: Thursday, January 7, 2021 16:38
> To: user-zh@flink.apache.org 
> Subject: Re: Re:Re: Re: Monitoring Flink jobs with Grafana: which metrics to use for alerting
>
> OK, I'll look into it. Thanks for the guidance!
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Use Flink to process request with list of queries and aggregate

2021-01-08 Thread Li Wang
Hi Team,

I have a Flink application reading from Kafka. Each payload is a request
sent by a user containing a list of queries. What I would like to do is use
Flink to process the queries in parallel, aggregate the results, and send them
back to the user.

For example, let's say we have two messages in Kafka

message 1: "user1 - [query1, query2, query3]" should return as "user1 -
[result1, result2, result3]"
message 2: "user2 - [query1, query2, query3, query4]" should return as
"user2 - [result1, result2, result3, result4]"

My idea is to use flatMap to split the queries, key by user, and then
aggregate. But how do I know when the aggregation is finished? If I use
`countWindow`, how do I pass in the number of queries as a variable, since it's
not constant?

Thanks.
- Li
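
One way to make the aggregation self-terminating, sketched under the assumption of hypothetical QueryResult and AggregatedResponse POJOs that carry the request's total query count with every split element (so no external length variable is needed), is a keyed function that emits once all results for a request have arrived:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical types: QueryResult(user, totalQueries, result), AggregatedResponse(user, results)
public class RequestAggregator
        extends KeyedProcessFunction<String, QueryResult, AggregatedResponse> {

    private transient ListState<String> results;

    @Override
    public void open(Configuration parameters) {
        results = getRuntimeContext().getListState(
                new ListStateDescriptor<>("results", String.class));
    }

    @Override
    public void processElement(QueryResult in, Context ctx,
                               Collector<AggregatedResponse> out) throws Exception {
        results.add(in.result);

        List<String> collected = new ArrayList<>();
        for (String r : results.get()) {
            collected.add(r);
        }
        // The expected count travels with every split element, so the operator
        // can decide on its own when the request is complete.
        if (collected.size() == in.totalQueries) {
            out.collect(new AggregatedResponse(in.user, collected));
            results.clear();
        }
    }
}

Keying by a per-request id rather than only by user would keep two concurrent requests from the same user separate.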


Normalizing Metric Identifiers for Reporting Systems

2021-01-08 Thread Joseph Lorenzini




Hi all,
 
I am implementing a metric reporter for newrelic. I’d like it to support a job’s operator metrics that come with the flink framework out of the box. In order to ensure each metric is unique you can’t use the
metric name alone; you need to use the metric identifier. However, I am not sure of the best way to present metric identifiers to newrelic. Here’s the format for an operator:
 
<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>.<metric_name>
 
Here’s an example:
 
ip.taskmanager.19ebf992ecc26eed8269da120a17c20a.NorvaxWordCount.CHAIN DataSource (at get(TextInput.java:63) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (TokenizerFlatMap)
 -> Combine (SumTheFields).0.isBackPressured
 
What’s currently tripping me up is the value of the operator:

 
CHAIN DataSource (at get(TextInput.java:63) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (TokenizerFlatMap) -> Combine (SumTheFields).
 
I can’t realistically make that part of the newrelic metric name. That wouldn’t be usable in the UI or for querying and NR has length restrictions on metric names. What I am looking for is a way to normalize
 this as part of the metric name such that it’s both useful for a human to consume and something that a system like newrelic can consume.
 
Thanks,
Joe 
 
 





Re: Normalizing Metric Identifiers for Reporting Systems

2021-01-08 Thread Chesnay Schepler
One thing you could do is take the first N characters and hash the 
remaining ones; I don't think there is a better solution at the moment.
The size of job/task/operator names is a rather fundamental issue that 
makes a lot of things complicated (metrics, logging, UI), but we haven't 
made any progress on transitioning to more sane names.


Note that if NR or the user need truly unique metric names then you 
cannot rely on operator names, and should rather use their IDs.
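
A rough sketch of that truncate-and-hash idea (the prefix length of 40 and the 8-hex-character hash are arbitrary assumptions, not anything prescribed by Flink or New Relic):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Keeps a human-readable prefix of the operator name and hashes the rest. */
public final class MetricNameNormalizer {

    private static final int MAX_PREFIX = 40; // assumed budget, tune for the reporter backend

    public static String normalize(String operatorName) {
        if (operatorName.length() <= MAX_PREFIX) {
            return operatorName;
        }
        String prefix = operatorName.substring(0, MAX_PREFIX);
        String rest = operatorName.substring(MAX_PREFIX);
        return prefix + "_" + shortHash(rest);
    }

    private static String shortHash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 4; i++) { // 8 hex chars: short, yet collisions stay unlikely here
                sb.append(String.format("%02x", digest[i]));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}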


On 1/8/2021 3:16 PM, Joseph Lorenzini wrote:


Hi all,

I am implementing a metric reporter for newrelic. I’d like it to 
support a job’s operator metrics that come with the flink framework 
out of the box. In order to ensure each metric is unique you can’t use 
the metric name alone; you need to use the metric identifier. However, I am 
not sure of the best way to present metric identifiers to newrelic. 
Here’s the format for an operator:


<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>.<metric_name>

Here’s an example:

ip.taskmanager.19ebf992ecc26eed8269da120a17c20a.NorvaxWordCount.CHAIN 
DataSource (at get(TextInput.java:63) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap 
(TokenizerFlatMap) -> Combine (SumTheFields).0.isBackPressured


What’s currently tripping me up is the value of the operator:

CHAIN DataSource (at get(TextInput.java:63) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap 
(TokenizerFlatMap) -> Combine (SumTheFields).


I can’t realistically make that part of the newrelic metric name. That 
wouldn’t be usable in the UI or for querying and NR has length 
restrictions on metric names. What I am looking for is a way to 
normalize this as part of the metric name such that it’s both useful 
for a human to consume and something that a system like newrelic can 
consume.


Thanks,

Joe






RE: Flink to get historical data from kafka between timespan t1 & t2

2021-01-08 Thread VINAY.RAICHUR
Thanks Aljoscha for your prompt response. It means a lot to me 

Could you also attach a code snippet for `KafkaSource`, `KafkaSourceBuilder`, 
and `OffsetInitializers` that you were referring to in your previous reply, for 
my reference please, to make it clearer for me.

Kind regards,
Vinay

-Original Message-
From: Aljoscha Krettek  
Sent: 08 January 2021 19:26
To: user@flink.apache.org
Subject: Re: Flink to get historical data from kafka between timespan t1 & t2

Hi,

for your point 3. you can look at
`FlinkKafkaConsumerBase.setStartFromTimestamp(...)`.

Points 1. and 2. will not work with the well established `FlinkKafkaConsumer`. 
However, it should be possible to do it with the new `KafkaSource` that was 
introduced in Flink 1.12. It might be a bit rough around the edges, though.

With the `KafkaSource` you can specify `OffsetInitializers` for both the 
starting and stopping offset of the source. Take a look at `KafkaSource`, 
`KafkaSourceBuilder`, and `OffsetInitializers` in the code.

I hope this helps.

Best,
Aljoscha

On 2021/01/08 07:51, vinay.raic...@t-systems.com wrote:
>Hi Flink Community Team,
>
>This is a desperate request for your help on below.
>
>I am new to Flink and trying to use it with Kafka for event-based data 
>stream processing in my project. I am struggling to find solutions with 
>Flink for the project requirements below:
>
>
>  1.  Get all Kafka topic records at a given time point 't' (now or in 
>the past). Also, how to pull only the latest record from Kafka using Flink?
>  2.  Get all records from Kafka for a given time interval in the past, 
> between t1 and t2.
>  3.  Continuously get data from Kafka starting at a given time point (now 
> or in the past). The client will actively cancel/close the data stream. 
> Examples: live dashboards. How to do this using Flink?
>Please provide me a sample Flink code snippet for pulling data from Kafka for 
>the above three requirements. I have been stuck for the last month without 
>much progress, and your timely help will be a savior for me!
>Thanks & Regards,
>Vinay Raichur
>T-Systems India | Digital Solutions
>Mail: vinay.raic...@t-systems.com
>Mobile: +91 9739488992
>


The Kafka source and OperatorCoordinator in Flink 1.12

2021-01-08 Thread hao kong
Hello, I have been following the brand-new FLIP-27 source. After upgrading to 1.12 I found that a new KafkaSource implementation already exists, but I am currently having trouble getting the KafkaSource to communicate with the coordinator.
I created a KafkaSource and the corresponding OperatorCoordinator, created a reader on the source, and set a breakpoint in the coordinator's handleEventFromOperator. On startup I successfully receive the reader-registration message, but I never receive the other SourceEvents sent through sourceReaderContext.sendSourceEventToCoordinator, mainly because I don't know how to instantiate a SourceReaderContext. Is there a code example for this?


Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-08 Thread Aljoscha Krettek

Hi,

for your point 3. you can look at 
`FlinkKafkaConsumerBase.setStartFromTimestamp(...)`.


Points 1. and 2. will not work with the well established 
`FlinkKafkaConsumer`. However, it should be possible to do it with the 
new `KafkaSource` that was introduced in Flink 1.12. It might be a bit 
rough around the edged, though.


With the `KafkaSource` you can specify `OffsetInitializers` for both the 
starting and stopping offset of the source. Take a look at 
`KafkaSource`, `KafkaSourceBuilder`, and `OffsetInitializers` in the 
code.


I hope this helps.

Best,
Aljoscha

On 2021/01/08 07:51, vinay.raic...@t-systems.com wrote:

Hi Flink Community Team,

This is a desperate request for your help on below.

I am new to Flink and trying to use it with Kafka for event-based data 
stream processing in my project. I am struggling to find solutions with 
Flink for the project requirements below:


 1.  Get all Kafka topic records at a given time point 't' (now or in the 
past). Also, how to pull only the latest record from Kafka using Flink?
 2.  Get all records from Kafka for a given time interval in the past, 
between t1 and t2.
 3.  Continuously get data from Kafka starting at a given time point (now 
or in the past). The client will actively cancel/close the data stream. 
Examples: live dashboards. How to do this using Flink?
Please provide me a sample Flink code snippet for pulling data from Kafka for 
the above three requirements. I have been stuck for the last month without much 
progress, and your timely help will be a savior for me!
Thanks & Regards,
Vinay Raichur
T-Systems India | Digital Solutions
Mail: vinay.raic...@t-systems.com
Mobile: +91 9739488992



Re: Flink taskmanager id

2021-01-08 Thread Till Rohrmann
Hi Omkar,

Since version 1.12.0 you can configure the TaskManager's resource id via
`taskmanager.resource-id` [1]. Moreover, if not set, then it defaults to
rpcAddress:rpcPort and a 6 digit random suffix.

[1] https://issues.apache.org/jira/browse/FLINK-17579

Cheers,
Till

On Thu, Jan 7, 2021 at 11:53 PM Deshpande, Omkar 
wrote:

> Hello,
>
> I use flink on kubernetes. And the taskmanagers get assigned random uuids.
> Is there a way to explicitly configure them to use hostnames instead?
>
>
> Omkar
>


Re: How should I process a cumulative counter?

2021-01-08 Thread Aljoscha Krettek

Hi Larry,

the basic problem for your use case is that window boundaries are 
inclusive for the start timestamp and exclusive for the end timestamp.  
It's set up like this to ensure that consecutive tumbling windows don't 
overlap. This is only a function of how our `WindowAssigner` works, so 
it could be done differently in a different system.


Have you tried using a sliding window where the `slide` is `size - 1ms`?  
With this, you would ensure that elements that fall exactly on the 
boundary, i.e. your hourly sensor updates would end up in both of the 
consecutive windows. It seems a bit unorthodox but could work in your 
case.
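
A sketch of that idea (Reading is an assumed POJO with sensorId, timestamp and counter fields; since the counter only grows, the sum of consecutive deltas inside a window telescopes to max - min):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class RuntimePerHour {

    /** Assumed shape of the sensor readings. */
    public static class Reading {
        public String sensorId;
        public long timestamp;   // event time, epoch millis
        public long counter;     // cumulative runtime in seconds

        public Reading() { }
        public Reading(String sensorId, long timestamp, long counter) {
            this.sensorId = sensorId; this.timestamp = timestamp; this.counter = counter;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Reading> readings = env
                .fromElements(
                        new Reading("m1", 0L, 0), new Reading("m1", 900_000L, 900),
                        new Reading("m1", 1_800_000L, 1800), new Reading("m1", 2_700_000L, 2700),
                        new Reading("m1", 3_600_000L, 3600))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Reading>forMonotonousTimestamps()
                                .withTimestampAssigner((r, ts) -> r.timestamp));

        readings
                .keyBy(r -> r.sensorId)
                // slide = size - 1ms, so a reading sitting exactly on the hour boundary
                // lands in both of the consecutive windows
                .window(SlidingEventTimeWindows.of(
                        Time.hours(1), Time.milliseconds(60 * 60 * 1000 - 1)))
                .process(new ProcessWindowFunction<Reading, Tuple2<String, Long>, String, TimeWindow>() {
                    @Override
                    public void process(String sensorId, Context ctx, Iterable<Reading> elements,
                                        Collector<Tuple2<String, Long>> out) {
                        long min = Long.MAX_VALUE;
                        long max = Long.MIN_VALUE;
                        for (Reading r : elements) {
                            min = Math.min(min, r.counter);
                            max = Math.max(max, r.counter);
                        }
                        // the sum of consecutive deltas telescopes to max - min
                        // (assumes the counter is monotonically increasing, i.e. no resets)
                        out.collect(Tuple2.of(sensorId, max - min));
                    }
                })
                .print();

        env.execute("runtime-per-hour");
    }
}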


Best,
Aljoscha

On 2021/01/08 08:56, Larry Aspen wrote:

Hi,

I'm evaluating Flink for our company's IoT use case and read a blog post
by Fabian Hueske from 2015 [1]. We have a similar situation except the
sensor is sending the value of a cumulative counter instead of a count.
We would like to calculate the sum of deltas of consecutive cumulative
counter values that occur during a time window.

Here is a scenario of a cumulative counter measuring runtime in seconds
and a machine starting for the first time at 12:00:00 and running for
the whole hour (sensor records values when it starts, every 15 minutes
and on hour change):

timestamp, cumulative counter value in seconds
12:00:00, 0
12:15:00, 900
12:30:00, 1800
12:45:00, 2700
13:00:00, 3600

This would produce the following deltas:
12:00:00, 900 - 0 = 900
12:15:00, 1800 - 900 = 900
12:30:00, 2700 - 1800 = 900
12:45:00, 3600 - 2700 = 900

We would then sum the deltas to get runtime in seconds for the hour:
900 + 900 + 900 + 900 = 3600

What would be a good way to handle this kind of calculation in Flink?

I have already tried using a tumbling event time window of one hour,
but then the last value is only part of the next window and the delta
of 12:45:00 is missing and the sum is 900 + 900 + 900 = 2700.

I have also tried a sliding event time window of two hours where the sum
is calculated for the first hour. This produces the correct sum in this
scenario but fails if the next hour is later (e.g. 12:45:00, 14:00:00
i.e. the machine is shut down between 12:45:00 and 13:00:00 and started at
14:00:00).

My latest attempt has been to use a global window where I try to keep
the values for the last two hours and calculate the sum for the older
hour. This seems to work in my experiments where I read values from
a file and use parallelism of one. If I increase the parallelism, the
values are processed out of order and the results are incorrect as
older values are received after newer values which causes them to be
evicted.

Any advice on this would be appreciated.

Best regards,
Larry Aspen

[1] https://flink.apache.org/news/2015/12/04/Introducing-windows.html


Re: Flink app logs to Elastic Search

2021-01-08 Thread Aljoscha Krettek
So you're saying there is no logging output whatsoever being sent to 
Elasticsearch? Did you try and see if the jar file is being picked up?  
Are you still getting the pre-defined, text-based logging output?


Best,
Aljoscha

On 2021/01/07 17:04, bat man wrote:

Hi Team,

I have a requirement to push the flink app logs to Elastic Search for log
management. Can anyone guide if you are already doing this.
I have tried this -
https://cristian.io/post/flink-log4j/
I’m not getting any error for a sample job I tried.

I am using EMR to run Flink 1.9 and Elastic Search latest version running
on ec2 machine.

Thanks,
Hemant


Re: Question about "NoWatermark" in Flink 1.9.2

2021-01-08 Thread Aljoscha Krettek

Thanks for the update!

Best,
Aljoscha

On 2021/01/07 16:45, Peter Huang wrote:

Hi,

We ended up finding the root cause. At some point in time, two of the partitions
of the input topic stopped receiving any data, which means the second window
operator in the pipeline can't receive the watermark from all of the
partitions of the first operator. Thus, the watermark can't be determined.
Hopefully, this finding will be useful for other users in the community.


Best Regards
Peter Huang

On Thu, Jan 7, 2021 at 8:11 AM Peter Huang 
wrote:


Hi,

We have had a pipeline running for a while in both datacenters. Since Jan 5th, one
of the instances has had the "No Watermark" issue. In the program, we are
using BoundedOutOfOrdernessTimestampExtractor to extract a ts field from each
message. The same code runs well in the other DC. We checked the input
messages; the ts field is correctly set. Even when we switch to auto-watermarks, the
issue is still there. The graph below shows no watermark in the second
operator. Did anyone experience the same issue before?



[image: image.png]


Best Regards
Peter Huang






Flink SQL: defining a table column as FLOAT causes an error at query time

2021-01-08 Thread automths
Hi:

I created a table with the Table API; its columns are:
key:FLOAT,
col1: FLOAT,
col2: FLOAT,
col3: FLOAT


I query it with the following SQL:
select * from flink where
(key < 5)
or
((key > 10 and key < 12)
 or
(key in (15, 16, 17))  or (key > 18 and key <= 19))


Execution throws the following error:
Exception in thread "main" java.lang.UnsupportedOperationException: class 
org.apache.calcite.sql.type.SqlTypeName: FLOAT
at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:703)
at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
at org.apache.calcite.rex.RexLiteral.<init>(RexLiteral.java:223)
at org.apache.calcite.rex.RexLiteral.toLiteral(RexLiteral.java:737)
at org.apache.calcite.rex.RexLiteral.lambda$printSarg$4(RexLiteral.java:710)
at org.apache.calcite.util.RangeSets$Printer.lessThan(RangeSets.java:392)
at org.apache.calcite.util.RangeSets.forEach(RangeSets.java:253)
at org.apache.calcite.util.Sarg.lambda$printTo$0(Sarg.java:110)
at org.apache.calcite.linq4j.Ord.forEach(Ord.java:157)
at org.apache.calcite.util.Sarg.printTo(Sarg.java:106)
at org.apache.calcite.rex.RexLiteral.printSarg(RexLiteral.java:709)
at org.apache.calcite.rex.RexLiteral.lambda$appendAsJava$1(RexLiteral.java:652)
at org.apache.calcite.util.Util.asStringBuilder(Util.java:2502)
at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:651)
at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
at org.apache.calcite.rex.RexLiteral.<init>(RexLiteral.java:223)
at org.apache.calcite.rex.RexBuilder.makeLiteral(RexBuilder.java:971)


May I ask what is causing this? Can FLOAT not be used when defining columns? If it can't, how should I handle a source or sink that requires a FLOAT type? I'd appreciate it if someone who knows could tell me, thanks.


Best regards,
automths





Will the community backport fixes to earlier versions for the Apache Flink directory traversal vulnerability reported in Tencent Security's advisory?

2021-01-08 Thread zhouyajun
Advisory link: https://s.tencent.com/research/bsafe/1215.html



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How do I downgrade Avro to 1.8.2 with Flink 1.12?

2021-01-08 Thread Dacheng
I finally made a breakthrough on the maven-shade-plugin front.
I couldn't find anything useful online about the "Error creating shaded jar SubmissionPublisher" error,

but upgrading maven-shade-plugin from 3.2.0 to 3.2.4 resolved it.














On 2021-01-07 20:55:14, "Dacheng" wrote:
>Hi,
>
>
>Hi all,
>
>
>The problem I hit when downgrading Avro:
>The official 1.12 docs mention that Avro 1.10 is now used, but that it can be downgraded to 1.8.2 if needed:
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/release-notes/flink-1.12.html#upgrade-to-avro-version-1100-from-182-flink-18192
>When I tried to downgrade, I ran into the following problems:
>
>
>1. Depending directly on 1.8.2 makes AvroSchemaConverter#nullableSchema in flink-avro fail with
>java.lang.NoSuchMethodError: org.apache.avro.Schema.isNullable()Z
>at 
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.nullableSchema(AvroSchemaConverter.java:448)
>I compared Avro 1.8.2 with 1.10.0 and found that Schema.isNullable() simply does not exist in 1.8.2
>
>
>2. To work around problem 1, I tried using the maven-shade-plugin to relocate the org.apache.avro dependency
>The pom is at https://paste.ubuntu.com/p/SMYHy66bc6/
>but it fails with
>[ERROR] Failed to execute goal 
>org.apache.maven.plugins:maven-shade-plugin:3.2.0:shade (shade-deps) on 
>project leyan-flink: Error creating shaded jar: Problem shading JAR 
>/Users/dongzhi/.m2/repository/org/glassfish/jersey/core/jersey-common/2.30/jersey-common-2.30.jar
> entry 
>META-INF/versions/11/org/glassfish/jersey/internal/jsr166/SubmissionPublisher$1.class:
> org.apache.maven.plugin.MojoExecutionException: Error in ASM processing class 
>META-INF/versions/11/org/glassfish/jersey/internal/jsr166/SubmissionPublisher$1.class:
> UnsupportedOperationException -> [Help 1]
>[ERROR]
>I haven't found a solution for this yet, so I'm asking the community for help
>
>
>Why I want to downgrade Avro: I ran into problems integrating flink-sql with Hive
>Also, Avro 1.10 worked fine for me before,
>but when I added the Hive dependencies to use Hive's built-in functions in SQL, with the following dependencies:
>flink-avro-confluent-registry-1.12.0
>flink-connector-hive_2.11-1.12.0(provided)
>hive-exec 2.3.4 (provided)
>hadoop-mapreduce-client-core 2.6.0-cdh5.13.3 <= added because I ran into the same problem as
>http://apache-flink.147419.n8.nabble.com/Flink-td7866.html
>The extra jars under flink/lib are
>flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar
>hadoop-mapreduce-client-core-2.6.0-cdh5.13.3.jar
>I got an error similar to https://github.com/confluentinc/schema-registry/issues/1432,
>so I tried downgrading Avro to 1.8.2
>java.lang.NoSuchFieldError: FACTORY
>at org.apache.avro.Schemas.toString(Schemas.java:36) 
> ~[trade-event.jar:1.12.0] 
>But analysing the whole project with mvn dependency:tree shows nothing using Avro 1.8.2.
>The Flink application is deployed on YARN in per-job mode; the production CDH cluster uses a fairly old Avro version, so I wonder whether that is how Avro 1.8.2 gets pulled in somewhere
>
>
>Thanks


Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

2021-01-08 Thread Arvid Heise
Hi Dongwon,

inferring the type information of Java classes is quite messy. At first, it
seems like that should work out of the box, as you are only using A as the
type of the list, right? However, there is no way of knowing if you didn't
use a subclass of A. Of course, if A was final, it might be possible to
exclude this case but you quickly go down a rabbit hole.
It gets especially bad if you consider that your classes evolve over time.
What happens if A is first final, but you later decide to subclass it? How
should Flink map old data to the new hierarchy? Flink falls back to Kryo
for most cases, which is why you need generic types. However, that is
rather inefficient unless you register all possible classes beforehand [1].
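
For completeness, that registration (if one stays with Kryo rather than moving to a schema) is, as far as I know, a one-liner per class on the ExecutionConfig; A and B here refer to the POJOs from the quoted program below:

// pre-register the types so Kryo writes compact tags instead of full class names
env.getConfig().registerKryoType(A.class);
env.getConfig().registerKryoType(B.class);
// or, if they qualify as POJOs, register them as POJO types instead
env.getConfig().registerPojoType(A.class);
env.getConfig().registerPojoType(B.class);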

To avoid such limitations, I always recommend schema-first approaches when
moving a PoC to production code. First figure out what kind of data you
actually want to transfer. Then, settle for a serializer [2]. Then, create
the schema and let the classes be generated (on-the-fly).

I usually do it in two ways: if I write a rather generic program, I try to
use Table API, which optimizes everything in a Row has one of the most
memory efficient representations. If Table API is not sufficient, I fall
back to Avro and use Avro specific records [3].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
[2]
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
[3] https://avro.apache.org/docs/current/gettingstartedjava.html


On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim  wrote:

> Any advice would be appreciated :)
>
> Thanks,
>
> Dongwon
>
> -- Forwarded message -
> From: Dongwon Kim 
> Date: Mon, Dec 14, 2020 at 11:27 PM
> Subject: How to gracefully avoid "Generic types have been disabled in the
> ExecutionConfig and type java.util.List is treated as a generic type"?
> To: user 
>
>
> Hi,
>
> The following program compiles and runs w/o exceptions:
>
>> public class Test {
>>
>>   public static class A {
>> private int n;
>>
>> public A() { }
>> public int getN() {  return n;  }
>> public void setN(int n) {  this.n = n;  }
>>   }
>>
>>   public static class B {
>> private List<A> lst;
>>
>> public B() { }
>> public List<A> getLst() {  return lst;  }
>> public void setLst(List<A> lst) {  this.lst = lst;  }
>>   }
>>
>>   public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.createLocalEnvironment();
>>
>> env.fromElements(new B())
>>   .print();
>>
>> env.execute();
>>   }
>> }
>>
>
> When I add the following line,
>
>> env.getConfig().disableGenericTypes();
>
> then the program shows me an exception:
>
>> Exception in thread "main" java.lang.UnsupportedOperationException:
>> Generic types have been disabled in the ExecutionConfig and type
>> java.util.List is treated as a generic type.
>> at
>> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>> at
>> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
>> at
>> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
>> at Test.main(Test.java:29)
>
>
> To avoid this exception, I found that I have to declare a type factory
> like:
>
>>   public static class BTypeFactory extends TypeInfoFactory<B> {
>> @Override
>> public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
>>   return Types.POJO(
>> B.class,
>> ImmutableMap.<String, TypeInformation<?>>builder()
>>   .put("lst", Types.LIST(Types.POJO(A.class)))
>> .build()
>>   );
>> }
>>   }
>
> and give it to class B as follows:
>
>>   @TypeInfo(BTypeFactory.class)
>>   public static class B {
>
>
> Is there no other way but to declare BTypeFactory in such cases?
> I don't like the way I have to type a field name twice, one for a member
> variable and the other for a Map entry in the TypeInfoFactory.
>
> Thanks in advance,
>
> Dongwon
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Checkpoints show no errors, but there are errors in the logs; what is going on?

2021-01-08 Thread 赵一旦
Checkpoints complete without errors, but the SDK of the storage backend configured for checkpoints logs an error, so I'm not sure whether this error actually has any impact. Below is the stack trace; could someone help me figure out whether this is part of writing checkpoint data? If so, what does the 404 mean? Not found? Not found what?

com.baidubce.BceServiceException: Not Found (Status Code: 404; Error Code:
null; Request ID: 624d3468-8d7b-46f7-be5d-750c9039893d)
at
com.baidubce.http.handler.BceErrorResponseHandler.handle(BceErrorResponseHandler.java:59)
~[bce-java-sdk-0.10.82.jar:?]
at com.baidubce.http.BceHttpClient.execute(BceHttpClient.java:243)
~[bce-java-sdk-0.10.82.jar:?]
at
com.baidubce.AbstractBceClient.invokeHttpClient(AbstractBceClient.java:189)
~[bce-java-sdk-0.10.82.jar:?]
at
com.baidubce.services.bos.BosClient.getObjectMetadata(BosClient.java:1189)
~[bce-java-sdk-0.10.82.jar:?]
at
com.baidubce.services.bos.BosClient.getObjectMetadata(BosClient.java:1171)
~[bce-java-sdk-0.10.82.jar:?]
at
org.apache.hadoop.fs.bos.BosNativeFileSystemStore.retrieveMetadata(BosNativeFileSystemStore.java:531)
~[bos-hdfs-sdk-1.0.1-SNAPSHOT-0.jar:?]
at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
~[?:?]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_251]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_251]
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
~[hadoop-common-2.7.5.jar:?]
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
~[hadoop-common-2.7.5.jar:?]
at org.apache.hadoop.fs.bos.$Proxy22.retrieveMetadata(Unknown
Source) ~[?:?]
at
org.apache.hadoop.fs.bos.BaiduBosFileSystem.getFileStatus(BaiduBosFileSystem.java:252)
~[bos-hdfs-sdk-1.0.1-SNAPSHOT-0.jar:?]
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1425)
~[hadoop-common-2.7.5.jar:?]
at
org.apache.hadoop.fs.bos.BaiduBosFileSystem.create(BaiduBosFileSystem.java:163)
~[bos-hdfs-sdk-1.0.1-SNAPSHOT-0.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
~[hadoop-common-2.7.5.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:891)
~[hadoop-common-2.7.5.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:788)
~[hadoop-common-2.7.5.jar:?]
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:149)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:389)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flushToFile(FsCheckpointStreamFactory.java:254)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:230)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.io.DataOutputStream.write(DataOutputStream.java:107)
~[?:1.8.0_251]
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
~[?:1.8.0_251]
at
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.getBytes(NetworkBuffer.java:397)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractUnpooledSlicedByteBuf.getBytes(AbstractUnpooledSlicedByteBuf.java:392)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.shaded.netty4.io.netty.buffer.SlicedByteBuf.getBytes(SlicedByteBuf.java:26)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.shaded.netty4.io.netty.buffer.ReadOnlyByteBuf.getBytes(ReadOnlyByteBuf.java:264)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl.writeData(ChannelStateSerializer.java:164)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.lambda$write$2(ChannelStateCheckpointWriter.java:136)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.runWithChecks(ChannelStateCheckpointWriter.java:226)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:133)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeOutput(ChannelStateCheckpointWriter.java:125)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at