Re: Computing operators of a job stuck in the busy state

2022-09-20 Thread 杨扬
Are there any metrics or tools for troubleshooting a Flink memory leak?
For example, something that helps roughly locate where the leak is.
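
A practical starting point (a general JVM technique rather than a Flink-specific tool): take two post-GC heap dumps a few days apart and diff the dominator trees in a tool such as Eclipse MAT; the TaskManager metric Status.JVM.Memory.Heap.Used helps confirm the trend. A minimal sketch for triggering a dump from inside the JVM (standard JDK API; the dump path is illustrative):

import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;

public class HeapDump {
    public static void main(String[] args) throws Exception {
        HotSpotDiagnosticMXBean mxBean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        // live = true runs a full GC first, so the dump contains only
        // reachable objects, which is what you want when hunting a leak.
        mxBean.dumpHeap("/tmp/taskmanager-heap.hprof", true);
    }
}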





> On Sep 19, 2022, at 5:41 PM, yidan zhao  wrote:
> 
> Then check your code for memory leaks.
> 
> 杨扬  wrote on Mon, Sep 19, 2022 at 11:21:
>> 
>> One more observation: taskHeap memory usage keeps climbing. Right after the job starts it is around 10%, after one week it rises to about 25%, and after two weeks to about 50% (these are the values observed after GC). After two weeks the computing operators are almost constantly 100% busy, end-to-end latency reaches about 10s, and the job becomes unusable and has to be restarted.
>> 
>> 
>> 
>> 
>>> On Sep 15, 2022, at 8:58 PM, yidan zhao  wrote:
>>> 
>>> Low latency is, to an extent, achieved through low resource utilization. High resource utilization means running close to full load, with just enough capacity.
>>> 
>>> yidan zhao  wrote on Thu, Sep 15, 2022 at 20:57:
 
 If resources are sufficient, busy is at 50%+, and the latency is also acceptable, then it isn't really a problem. A 2s delay is not high.
 
 杨扬  wrote on Thu, Sep 15, 2022 at 20:02:
> 
> Parallelism is already set to 25 and each slot has 4 GB of memory, so about 100 GB of memory is in use. Peak traffic is around 1 TPS; the resources should be sufficient, right?
> 
> 
> 
> 
>> On Sep 15, 2022, at 7:27 PM, yidan zhao  wrote:
>> 
>> If it's busy, how about increasing the parallelism to see whether that helps?
>> 
>> 杨扬  wrote on Thu, Sep 15, 2022 at
>> 14:51:
>> Hi all!
>> We have a Flink job that roughly consists of 3 stages:
>> reading data from Kafka (1 source, parallelism 3) -> filtering the data and evaluating conditions (no window operations, parallelism 25) ->
>> writing the results to Kafka (20+ sinks, each with parallelism 3). See the attached picture for reference.
>> 
>> The problem: after the job has been running for a while, the chain of computing operators at parallelism 25 in the middle becomes busy (above 50%) and end-to-end latency increases, occasionally exceeding 2 seconds. The job logs show no errors, exceptions, or warnings at that time.
>> 
>> 
>> With no error or warning in the logs, I hardly know where to start. My guess: could the large number of sinks, each with parallelism 3, cause heavy shuffling between the middle chain of operators (parallelism 25) and the sinks? I'd appreciate any help analyzing this problem.
>> 
>> 
>> 



Re: Is it possible to connect multiple streams

2022-09-20 Thread Shammon FY
Hi

Thanks @yaroslav.
And @deepakgd79, here is the documentation for the DataStream API:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#datastream-transformations
You can find examples for union, connect, join, and other transformations there.
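
For the original question: chaining connect() does not compose the way the snippet suggests (the result of connect() does not itself expose another connect()), so for three or more streams of the same type, union is the simplest option. A minimal sketch with illustrative elements:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> stream1 = env.fromElements("a", "b");
        DataStream<String> stream2 = env.fromElements("c");
        DataStream<String> stream3 = env.fromElements("d");

        // union merges any number of streams of the same type into one
        DataStream<String> merged = stream1.union(stream2, stream3);

        merged.print();
        env.execute("union example");
    }
}

For streams of different types, connect and flatten them pairwise: stream1.connect(stream2).flatMap(...) returns a DataStream that can then be connected to stream3.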


On Wed, Sep 21, 2022 at 11:55 AM Yaroslav Tkachenko 
wrote:

> Hi Deepak,
>
> You can use a union operator. I actually gave a talk on creating an
> advanced join using the union operator and multiple streams:
> -
> https://www.slideshare.net/sap1ens/storing-state-forever-why-it-can-be-good-for-your-analytics
> - https://www.youtube.com/watch?v=tiGxEGPyqCg
>
> I hope this helps.
>
> On Tue, Sep 20, 2022 at 5:22 PM Deepak kumar Gunjetti <
> deepakg...@gmail.com> wrote:
>
>> Hi,
>> My name is Deepak, I am a new user to apache flink. It is one of the best
>> open source i have used. I want to thank the community for developing such
>> a wonderful product.
>>
>> I have one query.
>> Is it possible to connect multiple streams, like
>> stream1.connect(stream2).connect(stream3).flatmap(new
>> RickCoFlatMapFunctionHandler())
>>
>> Can someone please let me know how I can achieve this.
>> Thanks,
>> Deepak
>>
>


Re: Is it possible to connect multiple streams

2022-09-20 Thread Yaroslav Tkachenko
Hi Deepak,

You can use a union operator. I actually gave a talk on creating an
advanced join using the union operator and multiple streams:
-
https://www.slideshare.net/sap1ens/storing-state-forever-why-it-can-be-good-for-your-analytics
- https://www.youtube.com/watch?v=tiGxEGPyqCg

I hope this helps.

On Tue, Sep 20, 2022 at 5:22 PM Deepak kumar Gunjetti 
wrote:

> Hi,
> My name is Deepak, I am a new user to apache flink. It is one of the best
> open source i have used. I want to thank the community for developing such
> a wonderful product.
>
> I have one query.
> Is it possible to connect multiple streams, like
> stream1.connect(stream2).connect(stream3).flatmap(new
> RickCoFlatMapFunctionHandler())
>
> Can someone please let me know how I can achieve this.
> Thanks,
> Deepak
>


Re: Classloading issues with Flink Operator / Kubernetes Native

2022-09-20 Thread Yaroslav Tkachenko
Interesting, do you see the /opt/flink/usrlib folder created as well? Also,
what Flink version do you use?

Thanks.
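
If the double classloading described below turns out to be the cause of the "No suitable driver found" error, one common workaround sketch (not necessarily the root-cause fix; org.postgresql.Driver is the standard Postgres driver class name, the URL and credentials are placeholders) is to register the driver explicitly from the job code:

import java.sql.Connection;
import java.sql.DriverManager;

public class DriverCheck {
    public static void main(String[] args) throws Exception {
        // Force-load the driver through the invoking classloader so it
        // registers itself with DriverManager before getConnection() runs.
        Class.forName("org.postgresql.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/db", "user", "password")) {
            System.out.println("driver resolved: " + conn.getMetaData().getDriverName());
        }
    }
}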

On Tue, Sep 20, 2022 at 4:04 PM Javier Vegas  wrote:

>
> jarURI: local:///opt/flink/lib/MYJARNAME.jar
>
> El mar, 20 sept 2022 a las 0:25, Yaroslav Tkachenko ()
> escribió:
>
>> Hi Javier,
>>
>> What do you specify as a jarURI?
>>
>> On Mon, Sep 19, 2022 at 3:56 PM Javier Vegas  wrote:
>>
>>> I am doing the same thing (migrating from standalone to operator in
>>> native mode) and also have my jar in /opt/flink/lib but for me it works
>>> fine, no class loading errors on app startup.
>>>
>>> El vie, 16 sept 2022 a las 9:28, Yaroslav Tkachenko (<
>>> yaros...@goldsky.com>) escribió:
>>>
 Application mode. I've done a bit more research and created
 https://issues.apache.org/jira/browse/FLINK-29288, planning to work on
 a PR today.

 TLDR: currently Flink operator always creates /opt/flink/usrlib folder
 and forces you to specify the jarURI parameter, which is passed as
 pipeline.jars / pipeline.classpaths configuration options. This leads to
 the jar being loaded twice by different classloaders (system and user
 ones).

 On Fri, Sep 16, 2022 at 2:30 AM Matthias Pohl 
 wrote:

> Are you deploying the job in session or application mode? Could you
> provide the stacktrace. I'm wondering whether that would be helpful to pin
> a code location for further investigation.
> So far, I couldn't come up with a definite answer about placing the
> jar in the lib directory. Initially, I would have thought that it's fine
> considering that all dependencies are included and the job jar itself ends
> up on the user classpath. I'm curious whether Chesnay (CC'd) has an answer
> to that one.
>
> On Tue, Sep 13, 2022 at 1:40 AM Yaroslav Tkachenko <
> yaros...@goldsky.com> wrote:
>
>> Hey everyone,
>>
>> I’m migrating a Flink Kubernetes standalone job to the Flink operator
>> (with Kubernetes native mode).
>>
>> I have a lot of classloading issues when trying to run with
>> the operator in native mode. For example, I have a Postgres driver as a
>> dependency (I can confirm the files are included in the uber jar), but I
>> still get "java.sql.SQLException: No suitable driver found for
>> jdbc:postgresql:..." exception.
>>
>> In the Kubernetes standalone setup my uber jar is placed in the
>> /opt/flink/lib folder, this is what I specify as "jarURI" in the operator
>> config. Is this supported? Should I only be using /opt/flink/usrlib?
>>
>> Thanks for any suggestions.
>>
>


Re: Does Flink's data transfer push from the upstream operator to the downstream, or does the downstream pull from the upstream? What were the design considerations?

2022-09-20 Thread Shammon FY
Hi
Personally, I think simply calling Flink's data transfer a pull model can be misleading. The two models are usually understood to work as follows:
1. Push model
After the upstream and downstream tasks initialize the network connection, the upstream task continuously "pushes" data to the downstream over that connection.
2. Pull model
After the connection is initialized, the downstream task, according to its own progress, polls the upstream with "pull" requests to fetch data for the next round of computation.

In Flink, the upstream/downstream interaction consists of these steps:
1. The TM running the upstream task creates a Netty server.
2. When the downstream task starts, it connects to the upstream through a Netty client.
3. The downstream task sends a partition request to the upstream; the upstream creates a data reader for that request and keeps reading data and sending it over the connection.
4. The upstream and downstream tasks each have their own memory pool for flow control; roughly:
a) The downstream task, based on the state of its consumption memory pool, periodically sends credit updates to the upstream task.
b) The upstream task updates its locally managed credit according to the credit messages it receives.
c) The upstream task keeps sending data to the downstream within its local credit.

This way, given sufficient resources, the transfer is fully streaming. That differs from the traditional pull model; it is more like a push model with credit-based flow control.
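
Purely as an illustration of step 4 (invented names; this is not Flink's actual implementation), the credit accounting behaves roughly like this:

import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch of credit-based flow control: the receiver grants
// credits, and the sender only pushes while it still holds credit.
public class CreditBasedSender {
    private int credit;
    private final Queue<String> pending = new ArrayDeque<>();

    synchronized void onCreditGranted(int newCredits) { // receiver's credit message
        credit += newCredits;
        drain();
    }

    synchronized void send(String record) { // upstream produced a record
        pending.add(record);
        drain();
    }

    private void drain() { // push only while credit remains
        while (credit > 0 && !pending.isEmpty()) {
            System.out.println("sending: " + pending.poll());
            credit--;
        }
    }

    public static void main(String[] args) {
        CreditBasedSender sender = new CreditBasedSender();
        sender.send("r1");         // buffered: no credit yet
        sender.onCreditGranted(2); // r1 flows immediately
        sender.send("r2");         // flows, consuming the second credit
        sender.send("r3");         // buffered until more credit arrives
    }
}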

On Wed, Sep 21, 2022 at 9:43 AM yh z  wrote:

> Hello. Flink uses a pull model. The pull model's advantages: 1.
> better scalability (downstream consumers can be added on demand; they only need the upstream consumption offset); 2. downstream consumers can adjust their consumption rate to their needs;
> 3. network transfer: Flink also tried a push model in the past, and to save overhead, TCP connections are multiplexed between processes, so a performance bottleneck in one task thread would block all
> task threads on the whole link from receiving data, hurting the overall consumption rate. The push model's advantage: lower overhead, with no need for a mechanism that keeps polling the upstream for data.
>
> Xuyang  于2022年9月9日周五 20:35写道:
>
> > Hi, it's mainly a pull model: the downstream actively pulls data from the upstream. When the downstream's consumption capacity reaches its limit, backpressure makes the upstream produce less data.
> >
> >
> >
> >
> >
> >
> >
> > --
> >
> > Best!
> > Xuyang
> >
> >
> >
> >
> >
> > On 2022-09-09 19:04:27, "郑 致远"  wrote:
> > >Hi all,
> > >A quick question:
> > >In Flink's data transfer, does the upstream operator push to the downstream, or does the downstream operator pull from the upstream? What were the design considerations?
> >
>


Flink SQL support tfrecord format

2022-09-20 Thread 张颖
Hi, when I write SQL like this:




String sqlCreate = "CREATE TABLE fs_table (\n" +
"  `examplestr` bytes\n" +
")  WITH (\n" +
"  'connector'='filesystem',\n" +
"  'format'='raw',\n" +
"  'path'='/tmp/zhangying480'\n" +
")";
 I get an error like this, which suggests my tfrecord format is wrong, although it is
actually correct:
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.io.IOException: Length header crc32 checking failed: 
-1837507072 != -252953760, length = 323850
at 
com.jd.realtime.formal.operator.source.inner.TFRecordReader.read(TFRecordReader.java:44)
at 
com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:128)
at 
com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:17)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:202)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)


I read the source code and found the cause:
public class SerializationSchemaAdapter implements Encoder<RowData> {

    private static final long serialVersionUID = 1L;

    static final byte LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8)[0];

    // (constructor, the serializationSchema field, and checkOpened() elided)

    @Override
    public void encode(RowData element, OutputStream stream) throws IOException {
        checkOpened();
        stream.write(serializationSchema.serialize(element));
        stream.write(LINE_DELIMITER); // appends '\n' after every record
    }
}

Writing LINE_DELIMITER corrupts the binary output, so I implemented a
TFRecordSerializationSchemaAdapter
and overrode the FileSystemTableSink, which fixed the problem.
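
For illustration, a sketch of what a delimiter-free Encoder adapter could look like; the class name is invented, and the serialized payload is assumed to already carry its own framing (as TFRecord's length/CRC header does):

import java.io.IOException;
import java.io.OutputStream;

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.data.RowData;

// Writes the serialized bytes as-is, with no record delimiter.
public class BinarySerializationSchemaAdapter implements Encoder<RowData> {

    private final SerializationSchema<RowData> serializationSchema;

    public BinarySerializationSchemaAdapter(SerializationSchema<RowData> serializationSchema) {
        this.serializationSchema = serializationSchema;
    }

    @Override
    public void encode(RowData element, OutputStream stream) throws IOException {
        // No trailing '\n': the TFRecord payload carries its own
        // length + CRC framing, so any extra byte corrupts the file.
        stream.write(serializationSchema.serialize(element));
    }
}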


  
So far, I have only supported the filesystem sink with the tfrecord format. Do you think it is
necessary to support other systems?


Re: Does Flink's data transfer push from the upstream operator to the downstream, or does the downstream pull from the upstream? What were the design considerations?

2022-09-20 Thread yh z
Hello. Flink uses a pull model. The pull model's advantages: 1.
better scalability (downstream consumers can be added on demand; they only need the upstream consumption offset); 2. downstream consumers can adjust their consumption rate to their needs;
3. network transfer: Flink also tried a push model in the past, and to save overhead, TCP connections are multiplexed between processes, so a performance bottleneck in one task thread would block all
task threads on the whole link from receiving data, hurting the overall consumption rate. The push model's advantage: lower overhead, with no need for a mechanism that keeps polling the upstream for data.

Xuyang  于2022年9月9日周五 20:35写道:

> Hi, it's mainly a pull model: the downstream actively pulls data from the upstream. When the downstream's consumption capacity reaches its limit, backpressure makes the upstream produce less data.
>
>
>
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> On 2022-09-09 19:04:27, "郑 致远"  wrote:
> >Hi all,
> >A quick question:
> >In Flink's data transfer, does the upstream operator push to the downstream, or does the downstream operator pull from the upstream? What were the design considerations?
>


Re: Unable to run pyflink job - NetUtils getAvailablePort Error

2022-09-20 Thread Dian Fu
Hi Ramana,

The method appearing in the exception message was updated in Flink 1.15;
see [1] for more details. So I believe there must be version 1.15 jars
in your environment; could you double-check that?

Regards,
Dian

[1]
https://github.com/apache/flink/commit/dd1fddb13b2d08ade580e5b3ec6b8e910974308d

On Wed, Sep 7, 2022 at 1:37 PM Ramana  wrote:

> Hi Xingbo
>
> I have double-checked this: both the flink and pyflink versions I
> have are 1.14.4 on the JobManager and TaskManager.
> However, I still get this error.
>
> Thanks
> Ramana
>
>
>
> On Tue, Sep 6, 2022, 14:23 Xingbo Huang  wrote:
>
>> Hi Ramana,
>>
>> This problem comes from an inconsistency between your flink version and
>> your pyflink version.
>>
>> Best,
>> Xingbo
>>
>> Ramana  于2022年9月6日周二 15:08写道:
>>
>>> Hello there,
>>>
>>> I have a pyflink setup of 1 : JobManager - 1 : Task Manager.
>>>
>>> Trying to run a pyflink job and no matter what i do, i get the following
>>> error message.
>>>
>>> -
>>> The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: java.lang.NoSuchMethodError:
>>> org.apache.flink.util.NetUtils.getAvailablePort()Lorg/apache/flink/util/NetUtils$Port;
>>> 
>>> Caused by: java.lang.NoSuchMethodError:
>>> org.apache.flink.util.NetUtils.getAvailablePort()Lorg/apache/flink/util/NetUtils$Port;
>>> at
>>> org.apache.flink.client.python.PythonEnvUtils.lambda$startGatewayServer$3(PythonUtils.java:365)
>>> at java.lang.Thread.run(Thread.java:750)
>>> 
>>>
>>> Tried executing with some out of the box examples, yet I get the same
>>> error above.
>>>
>>> Could anybody shed some light on why the error is occurring, and how I
>>> can have it resolved?
>>>
>>> Appreciate any help here.
>>>
>>> Thanks.
>>> Ramana
>>> --
>>> DREAM IT, DO IT
>>>
>>


Re: flink hybrid source question

2022-09-20 Thread yh z
Hi, hybrid source currently has to be built on top of FLIP-27 sources (e.g. FileSource, KafkaSource); non-FLIP-27
sources need some modification before they can be used. If you want to help extend hybrid source, you can join the Flink
community on Slack and start a discussion. For source-related documentation, see the official docs and the FLIP design/discussion pages (
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
)(
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
). A minimal usage sketch follows below. Hope this helps.
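
For reference, a sketch of switching from a bounded FileSource to a KafkaSource; class names follow the Flink 1.14 connector APIs, and the path, servers, and topic are placeholders:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1) bounded historical data from files
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("/tmp/history"))
                .build();

        // 2) unbounded incremental data from Kafka
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("events")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 3) read files first, then switch to Kafka when the file source finishes
        HybridSource<String> hybrid =
                HybridSource.builder(fileSource).addSource(kafkaSource).build();

        env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "hybrid-source").print();
        env.execute();
    }
}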

casel.chen  于2022年9月19日周一 19:42写道:

> I have a Flink real-time computation scenario that first reads the existing data of a table from MaxCompute and then reads the incremental data from the corresponding kafka topic, processing both together.
> It looks like this needs a hybrid source. The latest Flink community release provides Kafka/Hive/File
> sources; do sources for other systems have to be developed by ourselves? Is there a place in the community to contribute sources?
> Are there any articles or blogs on how to write a custom source based on the new source architecture? Thanks!


Is it possible to connect multiple streams

2022-09-20 Thread Deepak kumar Gunjetti
Hi,
My name is Deepak, I am a new user of Apache Flink. It is one of the best
open source projects I have used. I want to thank the community for developing
such a wonderful product.

I have one query.
Is it possible to connect multiple streams, like
stream1.connect(stream2).connect(stream3).flatmap(new
RickCoFlatMapFunctionHandler())

Can someone please let me know how I can achieve this.
Thanks,
Deepak


Re: Classloading issues with Flink Operator / Kubernetes Native

2022-09-20 Thread Javier Vegas
jarURI: local:///opt/flink/lib/MYJARNAME.jar

El mar, 20 sept 2022 a las 0:25, Yaroslav Tkachenko ()
escribió:

> Hi Javier,
>
> What do you specify as a jarURI?
>
> On Mon, Sep 19, 2022 at 3:56 PM Javier Vegas  wrote:
>
>> I am doing the same thing (migrating from standalone to operator in
>> native mode) and also have my jar in /opt/flink/lib but for me it works
>> fine, no class loading errors on app startup.
>>
>> El vie, 16 sept 2022 a las 9:28, Yaroslav Tkachenko (<
>> yaros...@goldsky.com>) escribió:
>>
>>> Application mode. I've done a bit more research and created
>>> https://issues.apache.org/jira/browse/FLINK-29288, planning to work on
>>> a PR today.
>>>
>>> TLDR: currently Flink operator always creates /opt/flink/usrlib folder
>>> and forces you to specify the jarURI parameter, which is passed as
>>> pipeline.jars / pipeline.classpaths configuration options. This leads to
>>> the jar being loaded twice by different classloaders (system and user
>>> ones).
>>>
>>> On Fri, Sep 16, 2022 at 2:30 AM Matthias Pohl 
>>> wrote:
>>>
 Are you deploying the job in session or application mode? Could you
 provide the stacktrace. I'm wondering whether that would be helpful to pin
 a code location for further investigation.
 So far, I couldn't come up with a definite answer about placing the jar
 in the lib directory. Initially, I would have thought that it's fine
 considering that all dependencies are included and the job jar itself ends
 up on the user classpath. I'm curious whether Chesnay (CC'd) has an answer
 to that one.

 On Tue, Sep 13, 2022 at 1:40 AM Yaroslav Tkachenko <
 yaros...@goldsky.com> wrote:

> Hey everyone,
>
> I’m migrating a Flink Kubernetes standalone job to the Flink operator
> (with Kubernetes native mode).
>
> I have a lot of classloading issues when trying to run with
> the operator in native mode. For example, I have a Postgres driver as a
> dependency (I can confirm the files are included in the uber jar), but I
> still get "java.sql.SQLException: No suitable driver found for
> jdbc:postgresql:..." exception.
>
> In the Kubernetes standalone setup my uber jar is placed in the
> /opt/flink/lib folder, this is what I specify as "jarURI" in the operator
> config. Is this supported? Should I only be using /opt/flink/usrlib?
>
> Thanks for any suggestions.
>



Re: A question about restoring state with an additional variable with kryo

2022-09-20 Thread Vishal Santoshi
Thanks, I'll check it out.
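
For anyone landing on this thread later: the "disable Kryo" suggestion quoted below corresponds to a one-line setting. A minimal sketch (real ExecutionConfig API) that makes jobs fail fast at job-graph construction instead of silently falling back to Kryo:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableKryoFallback {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Throws at job-graph construction time if any type would be
        // serialized with the generic Kryo fallback, instead of silently
        // locking state into a non-evolvable format.
        env.getConfig().disableGenericTypes();
    }
}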

On Sun, Sep 18, 2022 at 7:33 PM David Anderson  wrote:

> Vishal,
>
> If you decide you can't live with dropping that state, [1] is a complete
> example showing how to migrate from Kryo by using the state processor API.
>
> David
>
> [1]
> https://www.docs.immerok.cloud/docs/cookbook/migrating-state-away-from-kryo/
>
>
> On Fri, Sep 16, 2022 at 8:32 AM Vishal Santoshi 
> wrote:
>
>> Thank you for the clarification. I thought so too.
>>
>> Unfortunately my state is generics-based, and those classes are definitely not
>> treated as POJOs, though they have all the constructs (no-arg constructor,
>> getters/setters, etc.). I will likely take an at-least-once hit by
>>
>> changing the uid of that specific operator and restarting with allow
>> non-restored state ... This will ignore state that cannot be restored
>> (for the previous uid), construct state for the new uid, and not affect
>> other operators (including the kafka consumer operators). I can live with
>> it, I think.
>>
>> On Fri, Sep 16, 2022 at 2:55 AM Schwalbe Matthias <
>> matthias.schwa...@viseca.ch> wrote:
>>
>>> Hi Vishal,
>>>
>>>
>>>
>>> Good news and bad news :
>>>
>>>
>>>
>>>- Bad: Kryo serializer cannot be used for schema evolution, see [1]
>>>- Good: not all is lost here,
>>>   - If you happen to have state that you cannot afford to lose, you
>>>   can transcode it by means of the savepoint API [2],
>>>   - However, this takes quite some effort
>>>- In general, if you ever plan to migrate/extend your schemas,
>>>choose a data type that supports schema migration [1],
>>>- In your case, PoJo types would be the closest to your original
>>>implementation
>>>- You can disable Kryo in configuration to avoid this situation in
>>>the future, by the way,
>>>- Kryo serializer is quite slow compared to the other options and I
>>>believe it is only there as an (emergency) fallback solution: [3]
>>>
>>>
>>>
>>> Feel free to ask for clarification 
>>>
>>>
>>>
>>> Thias
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#kryo-cannot-be-used-for-schema-evolution
>>>
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>>
>>> [3]
>>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *From:* Vishal Santoshi 
>>> *Sent:* Friday, September 16, 2022 1:17 AM
>>> *To:* user 
>>> *Subject:* Re: A question about restoring state with an additional
>>> variable with kryo
>>>
>>>
>>>
>>>
>>>
>>>
>>> The exception thrown is as follows. I realize that it is trying to read
>>> the long value. How do I signal to kryo that it is OK and that the object
>>> can have a default value?
>>>
>>>
>>>
>>> Caused by: java.io.EOFException: No more bytes left.
>>> at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:80)
>>> at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>>> at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>>> at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:133)
>>> at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:123)
>>> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
>>> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
>>> at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
>>> at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
>>>
>>>
>>>
>>> On Thu, Sep 15, 2022 at 7:10 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>> << How do I make sure that when reconstituting the state, kryo does not
>>> complain? It tries to map the previous state to the new definition of Class
>>> A and complains that it cannot read the value for `String b`.
>>>
>>>
>>>
>>> >> How do I make sure that when reconstituting the state, kryo does not
>>> complain? It tries to map the previous state to the new definition of Class
>>> A and complains that it cannot read the value for `long b`.
>>>
>>>
>>>
>>> Sorry a typo
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Sep 15, 2022 at 7:04 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>> I have state in rocksDB that represents say
>>>
>>>
>>>
>>> class A {
>>>
>>>   

Re: JobManager restarts on job failure

2022-09-20 Thread Gyula Fóra
I see. I think we have seen this issue with others before; in Flink 1.15 it
is solved by the newly introduced JobResultStore. The operator also
configures that automatically for 1.15 to avoid this.

Gyula
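
For reference, a minimal sketch of wiring that up; the option keys are the Flink 1.15 JobResultStore settings as I recall them and the storage path is a placeholder, so verify against the 1.15 docs:

import org.apache.flink.configuration.Configuration;

public class JobResultStoreConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Durable location for job result records, so a restarted JobManager
        // can tell a FAILED job apart from a fresh deployment.
        conf.setString("job-result-store.storage-path",
                "s3://flink-ha/job-result-store"); // placeholder bucket
        conf.setBoolean("job-result-store.delete-on-commit", false);
        System.out.println(conf);
    }
}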

On Tue, Sep 20, 2022 at 3:27 PM Evgeniy Lyutikov 
wrote:

> Thanks for the answer.
> I think this is not an operator issue: the kubernetes deployment just
> restarts the failed pod, and the restarted jobmanager, finding no HA
> metadata, starts the job from an empty state.
>
> I'm looking for a way to prevent it from exiting in case of a job error
> (we use an application-mode cluster).
>
>
>
> --
> *From:* Gyula Fóra 
> *Sent:* September 20, 2022, 19:49:37
> *To:* Evgeniy Lyutikov
> *Cc:* user@flink.apache.org
> *Subject:* Re: JobManager restarts on job failure
>
> The best thing for you to do would be to upgrade to Flink 1.15 and the
> latest operator version.
> In Flink 1.15 we have the option to interact with the Flink jobmanager
> even after the job FAILED and the operator leverages this for a much more
> robust behaviour.
>
> In any case the operator should not ever start the job from an empty state
> (even if it FAILED), if you think that's happening could you please open a
> JIRA ticket with the accompanying JM and Operator logs?
>
> Thanks
> Gyula
>
> On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov 
> wrote:
>
>> Hi,
>> We using flink 1.14.4 with flink kubernetes operator.
>>
>> Sometimes when updating a job, it fails on startup and flink removes all
>> HA metadata and exits the jobmanager.
>>
>>
>> 2022-09-14 14:54:44,534 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring
>> job  from Checkpoint 30829 @ 1663167158684
>> for  located at
>> s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829.
>> 2022-09-14 14:54:44,638 INFO
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
>>  reached terminal state FAILED.
>> org.apache.flink.runtime.client.JobInitializationException: Could not
>> start the JobMaster.
>> Caused by: java.util.concurrent.CompletionException:
>> java.lang.IllegalStateException: There is no operator for the state
>> 4e1d9dde287c33a35e7970cbe64a40fe
>> 2022-09-14 14:54:44,930 ERROR
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
>> error occurred in the cluster entrypoint.
>> 2022-09-14 14:54:45,020 INFO
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
>> Clean up the high availability data for job
>> .
>> 2022-09-14 14:54:45,020 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting
>> KubernetesApplicationClusterEntrypoint down with application status
>> UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
>> 2022-09-14 14:54:45,026 INFO
>> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting
>> down rest endpoint.
>> 2022-09-14 14:54:46,122 INFO
>> akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting
>> down remote daemon.
>> 2022-09-14 14:54:46,321 INFO
>> akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting
>> shut down.
>>
>> Kubernetes restarts the pod jobmanager and the new instance, not finding
>> the HA metadata, starts the job from an empty state.
>> Is there some option to prevent jobmanager from exiting after an job FAILED
>> state?
>>
>>
>>
>


Re: JobManager restarts on job failure

2022-09-20 Thread Evgeniy Lyutikov
Thanks for the answer.
I think this is not an operator issue: the kubernetes deployment just
restarts the failed pod, and the restarted jobmanager, finding no HA
metadata, starts the job from an empty state.

I'm looking for a way to prevent it from exiting in case of a job error (we
use an application-mode cluster).




From: Gyula Fóra 
Sent: September 20, 2022, 19:49:37
To: Evgeniy Lyutikov
Cc: user@flink.apache.org
Subject: Re: JobManager restarts on job failure

The best thing for you to do would be to upgrade to Flink 1.15 and the latest 
operator version.
In Flink 1.15 we have the option to interact with the Flink jobmanager even 
after the job FAILED and the operator leverages this for a much more robust 
behaviour.

In any case the operator should not ever start the job from an empty state 
(even if it FAILED), if you think that's happening could you please open a JIRA 
ticket with the accompanying JM and Operator logs?

Thanks
Gyula

On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov 
wrote:

Hi,
We are using flink 1.14.4 with the flink kubernetes operator.

Sometimes when updating a job, it fails on startup and flink removes all HA 
metadata and exits the jobmanager.


2022-09-14 14:54:44,534 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 
 from Checkpoint 30829 @ 1663167158684 for 
 located at 
s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829.
2022-09-14 14:54:44,638 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
 reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
Caused by: java.util.concurrent.CompletionException: 
java.lang.IllegalStateException: There is no operator for the state 
4e1d9dde287c33a35e7970cbe64a40fe
2022-09-14 14:54:44,930 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal error 
occurred in the cluster entrypoint.
2022-09-14 14:54:45,020 INFO  
org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Clean up 
the high availability data for job .
2022-09-14 14:54:45,020 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. 
Diagnostics Cluster entrypoint has been closed externally..
2022-09-14 14:54:45,026 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
down rest endpoint.
2022-09-14 14:54:46,122 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting down 
remote daemon.
2022-09-14 14:54:46,321 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting shut 
down.


Kubernetes restarts the pod jobmanager and the new instance, not finding the HA 
metadata, starts the job from an empty state.
Is there some option to prevent jobmanager from exiting after an job FAILED 
state?





Re: JobManager restarts on job failure

2022-09-20 Thread Gyula Fóra
The best thing for you to do would be to upgrade to Flink 1.15 and the
latest operator version.
In Flink 1.15 we have the option to interact with the Flink jobmanager even
after the job FAILED and the operator leverages this for a much more robust
behaviour.

In any case the operator should not ever start the job from an empty state
(even if it FAILED), if you think that's happening could you please open a
JIRA ticket with the accompanying JM and Operator logs?

Thanks
Gyula

On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov 
wrote:

> Hi,
> We using flink 1.14.4 with flink kubernetes operator.
>
> Sometimes when updating a job, it fails on startup and flink removes all
> HA metadata and exits the jobmanager.
>
>
> 2022-09-14 14:54:44,534 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring
> job  from Checkpoint 30829 @ 1663167158684
> for  located at
> s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829.
> 2022-09-14 14:54:44,638 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
>  reached terminal state FAILED.
> org.apache.flink.runtime.client.JobInitializationException: Could not
> start the JobMaster.
> Caused by: java.util.concurrent.CompletionException:
> java.lang.IllegalStateException: There is no operator for the state
> 4e1d9dde287c33a35e7970cbe64a40fe
> 2022-09-14 14:54:44,930 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
> error occurred in the cluster entrypoint.
> 2022-09-14 14:54:45,020 INFO
> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Clean up the high availability data for job
> .
> 2022-09-14 14:54:45,020 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting
> KubernetesApplicationClusterEntrypoint down with application status
> UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
> 2022-09-14 14:54:45,026 INFO
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting
> down rest endpoint.
> 2022-09-14 14:54:46,122 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting
> down remote daemon.
> 2022-09-14 14:54:46,321 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting
> shut down.
>
> Kubernetes restarts the pod jobmanager and the new instance, not finding
> the HA metadata, starts the job from an empty state.
> Is there some option to prevent jobmanager from exiting after an job FAILED
> state?
>
>
>



JobManager restarts on job failure

2022-09-20 Thread Evgeniy Lyutikov
Hi,
We are using flink 1.14.4 with the flink kubernetes operator.

Sometimes when updating a job, it fails on startup and flink removes all HA 
metadata and exits the jobmanager.


2022-09-14 14:54:44,534 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 
 from Checkpoint 30829 @ 1663167158684 for 
 located at 
s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829.
2022-09-14 14:54:44,638 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
 reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
Caused by: java.util.concurrent.CompletionException: 
java.lang.IllegalStateException: There is no operator for the state 
4e1d9dde287c33a35e7970cbe64a40fe
2022-09-14 14:54:44,930 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal error 
occurred in the cluster entrypoint.
2022-09-14 14:54:45,020 INFO  
org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Clean up 
the high availability data for job .
2022-09-14 14:54:45,020 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. 
Diagnostics Cluster entrypoint has been closed externally..
2022-09-14 14:54:45,026 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
down rest endpoint.
2022-09-14 14:54:46,122 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting down 
remote daemon.
2022-09-14 14:54:46,321 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting shut 
down.


Kubernetes restarts the pod jobmanager and the new instance, not finding the HA 
metadata, starts the job from an empty state.
Is there some option to prevent jobmanager from exiting after an job FAILED 
state?



"This message contains confidential information/commercial secret. If you are 
not the intended addressee of this message you may not copy, save, print or 
forward it to any third party and you are kindly requested to destroy this 
message and notify the sender thereof by email.
Данное сообщение содержит конфиденциальную информацию/информацию, являющуюся 
коммерческой тайной. Если Вы не являетесь надлежащим адресатом данного 
сообщения, Вы не вправе копировать, сохранять, печатать или пересылать его 
каким либо иным лицам. Просьба уничтожить данное сообщение и уведомить об этом 
отправителя электронным письмом."


Re: serviceAccount permissions issue for high availability in operator 1.1

2022-09-20 Thread Yang Wang
The standalone mode will be supported in release 1.2, which is
expected to be released at the beginning of October.

Best,
Yang

Javier Vegas  于2022年9月12日周一 04:52写道:

> Hi, Yang!
>
> When you say the operator uses native k8s integration by default, does
> that mean there is a way to change that to use standalone K8s? I haven't
> seen anything about that in the docs, besides a mention that standalone
> support is coming in version 1.2 of the operator.
>
> Thanks,
>
> Javier
>
>
> On Thu, Sep 8, 2022, 22:50 Yang Wang  wrote:
>
>> Since the flink-kubernetes-operator is using native K8s integration[1] by
>> default, you need to give the permissions of pod and deployment as well as
>> ConfigMap.
>>
>> You could find more information about the RBAC here[2].
>>
>> [1].
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/
>> [2].
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/operations/rbac/
>>
>> Best,
>> Yang
>>
>> Javier Vegas  于2022年9月7日周三 04:17写道:
>>
>>> I am migrating a HA standalone Kubernetes app to use the Flink operator.
>>> The HA store is S3 using IRSA so the app needs to run with a serviceAccount
>>> that is authorized to access S3. In standalone mode HA worked once I gave
>>> the account permissions to edit configMaps. But when trying the operator
>>> with the custom serviceAccount, I am getting this error:
>>>
>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>> executing: GET at:
>>> https://172.20.0.1/apis/apps/v1/namespaces/MYNAMESPACE/deployments/MYAPPNAME.
>>> Message: Forbidden!Configured service account doesn't have access. Service
>>> account may have been revoked. deployments.apps "MYAPPNAME" is forbidden:
>>> User "system:serviceaccount:MYNAMESPACE:MYSERVICEACCOUNT" cannot get
>>> resource "deployments" in API group "apps" in the namespace "MYNAMESPACE".
>>>
>>>
>>> Does the serviceAccount needs additional permissions beside configMap
>>> edit to be able to run HA using the operator?
>>>
>>> Thanks,
>>>
>>> Javier Vegas
>>>
>>


Exceptions shown in the flink web ui

2022-09-20 Thread yidan zhao
As the title says, I often run into various exceptions in Flink jobs at work. Today I listed the main ones, and I'd like to ask the experts to share, from their own experience, the causes and scenarios behind each exception and possible fixes or optimizations.

(1) org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager
with id {...} is no longer reachable.
(2) org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
(3) 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Lost connection to task manager '10.35.213.153/10.35.213.153:2085'.
This indicates that the remote task manager was lost.
(4) org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
readAddress(..) failed: Connection timed out (connection to
'10.35.116.170/10.35.116.170:2031')
(5) 
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
writeAddress(..) failed: Connection reset by peer
(6) java.util.concurrent.TimeoutException: Heartbeat of TaskManager
with id {...} timed out.


Re: Classloading issues with Flink Operator / Kubernetes Native

2022-09-20 Thread Yaroslav Tkachenko
Hi Javier,

What do you specify as a jarURI?

On Mon, Sep 19, 2022 at 3:56 PM Javier Vegas  wrote:

> I am doing the same thing (migrating from standalone to operator in native
> mode) and also have my jar in /opt/flink/lib but for me it works fine, no
> class loading errors on app startup.
>
> El vie, 16 sept 2022 a las 9:28, Yaroslav Tkachenko ()
> escribió:
>
>> Application mode. I've done a bit more research and created
>> https://issues.apache.org/jira/browse/FLINK-29288, planning to work on a
>> PR today.
>>
>> TLDR: currently Flink operator always creates /opt/flink/usrlib folder
>> and forces you to specify the jarURI parameter, which is passed as
>> pipeline.jars / pipeline.classpaths configuration options. This leads to
>> the jar being loaded twice by different classloaders (system and user
>> ones).
>>
>> On Fri, Sep 16, 2022 at 2:30 AM Matthias Pohl 
>> wrote:
>>
>>> Are you deploying the job in session or application mode? Could you
>>> provide the stacktrace. I'm wondering whether that would be helpful to pin
>>> a code location for further investigation.
>>> So far, I couldn't come up with a definite answer about placing the jar
>>> in the lib directory. Initially, I would have thought that it's fine
>>> considering that all dependencies are included and the job jar itself ends
>>> up on the user classpath. I'm curious whether Chesnay (CC'd) has an answer
>>> to that one.
>>>
>>> On Tue, Sep 13, 2022 at 1:40 AM Yaroslav Tkachenko 
>>> wrote:
>>>
 Hey everyone,

 I’m migrating a Flink Kubernetes standalone job to the Flink operator
 (with Kubernetes native mode).

 I have a lot of classloading issues when trying to run with
 the operator in native mode. For example, I have a Postgres driver as a
 dependency (I can confirm the files are included in the uber jar), but I
 still get "java.sql.SQLException: No suitable driver found for
 jdbc:postgresql:..." exception.

 In the Kubernetes standalone setup my uber jar is placed in the
 /opt/flink/lib folder, this is what I specify as "jarURI" in the operator
 config. Is this supported? Should I only be using /opt/flink/usrlib?

 Thanks for any suggestions.

>>>