Real-time dual-stream-driven join in Flink

2022-09-16 Thread casel.chen
A question about implementing a real-time dual-stream-driven join in Flink:


order CDC stream fields: order_id, order_status, order_time, user_id (order_id is the primary key)
user CDC stream fields: user_id, user_name, user_phone, user_address (user_id is the primary key)
joined result stream fields: order_id, order_status, order_time, user_name, user_phone,
user_address (order_id is the primary key)
The expectation is that the joined result stream is updated whenever either the order
stream or the user stream is updated. A regular inner join does not fit: both streams have
very high distinct-key cardinality, so the state would be huge, and state TTL cannot be
used because the interval between user-stream updates is unpredictable, ranging from a few
hours to more than a month.


How can this kind of real-time dual-stream-driven join be implemented in Flink?
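For concreteness, the regular two-stream join that the question rules out (because Flink must retain both changelog sides in state indefinitely) would look roughly like this in Flink SQL. Table names come from the question; the connector setup is omitted and everything else is a hedged sketch, not a recommendation:

```sql
-- Sketch only: assumes `orders` and `users` are CDC-backed tables
-- (e.g. created via a CDC connector). A regular join like this keeps
-- every row of both inputs in Flink state forever, which is exactly
-- the state-size problem described above.
SELECT
  o.order_id,
  o.order_status,
  o.order_time,
  u.user_name,
  u.user_phone,
  u.user_address
FROM orders AS o
JOIN users AS u
  ON o.user_id = u.user_id;
```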

Re: SQL Engine Type inference when extending AsyncTableFunction class twice.

2022-09-16 Thread Jonathan Weaver
I think I've narrowed it down to this function in ExtractionUtils

public static Optional<Class<?>> extractSimpleGeneric(
        Class<?> baseClass, Class<?> clazz, int pos) {
    try {
        if (clazz.getSuperclass() != baseClass) {
            return Optional.empty();
        }
        final Type t =
                ((ParameterizedType) clazz.getGenericSuperclass())
                        .getActualTypeArguments()[pos];
        return Optional.ofNullable(toClass(t));
    } catch (Exception unused) {
        return Optional.empty();
    }
}

clazz.getSuperclass() is BaseClass in my example, while the baseClass
argument passed to this function is AsyncTableFunction. Because those don't
match, the method returns an empty result, even though type inference works
correctly everywhere else.

Is there a way we could allow multiple levels of inheritance in the future,
instead of only a direct single subclass?
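The behavior is easy to reproduce outside Flink with a toy hierarchy. The class names below are hypothetical stand-ins (Base for AsyncTableFunction, Middle for the shared BaseClass, Leaf for FinalClass), and `toClass()` is replaced by a plain cast:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Optional;

public class GenericExtractionDemo {

    static class Base<T> {}                      // stands in for AsyncTableFunction<T>
    static class Middle extends Base<String> {}  // the shared "BaseClass"
    static class Leaf extends Middle {}          // the "FinalClass"

    // Simplified re-implementation of ExtractionUtils#extractSimpleGeneric.
    static Optional<Class<?>> extractSimpleGeneric(
            Class<?> baseClass, Class<?> clazz, int pos) {
        try {
            if (clazz.getSuperclass() != baseClass) {
                return Optional.empty(); // <-- Leaf fails exactly here
            }
            final Type t =
                    ((ParameterizedType) clazz.getGenericSuperclass())
                            .getActualTypeArguments()[pos];
            return Optional.ofNullable((Class<?>) t);
        } catch (Exception unused) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Direct subclass: the type argument is found.
        System.out.println(extractSimpleGeneric(Base.class, Middle.class, 0));
        // One level deeper: getSuperclass() is Middle, not Base -> empty.
        System.out.println(extractSimpleGeneric(Base.class, Leaf.class, 0));
    }
}
```

The direct subclass resolves to String, while the two-level subclass yields an empty Optional even though the type argument is still recoverable from the hierarchy.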



On Thu, Sep 15, 2022 at 4:42 PM Jonathan Weaver 
wrote:

> I am having an issue with the automatic type inference with SQL engine in
> an AsyncTableFunction class.
>
> I am extending AsyncTableFunction in a BaseClass (common code).
>
> Then extending again for some specific implementations.
>
> FinalClass extends BaseClass
>
> If I use BaseClass it correctly infers the output of the RowData from the
> catalog.
> If I use FinalClass it errors with
>
> Cannot extract a data type from an internal
> 'org.apache.flink.table.data.RowData' class without further information.
> Please use annotations to define the full logical type.
>
> So something with the typeInference is not looking at the right class in
> the hierarchy.
>
> I have tried overriding typeInformation at various points but it doesn't
> seem to help.
>
> Does anyone have an idea of how to have a common base class that gets
> extended with correct automatic typeinference?
>
> I can provide more details if needed.
>


Re: Classloading issues with Flink Operator / Kubernetes Native

2022-09-16 Thread Yaroslav Tkachenko
Application mode. I've done a bit more research and created
https://issues.apache.org/jira/browse/FLINK-29288, planning to work on a PR
today.

TLDR: currently the Flink operator always creates the /opt/flink/usrlib folder and
forces you to specify the jarURI parameter, which is passed via the
pipeline.jars / pipeline.classpaths configuration options. This leads to
the jar being loaded twice, by different classloaders (the system one and
the user one).
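For context, the jarURI in question is set in the operator's FlinkDeployment resource. A minimal, hypothetical fragment (names and paths are examples only) might look like:

```yaml
# Hypothetical FlinkDeployment fragment -- names and paths are examples.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-job
spec:
  job:
    # The jar referenced here ends up under /opt/flink/usrlib and is also
    # passed via pipeline.jars, which is the double-loading described above.
    jarURI: local:///opt/flink/usrlib/my-job.jar
```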

On Fri, Sep 16, 2022 at 2:30 AM Matthias Pohl 
wrote:

> Are you deploying the job in session or application mode? Could you
> provide the stacktrace? I'm wondering whether that would help pin down
> a code location for further investigation.
> So far, I couldn't come up with a definite answer about placing the jar in
> the lib directory. Initially, I would have thought that it's fine
> considering that all dependencies are included and the job jar itself ends
> up on the user classpath. I'm curious whether Chesnay (CC'd) has an answer
> to that one.
>
> On Tue, Sep 13, 2022 at 1:40 AM Yaroslav Tkachenko 
> wrote:
>
>> Hey everyone,
>>
>> I’m migrating a Flink Kubernetes standalone job to the Flink operator
>> (with Kubernetes native mode).
>>
>> I have a lot of classloading issues when trying to run with the operator
>> in native mode. For example, I have a Postgres driver as a dependency (I
>> can confirm the files are included in the uber jar), but I still get
>> "java.sql.SQLException: No suitable driver found for jdbc:postgresql:..."
>> exception.
>>
>> In the Kubernetes standalone setup my uber jar is placed in the
>> /opt/flink/lib folder, this is what I specify as "jarURI" in the operator
>> config. Is this supported? Should I only be using /opt/flink/usrlib?
>>
>> Thanks for any suggestions.
>>
>


Re: A question about restoring state with an additional variable with kryo

2022-09-16 Thread Vishal Santoshi
Thank you for the clarification. I thought so too.

Unfortunately my state is generics-based, and generic types are definitely not
treated as POJOs, even though the class has all the required constructs (no-arg
constructor, getters/setters, etc.). I will likely take an at-least-once hit by
changing the uid of that specific operator and restarting with allow
non-restored state. This will discard the state that cannot be restored (for
the previous uid), construct fresh state for the new uid, and not affect other
operators (including the Kafka consumer operators). I can live with that, I
think.

On Fri, Sep 16, 2022 at 2:55 AM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Vishal,
>
>
>
> Good news and bad news :
>
>
>
>- Bad: Kryo serializer cannot be used for schema evolution, see [1]
>- Good: not all is lost here,
>   - If you happen to have state that you cannot afford to lose, you
>   can transcode it by means of the savepoint API [2],
>   - However, this takes quite some effort
>- In general, if you ever plan to migrate/extend your schemas, choose
>a data type that supports schema migration [1],
>- In your case, PoJo types would be the closest to your original
>implementation
>- You can disable Kryo in configuration to avoid this situation in the
>future, by the way,
>- Kryo serializer is quite slow compared to the other options and I
>believe it is only there as a (emergency) fallback solution: [3]
>
>
>
> Feel free to ask for clarification 
>
>
>
> Thias
>
>
>
>
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#kryo-cannot-be-used-for-schema-evolution
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>
> [3]
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>
>
>
>
>
>
>
> *From:* Vishal Santoshi 
> *Sent:* Friday, September 16, 2022 1:17 AM
> *To:* user 
> *Subject:* Re: A question about restoring state with an additional
> variable with kryo
>
>
>
>
>
>
> The exception thrown is as follows. I realize that it is trying to read
> the long value. How do I signal to Kryo that this is OK and that the
> object can have a default value?
>
>
>
> Caused by: java.io.EOFException: No more bytes left.
> at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:80)
> at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
> at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
> at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:133)
> at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:123)
> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
> at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
> at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
>
>
> On Thu, Sep 15, 2022 at 7:10 PM Vishal Santoshi 
> wrote:
>
> << How do I make sure that when reconstituting the state, kryo does not
> complain? It tries to map the previous state to the new definition of Class
> A and complains that it cannot read the value for `String b`.
>
>
>
> >> How do I make sure that when reconstituting the state, kryo does not
> complain? It tries to map the previous state to the new definition of Class
> A and complains that it cannot read the value for `long b`.
>
>
>
> Sorry a typo
>
>
>
>
>
> On Thu, Sep 15, 2022 at 7:04 PM Vishal Santoshi 
> wrote:
>
> I have state in rocksDB that represents say
>
>
>
> class A {
>
>   String a
>
> }
>
>
>
> I now change my class and add another variable
>
>
>
>
> Class A {
>
>   String a;
>
>   long b = 0;
>
> }
>
>
>
> How do I make sure that when reconstituting the state, kryo does not
> complain? It tries to map the previous state to the new definition of Class
> A and complains that it cannot read the value for `String b`.
>
>
>
> Unfortunately the state is not using POJO serializer.
>
>
>
> Thanks and Regards.
>
>
>
> Vishal
>

Re: ExecutionMode in ExecutionConfig

2022-09-16 Thread Yun Tang
Hi Hailu,

If you take a look at the history of ExecutionMode [1], apart from a
refactoring commit, this class was introduced before 2016, when the DataSet
API had not yet been deprecated.

From my point of view, you should currently set the runtime mode [2] rather
than the execution mode if you are using Flink as a computation engine.


[1] 
https://github.com/apache/flink/commits/master/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java
[2] 
https://github.com/apache/flink/blob/9d2ae5572897f3e2d9089414261a250cfc2a2ab8/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java#L98

Best
Yun Tang


From: zhanghao.c...@outlook.com 
Sent: Thursday, September 15, 2022 0:03
To: Hailu, Andreas ; user@flink.apache.org 

Subject: Re: ExecutionMode in ExecutionConfig

It's added in Flink 1.14:
https://nightlies.apache.org/flink/flink-docs-master/zh/release-notes/flink-1.14/#expose-a-consistent-globaldataexchangemode
Not sure if there's a way to change this in 1.13.

Best,
Zhanghao Chen

From: Hailu, Andreas 
Sent: Wednesday, September 14, 2022 23:38
To: zhanghao.c...@outlook.com ; 
user@flink.apache.org 
Subject: RE: ExecutionMode in ExecutionConfig


I can give this a try. Do you know which Flink version this feature becomes
available in?



ah



From: zhanghao.c...@outlook.com 
Sent: Wednesday, September 14, 2022 11:10 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig



Could you try setting 'execution.batch-shuffle-mode'='ALL_EXCHANGES_PIPELINED'?
It looks like the ExecutionMode in ExecutionConfig does not work for the
DataStream API.



The default shuffle behavior for the DataStream API in batch mode is
'ALL_EXCHANGES_BLOCKING', where upstream and downstream tasks run sequentially.
The pipelined mode, on the other hand, has upstream and downstream tasks
run simultaneously.
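Expressed as configuration, the suggestion above might look like this (a sketch; `execution.batch-shuffle-mode` exists as of Flink 1.14, so it would not apply to a 1.13 cluster):

```yaml
# flink-conf.yaml (sketch) -- requires Flink >= 1.14
execution.runtime-mode: BATCH
execution.batch-shuffle-mode: ALL_EXCHANGES_PIPELINED
```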





Best,

Zhanghao Chen



From: Hailu, Andreas <andreas.ha...@gs.com>
Sent: Wednesday, September 14, 2022 21:37
To: zhanghao.c...@outlook.com <zhanghao.c...@outlook.com>; user@flink.apache.org <user@flink.apache.org>
Subject: RE: ExecutionMode in ExecutionConfig



Hi Zhanghao,



That seems different from what I'm referencing, and it is one of my points of
confusion: the documentation refers to an ExecutionMode of BATCH and STREAMING,
which the code calls the runtime mode, e.g.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);



I'm referring to the ExecutionMode in the ExecutionConfig, e.g.
env.getConfig().setExecutionMode(ExecutionMode.BATCH) /
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED). I'm not able to find
documentation on this anywhere.







ah



From: zhanghao.c...@outlook.com <zhanghao.c...@outlook.com>
Sent: Wednesday, September 14, 2022 1:10 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>; user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig



https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
gives a comprehensive description of it.






Best,

Zhanghao Chen



From: Hailu, Andreas <andreas.ha...@gs.com>
Sent: Wednesday, September 14, 2022 7:13
To: user@flink.apache.org <user@flink.apache.org>
Subject: ExecutionMode in ExecutionConfig



Hello,



Is there somewhere I can learn more about the effect of ExecutionMode in
ExecutionConfig on a job? I am trying to sort out some

Re: Job startup exception brings down the Flink service; the Flink service cannot be started

2022-09-16 Thread yidan zhao
Right. Go into ZooKeeper and delete the jobgraph and running-job related znodes.
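Sketched as a ZooKeeper CLI session. The paths below are assumptions based on the defaults of `high-availability.zookeeper.path.root` (/flink) and `high-availability.cluster-id` (/default); list the tree first and adapt before deleting anything:

```
# Hypothetical sketch -- inspect the tree before deleting anything.
bin/zkCli.sh -server <zk-host>:2181
ls /flink/default
deleteall /flink/default/jobgraphs
deleteall /flink/default/running_job_registry
```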

Summer  wrote on Fri, Sep 16, 2022 at 16:51:

> It's enabled, but everything got knocked down.
>  Original message 
> From: yidan zhao 
> Date: 2022-09-16 16:05
> To: Summer 
> Cc: user-zh@flink.apache.org
> Subject: Re: Job startup exception brings down the Flink service; the Flink service cannot be started
> HA mode is enabled, right?
>
> Summer  wrote on Fri, Sep 16, 2022 at 15:48:
>
>> The cause has been found: ${FLINK_HOME}/lib is missing a jar that the job
>> depends on. Since Flink cannot start without it, how can I cancel this job
>> without adding that jar?
>>
>>  Original message 
>> From: yidan zhao 
>> Date: 2022-09-16 14:51
>> To: Summer 
>> Cc: user-zh@flink.apache.org
>> Subject: Re: Job startup exception brings down the Flink service; the Flink service cannot be started
>> You have HA enabled, right?
>>
>> Summer  wrote on Fri, Sep 16, 2022 at 14:32:
>>
>> Standalone deployment
>>
>>  Original message 
>> From: yidan zhao 
>> Date: 2022-09-16 14:20
>> To: user-zh 
>> Subject: Re: Job startup exception brings down the Flink service; the Flink service cannot be started
>> Which deployment mode?
>>
>> Summer  wrote on Fri, Sep 16, 2022 at 13:57:
>>
>> Flink version: 1.13.3
>> I have a Flink SQL job that has generated checkpoints, but an exception
>> during execution brings down the whole Flink JobManager. When I restart
>> Flink, this Flink SQL job keeps throwing exceptions, so the Flink process
>> cannot start. Is there any way to cancel this job?
>


Re: Classloading issues with Flink Operator / Kubernetes Native

2022-09-16 Thread Matthias Pohl via user
Are you deploying the job in session or application mode? Could you provide
the stacktrace? I'm wondering whether that would help pin down a code
location for further investigation.
So far, I couldn't come up with a definite answer about placing the jar in
the lib directory. Initially, I would have thought that it's fine
considering that all dependencies are included and the job jar itself ends
up on the user classpath. I'm curious whether Chesnay (CC'd) has an answer
to that one.

On Tue, Sep 13, 2022 at 1:40 AM Yaroslav Tkachenko 
wrote:

> Hey everyone,
>
> I’m migrating a Flink Kubernetes standalone job to the Flink operator
> (with Kubernetes native mode).
>
> I have a lot of classloading issues when trying to run with the operator
> in native mode. For example, I have a Postgres driver as a dependency (I
> can confirm the files are included in the uber jar), but I still get
> "java.sql.SQLException: No suitable driver found for jdbc:postgresql:..."
> exception.
>
> In the Kubernetes standalone setup my uber jar is placed in the
> /opt/flink/lib folder, this is what I specify as "jarURI" in the operator
> config. Is this supported? Should I only be using /opt/flink/usrlib?
>
> Thanks for any suggestions.
>


Reply: Job startup exception brings down the Flink service; the Flink service cannot be started

2022-09-16 Thread Summer

It's enabled, but everything got knocked down.


 Original message 
From: yidan zhao
Date: 2022-09-16 16:05
To: Summer
Cc: user-zh@flink.apache.org
Subject: Re: Job startup exception brings down the Flink service; the Flink service cannot be started
HA mode is enabled, right?

Summer  wrote on Fri, Sep 16, 2022 at 15:48:

The cause has been found: ${FLINK_HOME}/lib is missing a jar that the job depends on.
Since Flink cannot start without it, how can I cancel this job without adding that jar?

 Original message 
From: yidan zhao
Date: 2022-09-16 14:51
To: Summer
Cc: user-zh@flink.apache.org
Subject: Re: Job startup exception brings down the Flink service; the Flink service cannot be started
You have HA enabled, right?

Summer  wrote on Fri, Sep 16, 2022 at 14:32:

Standalone deployment

 Original message 
From: yidan zhao
Date: 2022-09-16 14:20
To: user-zh
Subject: Re: Job startup exception brings down the Flink service; the Flink service cannot be started
Which deployment mode?

Summer  wrote on Fri, Sep 16, 2022 at 13:57:

Flink version: 1.13.3
I have a Flink SQL job that has generated checkpoints, but an exception during execution brings down the whole Flink JobManager.
When I restart Flink, this Flink SQL job keeps throwing exceptions, so the Flink process cannot start.
Is there any way to cancel this job?

Re: Job startup exception brings down the Flink service; the Flink service cannot be started

2022-09-16 Thread yidan zhao
HA mode is enabled, right?

Summer  wrote on Fri, Sep 16, 2022 at 15:48:

> The cause has been found: ${FLINK_HOME}/lib is missing a jar that the job
> depends on. Since Flink cannot start without it, how can I cancel this job
> without adding that jar?
>
>  Original message 
> From: yidan zhao 
> Date: 2022-09-16 14:51
> To: Summer 
> Cc: user-zh@flink.apache.org
> Subject: Re: Job startup exception brings down the Flink service; the Flink service cannot be started
> You have HA enabled, right?
>
> Summer  wrote on Fri, Sep 16, 2022 at 14:32:
>
> Standalone deployment
>
>  Original message 
> From: yidan zhao 
> Date: 2022-09-16 14:20
> To: user-zh 
> Subject: Re: Job startup exception brings down the Flink service; the Flink service cannot be started
> Which deployment mode?
>
> Summer  wrote on Fri, Sep 16, 2022 at 13:57:
>
> Flink version: 1.13.3
> I have a Flink SQL job that has generated checkpoints, but an exception
> during execution brings down the whole Flink JobManager. When I restart
> Flink, this Flink SQL job keeps throwing exceptions, so the Flink process
> cannot start. Is there any way to cancel this job?
>


Reply: Job startup exception brings down the Flink service; the Flink service cannot be started

2022-09-16 Thread Summer

The cause has been found: ${FLINK_HOME}/lib is missing a jar that the job depends on.
Since Flink cannot start without it, how can I cancel this job without adding that jar?


 Original message 
From: yidan zhao
Date: 2022-09-16 14:51
To: Summer
Cc: user-zh@flink.apache.org
Subject: Re: Job startup exception brings down the Flink service; the Flink service cannot be started
You have HA enabled, right?

Summer  wrote on Fri, Sep 16, 2022 at 14:32:

Standalone deployment

 Original message 
From: yidan zhao
Date: 2022-09-16 14:20
To: user-zh
Subject: Re: Job startup exception brings down the Flink service; the Flink service cannot be started
Which deployment mode?

Summer  wrote on Fri, Sep 16, 2022 at 13:57:

Flink version: 1.13.3
I have a Flink SQL job that has generated checkpoints, but an exception during execution brings down the whole Flink JobManager.
When I restart Flink, this Flink SQL job keeps throwing exceptions, so the Flink process cannot start.
Is there any way to cancel this job?

RE: A question about restoring state with an additional variable with kryo

2022-09-16 Thread Schwalbe Matthias
Hi Vishal,

Good news and bad news :


  *   Bad: Kryo serializer cannot be used for schema evolution, see [1]
  *   Good: not all is lost here,
 *   If you happen to have state that you cannot afford to lose, you can 
transcode it by means of the savepoint API [2],
 *   However, this takes quite some effort
  *   In general, if you ever plan to migrate/extend your schemas, choose a 
data type that supports schema migration [1],
  *   In your case, PoJo types would be the closest to your original 
implementation
  *   You can disable Kryo in configuration to avoid this situation in the 
future, by the way,
  *   Kryo serializer is quite slow compared to the other options and I believe 
it is only there as a (emergency) fallback solution: [3]

Feel free to ask for clarification 

Thias



[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#kryo-cannot-be-used-for-schema-evolution
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
[3] 
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
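As a reference point for the "PoJo types" bullet above, here is a sketch of a state class shaped so that Flink's PojoSerializer would accept it (public class, public no-arg constructor, and public fields or getter/setter pairs). The class name is made up for illustration; with POJO serialization, appending a new field such as `b` is a supported schema change, and the field is filled with a default value on restore:

```java
// Hypothetical sketch of a state class Flink would treat as a POJO.
// Requirements: public type, public no-arg constructor, every field
// either public or exposed via a getter/setter pair.
public class StateA {
    private String a;
    private long b; // field added in the new schema version

    public StateA() {} // required no-arg constructor

    public String getA() { return a; }
    public void setA(String a) { this.a = a; }

    public long getB() { return b; }
    public void setB(long b) { this.b = b; }
}
```

With a generic type parameter in the class, Flink falls back to Kryo instead, which is exactly the situation described in this thread.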



From: Vishal Santoshi 
Sent: Friday, September 16, 2022 1:17 AM
To: user 
Subject: Re: A question about restoring state with an additional variable with 
kryo



The exception thrown is as follows. I realize that it is trying to read the
long value. How do I signal to Kryo that this is OK and that the object can
have a default value?

Caused by: java.io.EOFException: No more bytes left.
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:80)
at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:133)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:123)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)

On Thu, Sep 15, 2022 at 7:10 PM Vishal Santoshi 
mailto:vishal.santo...@gmail.com>> wrote:
<< How do I make sure that when reconstituting the state, kryo does not 
complain? It tries to map the previous state to the new definition of Class A 
and complains that it cannot read the value for `String b`.

>> How do I make sure that when reconstituting the state, kryo does not 
>> complain? It tries to map the previous state to the new definition of Class 
>> A and complains that it cannot read the value for `long b`.

Sorry a typo


On Thu, Sep 15, 2022 at 7:04 PM Vishal Santoshi 
mailto:vishal.santo...@gmail.com>> wrote:
I have state in rocksDB that represents say

class A {
  String a
}

I now change my class and add another variable


Class A {
  String a;
  long b = 0;
}

How do I make sure that when reconstituting the state, kryo does not complain? 
It tries to map the previous state to the new definition of Class A and 
complains that it cannot read the value for `String b`.

Unfortunately the state is not using POJO serializer.

Thanks and Regards.

Vishal




Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet 
unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von 
e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine 
Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser 
Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per 
e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche 
unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng 
verboten.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.


Reply: Job startup exception brings down the Flink service; the Flink service cannot be started

2022-09-16 Thread 小昌同学
Please share a screenshot of the exception in the logs.


小昌
ccc0606fight...@163.com

 Original message 
From: yidan zhao
Date: 2022-09-16 14:20
To: user-zh
Subject: Re: Job startup exception brings down the Flink service; the Flink service cannot be started
Which deployment mode?

Summer  wrote on Fri, Sep 16, 2022 at 13:57:

Flink version: 1.13.3
I have a Flink SQL job that has generated checkpoints, but an exception during execution brings down the whole Flink JobManager.
When I restart Flink, this Flink SQL job keeps throwing exceptions, so the Flink process cannot start.
Is there any way to cancel this job?


Re: Job startup exception brings down the Flink service; the Flink service cannot be started

2022-09-16 Thread yidan zhao
You have HA enabled, right?

Summer  wrote on Fri, Sep 16, 2022 at 14:32:

> Standalone deployment
>
>  Original message 
> From: yidan zhao 
> Date: 2022-09-16 14:20
> To: user-zh 
> Subject: Re: Job startup exception brings down the Flink service; the Flink service cannot be started
> Which deployment mode?
>
> Summer  wrote on Fri, Sep 16, 2022 at 13:57:
>
> Flink version: 1.13.3
> I have a Flink SQL job that has generated checkpoints, but an exception
> during execution brings down the whole Flink JobManager. When I restart
> Flink, this Flink SQL job keeps throwing exceptions, so the Flink process
> cannot start. Is there any way to cancel this job?
>


Reply: Job startup exception brings down the Flink service; the Flink service cannot be started

2022-09-16 Thread Summer

Standalone deployment


 Original message 
From: yidan zhao
Date: 2022-09-16 14:20
To: user-zh
Subject: Re: Job startup exception brings down the Flink service; the Flink service cannot be started
Which deployment mode?

Summer  wrote on Fri, Sep 16, 2022 at 13:57:

Flink version: 1.13.3
I have a Flink SQL job that has generated checkpoints, but an exception during execution brings down the whole Flink JobManager.
When I restart Flink, this Flink SQL job keeps throwing exceptions, so the Flink process cannot start.
Is there any way to cancel this job?

Re: Job startup exception brings down the Flink service; the Flink service cannot be started

2022-09-16 Thread yidan zhao
Which deployment mode?

Summer  wrote on Fri, Sep 16, 2022 at 13:57:
>
> Flink version: 1.13.3
> I have a Flink SQL job that has generated checkpoints, but an exception
> during execution brings down the whole Flink JobManager. When I restart
> Flink, this Flink SQL job keeps throwing exceptions, so the Flink process
> cannot start. Is there any way to cancel this job?
>


How to monitor the end-to-end latency of Flink SQL jobs?

2022-09-16 Thread casel.chen
Several Flink SQL jobs are running in production, and I want to monitor their end-to-end latency. I configured:
metrics.latency.interval=3
metrics.latency.granularity=operator
metrics.latency.history-size=128
The latency metric is already exported to Prometheus. It has 50/75/95/98/99/999 quantiles, plus operator_id and operator_subtask_index labels, i.e. down to the operator-subtask level.
1. How do I compute end-to-end latency quantiles for a Flink SQL job from these exposed
metrics? Do I average the same quantile of the same operator across all subtasks of the job, and then sum the resulting per-operator values?
2. Also, most of our SQL jobs ingest from Kafka with canal-json messages. I would further like to measure the
difference between the binlog event time inside the canal-json payload and the timestamp in the Kafka message metadata, as well as
the difference between the Kafka metadata timestamp and the time Flink starts processing the message. Is there a way to get these without modifying the Flink source code?
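For question 1, a hedged PromQL sketch of the strategy described above (average a quantile over the subtasks of each operator, then sum across the job's operators). Note that summing per-operator quantiles gives only a rough estimate, not a true end-to-end quantile:

```promql
# Sketch: per-operator p95 averaged over subtasks, summed per job.
# Metric and label names match the exported samples shown below.
sum by (job_name) (
  avg by (job_name, operator_id) (
    flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{quantile="0.95"}
  )
)
```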


| flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="tb-bipusr-outcome-bank-record-binlog2mongo", component="taskmanager", host="172_19_193_104", instance="172.19.193.104:9249", job="kubernetes-pods", job_id="2ea0a87e69f0d485859a9108d595dd8d", job_name="tb_bipusr_outcome_bank_record_binlog2mongo", kubernetes_namespace="bfj", kubernetes_pod_name="tb-bipusr-outcome-bank-record-binlog2mongo-taskmanager-1-8", operator_id="570f707193e0fe32f4d86d067aba243b", operator_subtask_index="2", quantile="0.95", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id="tb_bipusr_outcome_bank_record_binlog2mongo_taskmanager_1_8", type="flink-native-kubernetes"} | 11.943 |
| flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="tb-bipusr-outcome-bank-record-binlog2mongo", component="taskmanager", host="172_19_193_104", instance="172.19.193.104:9249", job="kubernetes-pods", job_id="2ea0a87e69f0d485859a9108d595dd8d", job_name="tb_bipusr_outcome_bank_record_binlog2mongo", kubernetes_namespace="bfj", kubernetes_pod_name="tb-bipusr-outcome-bank-record-binlog2mongo-taskmanager-1-8", operator_id="570f707193e0fe32f4d86d067aba243b", operator_subtask_index="2", quantile="0.98", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id="tb_bipusr_outcome_bank_record_binlog2mongo_taskmanager_1_8", type="flink-native-kubernetes"} | 21 |
| flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="tb-bipusr-outcome-bank-record-binlog2mongo", component="taskmanager", host="172_19_193_104", instance="172.19.193.104:9249", job="kubernetes-pods", job_id="2ea0a87e69f0d485859a9108d595dd8d", job_name="tb_bipusr_outcome_bank_record_binlog2mongo", kubernetes_namespace="bfj", kubernetes_pod_name="tb-bipusr-outcome-bank-record-binlog2mongo-taskmanager-1-8", operator_id="570f707193e0fe32f4d86d067aba243b", operator_subtask_index="2", quantile="0.99", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id="tb_bipusr_outcome_bank_record_binlog2mongo_taskmanager_1_8", type="flink-native-kubernetes"} |