Re: flink 1.12.0 k8s session deployment exception

2021-03-24 Thread Yang Wang
The root cause of this problem is that the cloud LoadBalancer keeps sending RST packets to the service created by Flink.
See this JIRA [1] for more information.

A temporary workaround is to raise the log level of the org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint class to ERROR in the log4j2 configuration.
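For reference, a minimal sketch of that workaround in log4j2 properties syntax (the logger id "dispatcher_rest" is just an arbitrary name; adjust it to your own log4j2 config file):

logger.dispatcher_rest.name = org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
logger.dispatcher_rest.level = ERROR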

[1]. https://issues.apache.org/jira/browse/FLINK-18129

Best,
Yang

18756225...@163.com <18756225...@163.com> wrote on Wed, Mar 24, 2021 at 5:45 PM:

> I have also run into this problem. Jobs can be submitted to the cluster normally, but the jobmanager log keeps showing this. Is there any way to solve it?
>
>
> From: casel.chen
> Sent: 2021-02-07 16:33
> To: user-zh@flink.apache.org
> Subject: flink 1.12.0 k8s session deployment exception
> When deploying a session-mode Flink cluster on k8s, the jobmanager reports the error below. What is causing this, and how can it be fixed?
>
>
> 2021-02-07 08:21:41,873 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting
> RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> at akka://flink/user/rpc/dispatcher_1 .
> 2021-02-07 08:21:43,506 WARN
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Unhandled
> exception
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> ~[?:1.8.0_275]
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> ~[?:1.8.0_275]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> 2021-02-07 08:21:43,940 WARN
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Unhandled
> exception
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> ~[?:1.8.0_275]
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> ~[?:1.8.0_275]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> 

Re: EOFException on attempt to scale up job with RocksDB state backend

2021-03-24 Thread Alexey Trenikhun
Hi Yun,
Finally I was able to try rescaling with block blobs configured - rescaled
from 6 to 8 without a problem. So it looks like there is indeed a problem with page blobs.

Thank you for help,
Alexey

From: Alexey Trenikhun 
Sent: Thursday, March 18, 2021 11:31 PM
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,
I've changed the configuration to use block blobs. However, due to another issue
[1], I can't take a savepoint. I hope the job will eventually be able to process the
backlog; then I will take a savepoint, re-test and let you know.

Thanks,
Alexey

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-fail-due-to-timeout-td42125.html#a42248



From: Yun Tang 
Sent: Thursday, March 18, 2021 5:08 AM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

Flink only writes checkpointed files once. Could you try writing the
checkpointed files in block blob format and see whether the problem still
exists?

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Thursday, March 18, 2021 13:54
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,
How does the underlying storage explain the fact that I can restore from the
savepoint without rescaling? Does Flink write a file once or many times? If many times, then
the 50,000-blocks-per-blob limit could potentially be the problem, am I right?
Should I try block blobs with compaction as described in [1], or without
compaction?

Thanks,
Alexey

From: Yun Tang 
Sent: Wednesday, March 17, 2021 9:31 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

I am not familiar with Azure Blob Storage and I cannot load the "_metadata"
from the file you provided locally.

Currently, I highly suspect this strange rescaling behavior is related to
your underlying storage. Could you try to use block blobs instead of page blobs
[1] to see whether this behavior still exists?

[1] 
https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Block_Blob_with_Compaction_Support_and_Configuration
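(For reference, a core-site.xml sketch of the setting described on that page; the directory value below is only a placeholder and the exact path to use is an assumption:)

<!-- files under this directory are written as block blobs with compaction enabled;
     anything not matched by fs.azure.page.blob.dir is already a plain block blob -->
<property>
  <name>fs.azure.block.blob.with.compaction.dir</name>
  <value>/flink-checkpoints</value>
</property>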


Best
Yun Tang


From: Alexey Trenikhun 
Sent: Thursday, March 18, 2021 12:00
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,
The Azure web UI shows the size of all files created by Flink as 128 MiB * X (128, 256,
640), see the attached screenshot. In my understanding this is because Flink
creates them as page blobs. In the same storage account another application creates files as
block blobs, and their sizes are not rounded to 128 MiB.


Thanks,
Alexey


From: Yun Tang 
Sent: Wednesday, March 17, 2021 8:38 PM
To: Alexey Trenikhun ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Alexey,

I tried to load your _metadata as a checkpoint via
Checkpoints#loadCheckpointMetadata [1] but found this file is actually not
savepoint metadata; have you uploaded the correct files?
Moreover, I noticed that the sizes of both 77e77928-cb26-4543-bd41-e785fcac49f0 and
_metadata are 128 MB, which is much larger than their actual content, so is this
expected on Azure Blob Storage or did you just upload the wrong files?

[1] 
https://github.com/apache/flink/blob/956c0716fdbf20bf53305fe4d023fa2bea412595/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Thursday, March 18, 2021 0:45
To: Yun Tang ; Tzu-Li (Gordon) Tai ; 
user@flink.apache.org 
Subject: Re: EOFException on attempt to scale up job with RocksDB state backend

Hi Yun,

I've copied 77e77928-cb26-4543-bd41-e785fcac49f0 and _metadata to Google drive:
https://drive.google.com/drive/folders/1J3nwvQupLBT5ZaN_qEmc2y_-MgFz0cLb?usp=sharing

Compression was never enabled (the docs say that RocksDB's incremental checkpoints
always use snappy compression; I'm not sure whether that has any effect on savepoints or not).

Thanks,
Alexey

From: Yun Tang 
Sent: 

Re: Re: About Memory Spilling to Disk in Flink

2021-03-24 Thread Guowei Ma
Hi, Roc
Thanks for your detailed explanation.
I could not find any "stream" operator that uses `ExternalSorterBuilder` when
using IDEA's "Find Usages".
Best,
Guowei


On Wed, Mar 24, 2021 at 3:27 PM Roc Marshal  wrote:

> Hi, Guowei Ma.
> As far as I know, flink writes some in-memory data to disk when memory
> is running low. I noticed that flink uses ExternalSorterBuilder for batch
> operations in the org.apache.flink.runtime.operator.sort package, but I'm
> curious to confirm if this technique is also used in stream mode.
> Thank you.
>
> Best,
> Roc
>
>
>
>
>
> At 2021-03-24 15:01:48, "Guowei Ma"  wrote:
>
> Hi, Roc
> Could you explain more about your question?
> Best,
> Guowei
>
>
> On Wed, Mar 24, 2021 at 2:47 PM Roc Marshal  wrote:
>
>> Hi,
>>
>> Can someone tell me where flink uses memory spilling to write to
>> disk?
>> Thank you.
>>
>> Best, Roc.
>>
>>
>>
>>
>
>
>
>


Re: [EXTERNAL] Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-24 Thread Bohinski, Kevin
Hi Shuiqiang,

Thanks for letting me know. Feel free to send any beginner-level contributions
for this effort my way.

Best,
kevin

From: Shuiqiang Chen 
Date: Wednesday, March 24, 2021 at 10:31 PM
To: "Bohinski, Kevin" 
Cc: user 
Subject: [EXTERNAL] Re: PyFlink DataStream Example Kafka/Kinesis?

Hi Kevin,

Kinesis connector is not supported yet in Python DataStream API. We will add it 
in the future.

Best,
Shuiqiang

Bohinski, Kevin <kevin_bohin...@comcast.com> wrote on Thu, Mar 25, 2021 at 5:03 AM:
Is there a kinesis example?

From: "Bohinski, Kevin" 
mailto:kevin_bohin...@comcast.com>>
Date: Wednesday, March 24, 2021 at 4:40 PM
To: "Bohinski, Kevin" 
mailto:kevin_bohin...@comcast.com>>
Subject: Re: PyFlink DataStream Example Kafka/Kinesis?

Nevermind, found this for anyone else looking: 
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py

From: "Bohinski, Kevin" 
mailto:kevin_bohin...@comcast.com>>
Date: Wednesday, March 24, 2021 at 4:38 PM
To: user mailto:user@flink.apache.org>>
Subject: PyFlink DataStream Example Kafka/Kinesis?

Hi,

Is there an example kafka/kinesis source or sink for the PyFlink DataStream API?

Best,
kevin


Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-24 Thread Shuiqiang Chen
Sure, no problem.  You can refer to the implementation of Kafka connector,
they are very much alike.
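For anyone following along, here is a minimal sketch of how the existing Kafka connector is used from the Python DataStream API in 1.12 (jar path, topics and broker addresses are placeholders), which is roughly the shape a Kinesis connector would follow:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

env = StreamExecutionEnvironment.get_execution_environment()
# the Kafka connector jar has to be on the classpath (placeholder path)
env.add_jars("file:///path/to/flink-sql-connector-kafka_2.11-1.12.2.jar")

# read strings from one topic and write them to another
source = FlinkKafkaConsumer(
    topics='input-topic',
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'demo'})
sink = FlinkKafkaProducer(
    topic='output-topic',
    serialization_schema=SimpleStringSchema(),
    producer_config={'bootstrap.servers': 'localhost:9092'})

env.add_source(source).add_sink(sink)
env.execute("kafka_demo")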

Xinbin Huang wrote on Thu, Mar 25, 2021 at 10:55 AM:

> Hi Shuiqiang,
>
> Thanks for the quick response on creating the ticket for Kinesis
> Connector. Do you mind giving me the chance to try to implement the
> connector over the weekend?
>
> I am interested in contributing to Flink, and I think this can be a good
> starting point to me
>
> Best
> Bin
>
> On Wed, Mar 24, 2021 at 7:49 PM Shuiqiang Chen 
> wrote:
>
>> I have just created the jira
>> https://issues.apache.org/jira/browse/FLINK-21966 and will finish it
>> soon.
>>
>> Best,
>> Shuiqiang
>>
>> Xinbin Huang wrote on Thu, Mar 25, 2021 at 10:43 AM:
>>
>>> Hi Shuiqiang,
>>>
>>> I am interested in the same feature. Do we have a ticket to track this
>>> right now?
>>>
>>> Best
>>> Bin
>>>
>>> On Wed, Mar 24, 2021 at 7:30 PM Shuiqiang Chen 
>>> wrote:
>>>
 Hi Kevin,

 Kinesis connector is not supported yet in Python DataStream API. We
 will add it in the future.

 Best,
 Shuiqiang

 Bohinski, Kevin wrote on Thu, Mar 25, 2021 at 5:03 AM:

> Is there a kinesis example?
>
>
>
> *From: *"Bohinski, Kevin" 
> *Date: *Wednesday, March 24, 2021 at 4:40 PM
> *To: *"Bohinski, Kevin" 
> *Subject: *Re: PyFlink DataStream Example Kafka/Kinesis?
>
>
>
> Nevermind, found this for anyone else looking:
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
>
>
>
> *From: *"Bohinski, Kevin" 
> *Date: *Wednesday, March 24, 2021 at 4:38 PM
> *To: *user 
> *Subject: *PyFlink DataStream Example Kafka/Kinesis?
>
>
>
> Hi,
>
>
>
> Is there an example kafka/kinesis source or sink for the PyFlink
> DataStream API?
>
>
>
> Best,
>
> kevin
>



Re: Unsubscribe

2021-03-24 Thread Kezhu Wang
You need to send an email to 

Best,
Kezhu Wang

On March 25, 2021 at 10:15:56, drewfranklin (drewfrank...@163.com) wrote:

Unsubscribe


Re: With the same checkpoint interval, checkpoint times increase and checkpoints fail in production on Flink 1.12, while the Flink 1.9 job runs normally

2021-03-24 Thread Haihang Jing
Hi Congxian, thanks for your reply.
Job run on Flink 1.9 (checkpoint interval 3 min) [image]
Job run on Flink 1.12 (checkpoint interval 10 min) [image]
Job run on Flink 1.12 (checkpoint interval 3 min):
Pic1: Time used to complete the checkpoint in 1.12 is longer (5m32s) [image]
Pic2: Start delay (4m27s) [image]
Pic3: Next checkpoint failed (task141 ack n/a) [image]
Pic4: Did not see back pressure or data skew [image]
Pic5: Subtasks process the same number of records; the end-to-end checkpoint is fast [image]
Best,
Haihang



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-24 Thread Shuiqiang Chen
I have just created the jira
https://issues.apache.org/jira/browse/FLINK-21966 and will finish it soon.

Best,
Shuiqiang

Xinbin Huang wrote on Thu, Mar 25, 2021 at 10:43 AM:

> Hi Shuiqiang,
>
> I am interested in the same feature. Do we have a ticket to track this
> right now?
>
> Best
> Bin
>
> On Wed, Mar 24, 2021 at 7:30 PM Shuiqiang Chen 
> wrote:
>
>> Hi Kevin,
>>
>> Kinesis connector is not supported yet in Python DataStream API. We will
>> add it in the future.
>>
>> Best,
>> Shuiqiang
>>
>> Bohinski, Kevin wrote on Thu, Mar 25, 2021 at 5:03 AM:
>>
>>> Is there a kinesis example?
>>>
>>>
>>>
>>> *From: *"Bohinski, Kevin" 
>>> *Date: *Wednesday, March 24, 2021 at 4:40 PM
>>> *To: *"Bohinski, Kevin" 
>>> *Subject: *Re: PyFlink DataStream Example Kafka/Kinesis?
>>>
>>>
>>>
>>> Nevermind, found this for anyone else looking:
>>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
>>>
>>>
>>>
>>> *From: *"Bohinski, Kevin" 
>>> *Date: *Wednesday, March 24, 2021 at 4:38 PM
>>> *To: *user 
>>> *Subject: *PyFlink DataStream Example Kafka/Kinesis?
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> Is there an example kafka/kinesis source or sink for the PyFlink
>>> DataStream API?
>>>
>>>
>>>
>>> Best,
>>>
>>> kevin
>>>
>>




Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-24 Thread Shuiqiang Chen
Hi Kevin,

Kinesis connector is not supported yet in Python DataStream API. We will
add it in the future.

Best,
Shuiqiang

Bohinski, Kevin wrote on Thu, Mar 25, 2021 at 5:03 AM:

> Is there a kinesis example?
>
>
>
> *From: *"Bohinski, Kevin" 
> *Date: *Wednesday, March 24, 2021 at 4:40 PM
> *To: *"Bohinski, Kevin" 
> *Subject: *Re: PyFlink DataStream Example Kafka/Kinesis?
>
>
>
> Nevermind, found this for anyone else looking:
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
>
>
>
> *From: *"Bohinski, Kevin" 
> *Date: *Wednesday, March 24, 2021 at 4:38 PM
> *To: *user 
> *Subject: *PyFlink DataStream Example Kafka/Kinesis?
>
>
>
> Hi,
>
>
>
> Is there an example kafka/kinesis source or sink for the PyFlink
> DataStream API?
>
>
>
> Best,
>
> kevin
>


Unsubscribe

2021-03-24 Thread drewfranklin
Unsubscribe



State size increasing exponentially in Flink v1.9

2021-03-24 Thread Almeida, Julius
Hey,
Hope you all are doing well!

I am using flink v1.9 with RocksDBStateBackend, but over time the state size is 
increasing exponentially.

I am using MapState in my project and seeing a memory spike; after looking at a heap
dump I see duplicates in it.

I also have logic added to remove expired events from the MapState,
e.g.: MapState.remove(key)

Can anyone give me pointers to find more details on this?

Heap Dump pointed to 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java#L811

Thanks,
Julius


Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-24 Thread Bohinski, Kevin
Is there a kinesis example?

From: "Bohinski, Kevin" 
Date: Wednesday, March 24, 2021 at 4:40 PM
To: "Bohinski, Kevin" 
Subject: Re: PyFlink DataStream Example Kafka/Kinesis?

Nevermind, found this for anyone else looking: 
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py

From: "Bohinski, Kevin" 
Date: Wednesday, March 24, 2021 at 4:38 PM
To: user 
Subject: PyFlink DataStream Example Kafka/Kinesis?

Hi,

Is there an example kafka/kinesis source or sink for the PyFlink DataStream API?

Best,
kevin


Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-24 Thread Bohinski, Kevin
Nevermind, found this for anyone else looking: 
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py


From: "Bohinski, Kevin" 
Date: Wednesday, March 24, 2021 at 4:38 PM
To: user 
Subject: PyFlink DataStream Example Kafka/Kinesis?

Hi,

Is there an example kafka/kinesis source or sink for the PyFlink DataStream API?

Best,
kevin


PyFlink DataStream Example Kafka/Kinesis?

2021-03-24 Thread Bohinski, Kevin
Hi,

Is there an example kafka/kinesis source or sink for the PyFlink DataStream API?

Best,
kevin


Re: Evenly distribute task slots across task-manager

2021-03-24 Thread Vignesh Ramesh
Hi Matthias,

Thanks for your reply. In my case, yes, the upstream operator of the
operator that is not distributed evenly among task managers is a Flink
Kafka connector with a rebalance (shuffling).

Regards,
Vignesh

On Tue, 23 Mar, 2021, 6:48 pm Matthias Pohl,  wrote:

> There was a similar discussion recently in this mailing list about
> distributing the work onto different TaskManagers. One finding Xintong
> shared there [1] was that the parameter cluster.evenly-spread-out-slots is
> used to evenly allocate slots among TaskManagers but not how the tasks are
> actually distributed among the allocated slots. It would be interesting to
> know more about your job. If the upstream operator does some shuffling, you
> might run into the issue of the task executions not being distributed
> evenly anymore.
>
> Matthias
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Evenly-Spreading-Out-Source-Tasks-tp42108p42235.html
>
> On Tue, Mar 23, 2021 at 1:42 PM Matthias Pohl 
> wrote:
>
>> Hi Vignesh,
>> are you trying to achieve an even distribution of tasks for this one
>> operator that has the parallelism set to 16? Or do you observe the
>> described behavior also on a job level?
>> I'm adding Chesnay to the thread as he might have more insights on this
>> topic.
>>
>> Best,
>> Matthias
>>
>> On Mon, Mar 22, 2021 at 6:31 PM Vignesh Ramesh 
>> wrote:
>>
>>> Hello Everyone,
>>>
>>> Can someone help me with a solution?
>>>
>>> I have a flink job (2 task managers) with a job parallelism of 64 and
>>> 64 task slots.
>>> I have a parallelism of 16 set for one of the operators. This
>>> operator's (16 parallelism) slots are not getting evenly distributed across the
>>> two task managers. It often takes more task slots, like 10/11, in one task
>>> manager and 5/6 in the other task manager.
>>>
>>> I'm using flink version 1.11.2. I tried adding 
>>> cluster.evenly-spread-out-slots:
>>> true but it didn't work. Any solution is greatly appreciated.
>>>
>>> Thanks in advance,
>>>
>>> Regards,
>>> Vignesh
>>>
>>>


Re: DataDog and Flink

2021-03-24 Thread Vishal Santoshi
Yep, there is no single endpoint that does the whole dump, but something like this works
(dirty, but who cares :)). The vertex metrics are the most numerous anyway:

curl -s http:///jobs/[job_id] | jq -r '.vertices' | jq '.[].id' | xargs -I {} curl http://xx/jobs/[job_id]/vertices/{}/metrics | jq

On Wed, Mar 24, 2021 at 9:56 AM Vishal Santoshi 
wrote:

> Yes, I will do that.
>
> Regarding the metrics dump through REST, it does provide the TM-specific
> metrics but not a single dump across all jobs and vertices/operators.
> Moreover, I am not sure I have ready access to the vertex ids
> from the UI.
>
> curl http://[jm]/taskmanagers/[tm_id]
> curl http://[jm]/taskmanagers/[tm_id]/metrics
>
>
>
> On Wed, Mar 24, 2021 at 4:24 AM Arvid Heise  wrote:
>
>> Hi Vishal,
>>
>> REST API is the most direct way to get through all metrics as Matthias
>> pointed out. Additionally, you could also add a JMX reporter and log to the
>> machines to check.
>>
>> But in general, I think you are on the right track. You need to reduce
>> the metrics that are sent to DD by configuring the scope / excluding
>> variables.
>>
>> Furthermore, I think it would be a good idea to make the timeout
>> configurable. Could you open a ticket for that?
>>
>> Best,
>>
>> Arvid
>>
>> On Wed, Mar 24, 2021 at 9:02 AM Matthias Pohl 
>> wrote:
>>
>>> Hi Vishal,
>>> what about the TM metrics' REST endpoint [1]. Is this something you
>>> could use to get all the metrics for a specific TaskManager? Or are you
>>> looking for something else?
>>>
>>> Best,
>>> Matthias
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html#taskmanagers-metrics
>>>
>>> On Tue, Mar 23, 2021 at 10:59 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 That said, is there a way to get a dump of all metrics exposed by a TM? I
 was searching for it and I bet we could get it for a ServiceMonitor on k8s (
 scrape ) but am missing a way to hit a TM and dump all metrics that are
 pushed.

 Thanks and regards.

 On Tue, Mar 23, 2021 at 5:56 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> I guess there is a bigger issue here. We dropped the property to 500.
> We also realized that this failure happened on a TM that had one specific
> job running on it. What was good (but surprising) was that the exception was
> the more protocol-specific 413 (as in the chunk is greater than some size
> limit DD has on a request).
>
> Failed to send request to Datadog (response was Response{protocol=h2,
> code=413, message=, url=
> https://app.datadoghq.com/api/v1/series?api_key=**}
> 
> )
>
> which implies that the Socket timeout was masking this issue. The 2000
> was just a huge payload that DD was unable to parse in time ( or was slow
> to upload etc ). Now we could go lower but that makes less sense. We could
> play with
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#system-scope
> to reduce the size of the tags ( or keys ).
>
>
>
>
>
>
>
>
>
> On Tue, Mar 23, 2021 at 11:33 AM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> If we look at this
>> 
>> code , the metrics are divided into chunks up-to a max size. and
>> enqueued
>> .
>> The Request
>> 
>> has a 3 second read/connect/write timeout which IMHO should have been
>> configurable ( or is it ) . While the number metrics ( all metrics )
>> exposed by flink cluster is pretty high ( and the names of the metrics
>> along with tags ) , it may make sense to limit the number of metrics in a
>> single chunk ( to ultimately limit the size of a single chunk ). There is
>> this configuration which allows for reducing the metrics in a single 
>> chunk
>>
>> metrics.reporter.dghttp.maxMetricsPerRequest: 2000
>>
>> We could decrease this to 1500 ( 1500 is pretty, not based on any
>> empirical reasoning ) and see if that stabilizes the dispatch. It is
>> inevitable that the number of requests will grow and we may hit the
>> throttle but then we know the exception rather than the timeouts that are
>> generally less intuitive.
>>
>> Any 

Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

2021-03-24 Thread Alexey Trenikhun
Hi Yang,
HA data was cleared, but it would be re-created when Kubernetes restarts the
failed (due to exit code 2) job. So the upgrade would happen on a live job. I guess the upgrade
procedure should recheck or monitor the Kubernetes job to ensure that it has
completed.

Thanks,
Alexey


From: Yang Wang 
Sent: Tuesday, March 23, 2021 11:17:18 PM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) 
savepoint

Hi Alexey,

From your attached logs, I do not think the newly started JobManager will recover
from the wrong savepoint.
Because you could find the following logs to indicate that the HA related 
ConfigMaps have been cleaned up successfully.

1393 {"ts":"2021-03-20T02:02:18.506Z","message":"Finished cleaning up the high 
availability 
data.","logger_name":"org.apache.flink.kubernetes.highavailability.KubernetesHaServices","thread_name":"AkkaRpcService-Supervisor-Termination-Future-Executor-thread-1","level":"INFO","level_value":2}


So, now your question is why the JobManager pod exited with code 2?
It seems that "clusterEntrypoint.getTerminationFuture().get()" failed with a 
timeout when terminating.

Alexey Trenikhun <yen...@msn.com> wrote on Sat, Mar 20, 2021 at 10:16 AM:
Hi Yang,
I think I've reproduced the problem - the job was cancelled, however something went
wrong, the JM exited with code 2, and as a result Kubernetes restarted the JM pod. The JM log
is attached.

Thanks,
Alexey

From: Yang Wang <danrtsey...@gmail.com>
Sent: Monday, March 15, 2021 1:26 AM
To: Alexey Trenikhun <yen...@msn.com>
Cc: Flink User Mail List <user@flink.apache.org>
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) 
savepoint

Feel free to share the terminated JobManager logs if you could reproduce this 
issue again.
Maybe "kubectl logs {pod_name} --previous" could help.


Best,
Yang

Alexey Trenikhun <yen...@msn.com> wrote on Mon, Mar 15, 2021 at 2:28 PM:
With 1.12.1 it happened quite often, with 1.12.2 not that much. I think I saw
it once or twice for ~20 cancels. When it happened, the job actually restarted on
cancel; I did not grab the log at that time, but chances are good that I will be able to
reproduce it.
Thanks,
Alexey


From: Yang Wang <danrtsey...@gmail.com>
Sent: Sunday, March 14, 2021 7:50:21 PM
To: Alexey Trenikhun <yen...@msn.com>
Cc: Flink User Mail List <user@flink.apache.org>
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) 
savepoint

If the HA related ConfigMaps still exist, then I am afraid the data located on
the distributed storage also still exists.
So I suggest deleting the HA related storage as well.

Deleting all the HA related data manually should help in your current situation.
After that you could recover from the new savepoint.
However, I do not think this is normal behavior, since when the application
reaches a terminal state (e.g. FINISHED, FAILED, CANCELLED),
all HA related data should be cleaned up automatically.

Could you help to provide the JobManager logs when you are trying to cancel the 
job? I believe using `kubectl logs -f {pod_name}` could dump
the logs in real time.

Best,
Yang

Alexey Trenikhun <yen...@msn.com> wrote on Fri, Mar 12, 2021 at 12:47 AM:
Hi Yang,
The upgrade procedure uses POST /jobs/:jobid/savepoints with cancel-job=true, but
perhaps I hit FLINK-21028. This leads to the question: if the normal
take-savepoint-and-cancel-job via the API fails, what steps should be done outside Flink to
be able to resume from a savepoint with the new job version? Is deleting the Kubernetes
Job and HA ConfigMaps enough, or should something in persisted storage be
deleted as well?

Thanks,
Alexey

From: Yang Wang <danrtsey...@gmail.com>
Sent: Thursday, March 11, 2021 2:59 AM
To: Alexey Trenikhun <yen...@msn.com>
Cc: Flink User Mail List <user@flink.apache.org>
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) 
savepoint

Hi Alexey,

From your attached logs, it seems that the leader related ConfigMap is reused.
Then the Flink application is recovered instead of submitting a new one. This is
the root cause of why it is trying to recover from a wrong savepoint, which was
specified in
your last submission.

> So how to fix this?
If you want to stop the application, I strongly suggest cancelling the flink job
with a savepoint
instead of directly deleting all the K8s resources. After that, you will find
that the leader
related ConfigMaps will be deleted automatically after the job is cancelled.

Best,
Yang

Alexey Trenikhun <yen...@msn.com> wrote on Wed, Mar 10, 2021 at 12:16 PM:
Hi Yang,
The problem is re-occurred, full JM log is attached

Thanks,
Alexey

From: Yang Wang <danrtsey...@gmail.com>
Sent: Sunday, February 28, 2021 10:04 PM
To: Alexey Trenikhun <yen...@msn.com>
Cc: Flink User Mail List 

Re: OOM issues with Python Objects

2021-03-24 Thread Kevin Lam
Hi Dian,

I have unit tests for which both sets of code (Row subclass vs. custom
Python class) pass. The OOM occurs when reading a large amount of data
from a Kafka topic.

At the moment I don't have a simple example to reproduce the issue, I'll
let you know.

On Tue, Mar 23, 2021 at 2:17 AM Dian Fu  wrote:

> Hi Kevin,
>
> Is it possible to provide a simple example to reproduce this issue?
>
> PS: It will use pickle to perform the serialization/deserialization if you
> don't specify the type info.
>
> Regards,
> Dian
>
>
> On Mon, Mar 22, 2021 at 10:55 PM Arvid Heise  wrote:
>
>> Hi Kevin,
>>
>> yes I understood that, but then your Python class contains a Row field,
>> where no mapping exists. I'm assuming PyFlink tries to do a deep conversion
>> and fails to do so by ending in some infinite loop.
>>
>> On Mon, Mar 22, 2021 at 3:48 PM Kevin Lam  wrote:
>>
>>> Thanks for the response Arvid! Point of clarification, *things do NOT
>>> OOM when I use the Row subclass*. Instead, the code that doesn't use
>>> the Row subclass is the code that OOMs (ie. the simple python class).
>>>
>>>
>>>
>>> On Mon, Mar 22, 2021 at 10:24 AM Arvid Heise  wrote:
>>>
 Hi Kevin,

 I suspect that this is because Row is not supported as a Python field
 [1]; it's supposed to be a dict that is mapped to a Row by Flink.
 Maybe it runs in some infinite loop while trying serialize and hence
 the OOM.

 Subclassing Row might be an undocumented feature.

 I'm also pulling in Dian who knows more about PyFlink.

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/datastream-api-users-guide/data_types.html

 On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam 
 wrote:

> Hi all,
>
> I've encountered an interesting issue where I observe an OOM issue in
> my Flink Application when I use a DataStream of Python Objects, but when I
> make that Python Object a Subclass of pyflink.common.types.Row and provide
> TypeInformation, there is no issue.
>
> For the OOM scenario, no elements get processed, the application runs
> without printing output and then eventually crashes with 
> java.lang.OutOfMemoryError:
> Java heap space
>
> Any insights into why this might be happening? Appreciate any
> help/suggestions.
> I've included some code that illustrates the two situations below [0].
>
> Thanks in advance!
>
> [0]:
>
> Code Snippet A: OOM scenario
>
> class InputWrapper:
> """Helper class, used to make streams of the same type"""
>
> def __init__(self, key: str, contents: Row = None):
> self.key = key
> self.contents = contents
>
> x_ds = x_ds.map(
> lambda d: InputWrapper(key=d['key'], contents=d))
> y_ds = y_ds.map(
> lambda o: InputWrapper(key=o['key'], contents=o))
> union = x_ds.union(y_ds)
> union.print()
>
> Code Snippet B: Working scenario:
>
> class InputWrapper(Row):
> """Helper class, used to make streams of the same type"""
>
> def __init__(self, key: str, contents: Row = None):
> super().__init__(key, contents)
>
> x_ds = x_ds.map(
> lambda d: InputWrapper(key=d['key'], contents=d),
> output_type=InputWrapperTypeInfo())
> y_ds = y_ds.map(
> lambda o: InputWrapper(key=o['key'], contents=o),
> output_type=InputWrapperTypeInfo())
> union = x_ds.union(y_ds)
> union.print()
>
>
>


Native kubernetes execution and History server

2021-03-24 Thread Lukáš Drbal
Hi,

I would like to use native Kubernetes execution [1] for one batch job and
let Kubernetes handle the scheduling. Flink version: 1.12.2.

Kubernetes job:
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: scheduled-job
spec:
  schedule: "*/1 * * * *"
  jobTemplate:
spec:
  template:
metadata:
  labels:
app: super-flink-batch-job
spec:
  containers:
  - name: runner
image: localhost:5000/batch-flink-app-v3:latest
imagePullPolicy: Always
command:
  - /bin/sh
  - -c
  - /opt/flink/bin/flink run-application --target
kubernetes-application -Dkubernetes.service-account=flink-service-account
-Dkubernetes.rest-service.exposed.type=NodePort
-Dkubernetes.cluster-id=batch-job-cluster
-Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
-Ds3.endpoint=http://minio-1616518256:9000 -Ds3.access-key=ACCESSKEY
-Ds3.secret-key=SECRETKEY
-Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
-Ds3.path-style-access=true -Ds3.ssl.enabled=false
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-Dhigh-availability.storageDir=s3://flink/flink-ha
local:///opt/flink/usrlib/job.jar
  restartPolicy: OnFailure


This works well for me, but I would like to write the result to the archive
path and show it in the History Server (running as a separate deployment in
k8s).
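(For reference, this is roughly how the History Server side is wired up in flink-conf.yaml; a sketch assuming the history server deployment can read the same bucket, with the port and refresh interval being example values only:)

jobmanager.archive.fs.dir: s3://flink/completed-jobs/
historyserver.archive.fs.dir: s3://flink/completed-jobs/
historyserver.archive.fs.refresh-interval: 10000
historyserver.web.port: 8082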

Anytime it creates JobId= which obviously
leads to

Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
already been submitted.
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
~[?:1.8.0_282]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
~[?:1.8.0_282]
at
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)
~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_282]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_282]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_282]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
... 10 more

I assume it is because it will spawn a completely new cluster for each run.

Can I somehow set the jobId, or am I trying to do something unsupported/bad?

Thanks for advice.

L.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html


Re: DataDog and Flink

2021-03-24 Thread Vishal Santoshi
Yes, I will do that.

Regarding the metrics dump through REST, it does provide the TM-specific
metrics but not a single dump across all jobs and vertices/operators.
Moreover, I am not sure I have ready access to the vertex ids
from the UI.

curl http://[jm]/taskmanagers/[tm_id]
curl http://[jm]/taskmanagers/[tm_id]/metrics
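(Side note, assuming the standard Flink 1.12 REST API: once you have the metric names from the listing above, specific values can be selected with the get query parameter, e.g.)

curl "http://[jm]/taskmanagers/[tm_id]/metrics?get=Status.JVM.Memory.Heap.Used,Status.JVM.CPU.Load"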



On Wed, Mar 24, 2021 at 4:24 AM Arvid Heise  wrote:

> Hi Vishal,
>
> REST API is the most direct way to get through all metrics as Matthias
> pointed out. Additionally, you could also add a JMX reporter and log to the
> machines to check.
>
> But in general, I think you are on the right track. You need to reduce the
> metrics that are sent to DD by configuring the scope / excluding variables.
>
> Furthermore, I think it would be a good idea to make the timeout
> configurable. Could you open a ticket for that?
>
> Best,
>
> Arvid
>
> On Wed, Mar 24, 2021 at 9:02 AM Matthias Pohl 
> wrote:
>
>> Hi Vishal,
>> what about the TM metrics' REST endpoint [1]. Is this something you could
>> use to get all the metrics for a specific TaskManager? Or are you looking
>> for something else?
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html#taskmanagers-metrics
>>
>> On Tue, Mar 23, 2021 at 10:59 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> That said, is there a way to get a dump of all metrics exposed by a TM? I
>>> was searching for it and I bet we could get it for a ServiceMonitor on k8s (
>>> scrape ) but am missing a way to hit a TM and dump all metrics that are
>>> pushed.
>>>
>>> Thanks and regards.
>>>
>>> On Tue, Mar 23, 2021 at 5:56 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 I guess there is a bigger issue here. We dropped the property to 500.
 We also realized that this failure happened on a TM that had one specific
 job running on it. What was good (but surprising) was that the exception was
 the more protocol-specific 413 (as in the chunk is greater than some size
 limit DD has on a request).

 Failed to send request to Datadog (response was Response{protocol=h2,
 code=413, message=, url=
 https://app.datadoghq.com/api/v1/series?api_key=**}
 
 )

 which implies that the Socket timeout was masking this issue. The 2000
 was just a huge payload that DD was unable to parse in time ( or was slow
 to upload etc ). Now we could go lower but that makes less sense. We could
 play with
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#system-scope
 to reduce the size of the tags ( or keys ).









 On Tue, Mar 23, 2021 at 11:33 AM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> If we look at this
> 
> code , the metrics are divided into chunks up-to a max size. and
> enqueued
> .
> The Request
> 
> has a 3 second read/connect/write timeout which IMHO should have been
> configurable ( or is it ) . While the number metrics ( all metrics )
> exposed by flink cluster is pretty high ( and the names of the metrics
> along with tags ) , it may make sense to limit the number of metrics in a
> single chunk ( to ultimately limit the size of a single chunk ). There is
> this configuration which allows for reducing the metrics in a single chunk
>
> metrics.reporter.dghttp.maxMetricsPerRequest: 2000
>
> We could decrease this to 1500 ( 1500 is pretty, not based on any
> empirical reasoning ) and see if that stabilizes the dispatch. It is
> inevitable that the number of requests will grow and we may hit the
> throttle but then we know the exception rather than the timeouts that are
> generally less intuitive.
>
> Any thoughts?
>
>
>
> On Mon, Mar 22, 2021 at 10:37 AM Arvid Heise  wrote:
>
>> Hi Vishal,
>>
>> I have no experience in the Flink+DataDog setup but worked a bit with
>> DataDog before.
>> I'd agree that the timeout does not seem like a rate limit. It would
>> also be odd that the other TMs with a similar rate still pass. So I'd
>> suspect n/w issues.
>> Can you log into the TM's machine and try out manually how the system
>> behaves?
>>

Re: flink sql jmh failure

2021-03-24 Thread jie mei
Hi, Yik San

I use a library I wrote myself and am trying to verify its performance.


Yik San Chan wrote on Wed, Mar 24, 2021 at 9:07 PM:

> Hi Jie,
>
> I am curious what library do you use to get the ClickHouseTableBuilder
>
> On Wed, Mar 24, 2021 at 8:41 PM jie mei  wrote:
>
>> Hi, Community
>>
>> I run a JMH benchmark task and get the error below. It uses Flink SQL to consume
>> data from the datagen connector (10,000,000 rows) and write the data to ClickHouse. Below
>> is part of the log; you can see the complete log in the attached file.
>>
>> *My jmh benchmark code is as below:*
>>
>> @Benchmark
>> @Threads(1)
>> @Fork(1)
>> public void sinkBenchmark() throws IOException {
>>
>>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>>   .getExecutionEnvironment();
>>   streamEnv.enableCheckpointing(6);
>>
>>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>   .useBlinkPlanner()
>>   .inStreamingMode().build();
>>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, 
>> settings);
>>
>>   // create clickhouse table
>>   new ClickHouseTableBuilder(tableEnv,
>>   parseSchema("clickhouse_sink_table.sql"))
>>   .database("benchmark")
>>   .table("bilophus_sink_benchmark")
>>   .address("jdbc:clickhouse://localhost:8123")
>>   .build();
>>
>>   // create mock data table
>>   tableEnv.executeSql(
>>   parseSchema("clickhouse_source_table.sql") +
>>   "WITH (" +
>>   "'connector' = 'datagen'," +
>>   "'number-of-rows' = '1000')");
>>
>>   tableEnv.executeSql(
>>   "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM 
>> CLICKHOUSE_SOURCE_BENCHMARK");
>>
>> }
>>
>> *running command:*
>>
>> mvn clean package -DskipTests
>>
>> 
>>   org.codehaus.mojo
>>   exec-maven-plugin
>>   1.6.0
>>   
>> 
>>   test-benchmarks
>>   test
>>   
>> exec
>>   
>> 
>>   
>>   
>> false
>> test
>> java
>> 
>>   -Xmx6g
>>   -classpath
>>   
>>   org.openjdk.jmh.Main
>>   
>>   -foe
>>   true
>>   
>>   -f
>>   1
>>   -i
>>   1
>>   -wi
>>   0
>>   -rf
>>   csv
>>   .*
>> 
>>   
>> 
>>
>>
>> Non-finished threads:
>>
>> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
>> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
>> first_int, second_int, first_float, second_float, first_double,
>> second_double, first_string, s
>> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
>> first_bigint, second_bigint, first_int, second_int, first_float,
>> second_float, first_double, second_double, first_string, second_string]) ->
>> Sink: Sink(table=[default_catal
>> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
>> second_bigint, first_int, second_int, first_float, second_float,
>> first_double, second_double, first_string, second_string]) (1/6),5,Flink
>> Task Threads]
>>  at sun.misc.Unsafe.park(Native Method)
>>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>  at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>
>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>> Thread[flink-akka.actor.default-dispatcher-8,5,main]
>>  at sun.misc.Unsafe.park(Native Method)
>>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>  at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> Thread[flink-akka.actor.default-dispatcher-2,5,main]
>>  at sun.misc.Unsafe.park(Native Method)
>>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>  at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
>> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
>> first_int, second_int, first_float, second_float, first_double,
>> second_double, first_string, s
>> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
>> first_bigint, second_bigint, first_int, 

Re: flink sql jmh failure

2021-03-24 Thread Yik San Chan
Hi Jie,

I am curious what library do you use to get the ClickHouseTableBuilder

On Wed, Mar 24, 2021 at 8:41 PM jie mei  wrote:

> Hi, Community
>
> I run a JMH benchmark task and get the error below. It uses Flink SQL to consume
> data from the datagen connector (10,000,000 rows) and write the data to ClickHouse. Below
> is part of the log; you can see the complete log in the attached file.
>
> *My jmh benchmark code is as below:*
>
> @Benchmark
> @Threads(1)
> @Fork(1)
> public void sinkBenchmark() throws IOException {
>
>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>   .getExecutionEnvironment();
>   streamEnv.enableCheckpointing(6);
>
>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode().build();
>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, 
> settings);
>
>   // create clickhouse table
>   new ClickHouseTableBuilder(tableEnv,
>   parseSchema("clickhouse_sink_table.sql"))
>   .database("benchmark")
>   .table("bilophus_sink_benchmark")
>   .address("jdbc:clickhouse://localhost:8123")
>   .build();
>
>   // create mock data table
>   tableEnv.executeSql(
>   parseSchema("clickhouse_source_table.sql") +
>   "WITH (" +
>   "'connector' = 'datagen'," +
>   "'number-of-rows' = '1000')");
>
>   tableEnv.executeSql(
>   "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM 
> CLICKHOUSE_SOURCE_BENCHMARK");
>
> }
>
> *running command:*
>
> mvn clean package -DskipTests
>
> 
>   org.codehaus.mojo
>   exec-maven-plugin
>   1.6.0
>   
> 
>   test-benchmarks
>   test
>   
> exec
>   
> 
>   
>   
> false
> test
> java
> 
>   -Xmx6g
>   -classpath
>   
>   org.openjdk.jmh.Main
>   
>   -foe
>   true
>   
>   -f
>   1
>   -i
>   1
>   -wi
>   0
>   -rf
>   csv
>   .*
> 
>   
> 
>
>
> Non-finished threads:
>
> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, s
> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
> first_bigint, second_bigint, first_int, second_int, first_float,
> second_float, first_double, second_double, first_string, second_string]) ->
> Sink: Sink(table=[default_catal
> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
> second_bigint, first_int, second_int, first_float, second_float,
> first_double, second_double, first_string, second_string]) (1/6),5,Flink
> Task Threads]
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>  at java.lang.Thread.run(Thread.java:748)
>
> Thread[flink-akka.actor.default-dispatcher-8,5,main]
>  at sun.misc.Unsafe.park(Native Method)
>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Thread[flink-akka.actor.default-dispatcher-2,5,main]
>  at sun.misc.Unsafe.park(Native Method)
>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, s
> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
> first_bigint, second_bigint, first_int, second_int, first_float,
> second_float, first_double, second_double, first_string, second_string]) ->
> Sink: Sink(table=[default_catal
> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
> second_bigint, first_int, second_int, first_float, second_float,
> 

Re: With the same checkpoint interval, checkpoint times increase and checkpoints fail in production on Flink 1.12, while the Flink 1.9 job runs normally

2021-03-24 Thread Congxian Qiu
Hi
From the description, the time used to complete the checkpoint in 1.12
is longer. Could you share more detail about the time consumption when
running the job on 1.9 and 1.12?
Best,
Congxian


Haihang Jing wrote on Tue, Mar 23, 2021 at 7:22 PM:

> [Symptom] For jobs with the same configuration (checkpoint interval: 3
> minutes, job logic: regular join), flink1.9 runs normally. After flink1.12
> runs for a period of time, the checkpoint creation time increases, and
> finally the checkpoint creation fails.
>
> [Analysis] After studying Flink 1.10, we learned that the checkpoint mechanism was adjusted.
> The receiver will not cache the data after a single barrier arrives when
> the
> barrier is aligned, which means that the sender must wait for credit
> feedback to transmit data after the barrier is aligned, so the sender will
> incur a certain cold start, which affects latency and network
> throughput. Therefore, the checkpoint interval was adjusted to 10 minutes
> for
> comparative testing, and it is found that after the adjustment (interval is
> 10), the job running on flink 1.12 is running normally.
>
> issue:https://issues.apache.org/jira/browse/FLINK-16404
>
> [Questions] 1. Have you encountered the same problem?
> 2. Can Flink 1.12 use a small checkpoint interval?
>
> The checkpoint interval is 3 minutes after the flink1.12 job runs for 5
> hours, the checkpoint creation fails, the specific exception stack:
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:96)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1924)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1897)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2038)
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Pyflink tutorial output

2021-03-24 Thread Robert Cullen
Ah, there they are.  Thanks!
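For anyone else hitting this, the quickest check is to list the directory on one of the TaskManager pods directly (the pod name below is a placeholder):

kubectl -n cmdaa exec -it flink-taskmanager-xxxx -- ls -l /tmp/output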

On Tue, Mar 23, 2021 at 10:26 PM Dian Fu  wrote:

> How did you check the output when submitting to the kubernetes session
> cluster? I ask this because the output should be written to the local
> directory “/tmp/output” on the TaskManagers where the jobs are running on.
>
> Regards,
> Dian
>
> On Mar 24, 2021, at 2:40 AM, Robert Cullen wrote:
>
> I’m running this script taken from the Flink website: tutorial.py
>
> python tutorial.py
>
> from pyflink.common.serialization import SimpleStringEncoder
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import StreamingFileSink
>
> def tutorial():
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> ds = env.from_collection(
> collection=[(1, 'aaa'), (2, 'bbb')],
> type_info=Types.ROW([Types.INT(), Types.STRING()]))
> ds.add_sink(StreamingFileSink
> .for_row_format('/tmp/output', SimpleStringEncoder())
> .build())
> env.execute("tutorial_job")
>
> if __name__ == '__main__':
> tutorial()
>
> It correctly outputs a part file to the /tmp/output directory when I run
> it locally. However when I run this on my kubernetes session cluster there
> is no output. Any ideas?
>
> ./bin/flink run \
> --target kubernetes-session \
> -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
> --pyModule tutorial \
> --pyFiles /opt/flink-1.12.0/examples/tutorial.py \
> --detached
>
> --
> Robert Cullen
> 240-475-4490
>
>
>

-- 
Robert Cullen
240-475-4490


Re: flink sql count distinct optimization

2021-03-24 Thread Robin Zhang
Hi guomuhua,
  With local aggregation enabled, you do not need to split the key and do a two-stage aggregation yourself. I suggest taking a look at the official documentation.

Best,
Robin


guomuhua wrote
> In SQL, if the local-global parameter is enabled: set
> table.optimizer.agg-phase-strategy=TWO_PHASE;
> or the Partial-Final parameters are enabled: set table.optimizer.distinct-agg.split.enabled=true;
>  set
> table.optimizer.distinct-agg.split.bucket-num=1024;
> do I still need to rewrite the SQL into the two-stage form accordingly?
> For example:
> Original SQL:
> SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
> 
> After splitting the required DISTINCT field buy_id by MOD 1024, the SQL:
> SELECT day, SUM(cnt) total
> FROM (
> SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> FROM T GROUP BY day, MOD(buy_id, 1024))
> GROUP BY day
> 
> Or will Flink rewrite the SQL for me automatically, so I don't need to care?
> 
> Also, if I only enable the parameters above without rewriting the SQL, there seems to be no optimization, and I don't see the two-phase operators in the Flink web UI.
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png
>  
> 
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/
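
To make Robin's point concrete, here is a minimal sketch of enabling the split-distinct optimization programmatically and inspecting the resulting plan. It is only an illustration: the datagen table stands in for the real source, and, per the Flink docs, the TWO_PHASE local-global aggregation only takes effect when mini-batch execution is also enabled, which may be why no two-phase operators showed up in the web UI.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SplitDistinctAggExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Mini-batch execution is a prerequisite for local-global (TWO_PHASE) aggregation.
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.allow-latency", "1 s");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.size", "5000");
        tEnv.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

        // Split-distinct rewrite: the planner buckets COUNT(DISTINCT ...) by itself,
        // so no manual MOD(buy_id, 1024) two-level query is needed.
        tEnv.getConfig().getConfiguration().setString("table.optimizer.distinct-agg.split.enabled", "true");
        tEnv.getConfig().getConfiguration().setString("table.optimizer.distinct-agg.split.bucket-num", "1024");

        // Stand-in source table so the optimized plan can be inspected.
        tEnv.executeSql(
                "CREATE TABLE T (`day` STRING, buy_id BIGINT) WITH ('connector' = 'datagen')");

        // The original single-stage query stays as written; the printed plan
        // should contain the partial/final (two-phase) aggregation operators.
        System.out.println(tEnv.explainSql(
                "SELECT `day`, COUNT(DISTINCT buy_id) AS cnt FROM T GROUP BY `day`"));
    }
}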


Application core-site/hdfs-site/yarn-site files do not take effect in YARN mode

2021-03-24 Thread 崔博
hi all
When a job is submitted in yarn-cluster mode, the application's hdfs-site, core-site and
yarn-site files are not uploaded as dependencies; the YARN cluster's default configuration is used instead. If the cluster's defaults differ from the application's configuration, the only current workaround is to ship the files with -yt.
Why aren't these configuration files uploaded as dependencies, when only flink-conf.yaml is uploaded?

Re: Fault Tolerance with RocksDBStateBackend

2021-03-24 Thread Maminspapin
Ok, thank you, Guowei Ma 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: FileSystemTableSink support for writing with a custom delimiter

2021-03-24 Thread easonliu30624700
The delimiter can be specified via the csv.field-delimiter property. However, only a single character is supported; multi-character delimiters are not.



--
Sent from: http://apache-flink.147419.n8.nabble.com/
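
As a concrete illustration of the single-character limit, here is a sketch of a filesystem CSV sink declared with 'csv.field-delimiter' = '|'. The table name, schema, and path are placeholders; batch mode is used only so this small example finishes and commits its files.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PipeDelimitedCsvSink {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Filesystem sink in CSV format with a custom single-character delimiter.
        // 'csv.field-delimiter' accepts exactly one character (here '|');
        // multi-character delimiters are not supported by the csv format.
        tEnv.executeSql(
                "CREATE TABLE pipe_sink (" +
                "  id BIGINT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'file:///tmp/pipe_output'," +
                "  'format' = 'csv'," +
                "  'csv.field-delimiter' = '|'" +
                ")");

        // A small write to produce an output file.
        tEnv.executeSql("INSERT INTO pipe_sink VALUES (1, 'a'), (2, 'b')").await();
    }
}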

Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-24 Thread vishalovercome
Let me make the example more concrete. Say O1 gets as input a data stream T1
which it splits into two using some function and produces DataStreams of
type T2 and T3, each of which are partitioned by the same key function TK.
Now after O2 processes a stream, it could sometimes send the stream to O3
(T4) using the same key function again. Now I want to know whether: 

1. Data from streams T3 with key K and T4 with key K end up affecting the
state variables for the same key K or different. I would think that would be
the case but wanted a confirmation
2. An explicit join is needed or not, i.e. whether this will achieve what I
want:

result2 = T1.filter(fn2).keyBy(TK).map(richfn2).keyBy(TK).map(whatever O3
does)
result3 = T1.filter(fn3).keyBy(TK).map(whatever O3 does)




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flink 1.12.0 k8s session deployment exception

2021-03-24 Thread 18756225...@163.com
I have also run into this problem: the cluster accepts job submissions normally, but the JobManager log keeps printing this. Is there any way to fix it?

 
From: casel.chen
Sent: 2021-02-07 16:33
To: user-zh@flink.apache.org
Subject: flink 1.12.0 k8s session deployment exception
When deploying a session-mode Flink cluster on k8s, the JobManager reports the following error. What causes it and how can it be fixed?
 
 
2021-02-07 08:21:41,873 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_1 .
2021-02-07 08:21:43,506 WARN  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Unhandled 
exception
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) 
~[?:1.8.0_275]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
~[?:1.8.0_275]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) 
~[?:1.8.0_275]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
2021-02-07 08:21:43,940 WARN  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Unhandled 
exception
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) 
~[?:1.8.0_275]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
~[?:1.8.0_275]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) 
~[?:1.8.0_275]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]


Re: Flink consuming Kafka and writing ORC files

2021-03-24 Thread Jacob
Thanks for the reply.

I put together a simple BucketAssigner and it meets the requirement.


@Override
public String getBucketId(Map element, Context context) {
    if (context.timestamp() - context.currentProcessingTime() < 0) {
        return "dt=" + context.timestamp();
    }
    return null;
}



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/
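
For reference, a fuller sketch of a date-based assigner along the same lines. The class name and the "dt=yyyy-MM-dd" naming are placeholders for illustration, and it always returns a bucket id rather than null; for plain time-based bucketing without the "dt=" prefix, the built-in DateTimeBucketAssigner already covers this.

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Buckets each record by the day of its event timestamp, e.g. "dt=2021-03-24".
public class DailyBucketAssigner<IN> implements BucketAssigner<IN, String> {

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(IN element, Context context) {
        // Fall back to processing time when the record carries no event timestamp.
        Long eventTime = context.timestamp();
        long ts = (eventTime != null) ? eventTime : context.currentProcessingTime();
        return "dt=" + FMT.format(Instant.ofEpochMilli(ts));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}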


Fault Tolerance with RocksDBStateBackend

2021-03-24 Thread Maminspapin
Hi everyone,

I want to build a flink cluster with 3 machines.
What if I choose RocksDBStateBackend with next settings:

#==
# Fault tolerance and checkpointing
#==

state.backend: rocksdb
state.checkpoints.dir: file:///home/flink/checkpoints
state.backend.incremental: true
jobmanager.execution.failover-strategy: region


Is it necessary to use distributed file systems like hdfs to provide fault
tolerance (i.e. state.checkpoints.dir: hdfs:///home/flink/checkpoints)?

There is no fault tolerance with file:// schema?

Thanks,
Yuri L.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Unsubscribe

2021-03-24 Thread hongton122
Unsubscribe from the Chinese mailing list.

Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-24 Thread David Anderson
For an example of a similar join implemented as a RichCoFlatMap, see [1].
For more background, the Flink docs have a tutorial [2] on how to work with
connected streams.

[1] https://github.com/apache/flink-training/tree/master/rides-and-fares
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/learn-flink/etl.html#connected-streams

On Wed, Mar 24, 2021 at 8:55 AM Matthias Pohl 
wrote:

> 1. yes - the same key would affect the same state variable
> 2. you need a join to have the same operator process both streams
>
> Matthias
>
> On Wed, Mar 24, 2021 at 7:29 AM vishalovercome 
> wrote:
>
>> Let me make the example more concrete. Say O1 gets as input a data stream
>> T1
>> which it splits into two using some function and produces DataStreams of
>> type T2 and T3, each of which are partitioned by the same key function TK.
>> Now after O2 processes a stream, it could sometimes send the stream to O3
>> (T4) using the same key function again. Now I want to know whether:
>>
>> 1. Data from streams T3 with key K and T4 with key K end up affecting the
>> state variables for the same key K or different. I would think that would
>> be
>> the case but wanted a confirmation
>> 2. An explicit join is needed or not, i.e. whether this will achieve what
>> I
>> want:
>>
>> result2 = T1.filter(fn2).keyBy(TK).map(richfn2).keyBy(TK).map(whatever O3
>> does)
>> result3 = T1.filter(fn3).keyBy(TK).map(whatever O3 does)
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
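
A minimal sketch of what is being described above: two streams keyed by the same key function are connected so that a single operator instance sees both streams for a given key and can share keyed state between them. The element types (String) and names are placeholders; the rides-and-fares exercise in [1] is the fuller, worked version of the same pattern.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class JoinByConnectSketch {

    // Connect T3 and T4, keyed by the same key function, so flatMap1/flatMap2
    // for a given key run on the same parallel instance and share keyed state.
    public static DataStream<String> join(DataStream<String> t3, DataStream<String> t4) {
        return t3.connect(t4)
                .keyBy(v -> v, v -> v)
                .flatMap(new MergeFunction());
    }

    static class MergeFunction extends RichCoFlatMapFunction<String, String, String> {
        private transient ValueState<String> lastFromT3;

        @Override
        public void open(Configuration parameters) {
            lastFromT3 = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastFromT3", String.class));
        }

        @Override
        public void flatMap1(String fromT3, Collector<String> out) throws Exception {
            // State is scoped to the current key.
            lastFromT3.update(fromT3);
        }

        @Override
        public void flatMap2(String fromT4, Collector<String> out) throws Exception {
            // Same key -> same state instance that flatMap1 updated.
            String t3Value = lastFromT3.value();
            if (t3Value != null) {
                out.collect(t3Value + " + " + fromT4);
            }
        }
    }
}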


Re: DataDog and Flink

2021-03-24 Thread Arvid Heise
Hi Vishal,

REST API is the most direct way to get through all metrics as Matthias
pointed out. Additionally, you could also add a JMX reporter and log to the
machines to check.

But in general, I think you are on the right track. You need to reduce the
metrics that are sent to DD by configuring the scope / excluding variables.

Furthermore, I think it would be a good idea to make the timeout
configurable. Could you open a ticket for that?

Best,

Arvid

On Wed, Mar 24, 2021 at 9:02 AM Matthias Pohl 
wrote:

> Hi Vishal,
> what about the TM metrics' REST endpoint [1]. Is this something you could
> use to get all the metrics for a specific TaskManager? Or are you looking
> for something else?
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html#taskmanagers-metrics
>
> On Tue, Mar 23, 2021 at 10:59 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> That said, is there a way to get a dump of all metrics exposed by a TM? I
>> was searching for it and I bet we could get it for a ServiceMonitor on k8s (
>> scrape ) but am missing a way to hit a TM and dump all metrics that are
>> pushed.
>>
>> Thanks and regards.
>>
>> On Tue, Mar 23, 2021 at 5:56 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I guess there is a bigger issue here. We dropped the property to 500. We
>>> also realized that this failure happened on a TM that had one specific job
>>> running on it. What was good ( but surprising ) was that the exception was the
>>> more protocol-specific 413 ( as in the chunk is greater than some size
>>> limit DD has on a request.
>>>
>>> Failed to send request to Datadog (response was Response{protocol=h2,
>>> code=413, message=, url=
>>> https://app.datadoghq.com/api/v1/series?api_key=**}
>>> 
>>> )
>>>
>>> which implies that the Socket timeout was masking this issue. The 2000
>>> was just a huge payload that DD was unable to parse in time ( or was slow
>>> to upload etc ). Now we could go lower but that makes less sense. We could
>>> play with
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#system-scope
>>> to reduce the size of the tags ( or keys ).
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Mar 23, 2021 at 11:33 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 If we look at this
 
 code , the metrics are divided into chunks up-to a max size. and
 enqueued
 .
 The Request
 
 has a 3 second read/connect/write timeout which IMHO should have been
 configurable ( or is it ) . While the number metrics ( all metrics )
 exposed by flink cluster is pretty high ( and the names of the metrics
 along with tags ) , it may make sense to limit the number of metrics in a
 single chunk ( to ultimately limit the size of a single chunk ). There is
 this configuration which allows for reducing the metrics in a single chunk

 metrics.reporter.dghttp.maxMetricsPerRequest: 2000

 We could decrease this to 1500 ( 1500 is pretty, not based on any
 empirical reasoning ) and see if that stabilizes the dispatch. It is
 inevitable that the number of requests will grow and we may hit the
 throttle but then we know the exception rather than the timeouts that are
 generally less intuitive.

 Any thoughts?



 On Mon, Mar 22, 2021 at 10:37 AM Arvid Heise  wrote:

> Hi Vishal,
>
> I have no experience in the Flink+DataDog setup but worked a bit with
> DataDog before.
> I'd agree that the timeout does not seem like a rate limit. It would
> also be odd that the other TMs with a similar rate still pass. So I'd
> suspect n/w issues.
> Can you log into the TM's machine and try out manually how the system
> behaves?
>
> On Sat, Mar 20, 2021 at 1:44 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Hello folks,
>>   This is quite strange. We see a TM stop reporting
>> metrics to DataDog .The logs from that specific TM  for every
>> DataDog dispatch time out with* java.net.SocketTimeoutException:
>> timeout *and that seems to repeat over every dispatch to DataDog. It
>> seems it is on a 10 seconds cadence per container. The TM remains 
>> humming,
>> so 
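
Pulling the settings discussed in this thread together, a flink-conf.yaml sketch might look like the fragment below. The reporter class, apikey option, and scope variables are from the Flink docs; the request-size value and the shortened scope formats are illustrative only, and the API key is a placeholder. Note that changing scope formats changes the metric identifiers DataDog sees, so dashboards or monitors keyed on the old names would need updating.

metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: <your-datadog-api-key>
# Cap the number of metrics per HTTP request so a single chunk stays small.
metrics.reporter.dghttp.maxMetricsPerRequest: 500
# Shorter scope formats shrink the metric names and tags carried in every chunk.
metrics.scope.tm: <host>.taskmanager
metrics.scope.task: <host>.<job_name>.<task_name>.<subtask_index>
metrics.scope.operator: <host>.<job_name>.<operator_name>.<subtask_index>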

Re: DataDog and Flink

2021-03-24 Thread Matthias Pohl
Hi Vishal,
what about the TM metrics' REST endpoint [1]. Is this something you could
use to get all the metrics for a specific TaskManager? Or are you looking
for something else?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html#taskmanagers-metrics

On Tue, Mar 23, 2021 at 10:59 PM Vishal Santoshi 
wrote:

> That said, is there a way to get a dump of all metrics exposed by a TM? I
> was searching for it and I bet we could get it for a ServiceMonitor on k8s (
> scrape ) but am missing a way to hit a TM and dump all metrics that are
> pushed.
>
> Thanks and regards.
>
> On Tue, Mar 23, 2021 at 5:56 PM Vishal Santoshi 
> wrote:
>
>> I guess there is a bigger issue here. We dropped the property to 500. We
>> also realized that this failure happened on a TM that had one specific job
>> running on it. What was good ( but surprising ) was that the exception was the
>> more protocol-specific 413 ( as in the chunk is greater than some size
>> limit DD has on a request.
>>
>> Failed to send request to Datadog (response was Response{protocol=h2,
>> code=413, message=, url=
>> https://app.datadoghq.com/api/v1/series?api_key=**}
>> 
>> )
>>
>> which implies that the Socket timeout was masking this issue. The 2000
>> was just a huge payload that DD was unable to parse in time ( or was slow
>> to upload etc ). Now we could go lower but that makes less sense. We could
>> play with
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#system-scope
>> to reduce the size of the tags ( or keys ).
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Tue, Mar 23, 2021 at 11:33 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> If we look at this
>>> 
>>> code , the metrics are divided into chunks up-to a max size. and
>>> enqueued
>>> .
>>> The Request
>>> 
>>> has a 3 second read/connect/write timeout which IMHO should have been
>>> configurable ( or is it ) . While the number metrics ( all metrics )
>>> exposed by flink cluster is pretty high ( and the names of the metrics
>>> along with tags ) , it may make sense to limit the number of metrics in a
>>> single chunk ( to ultimately limit the size of a single chunk ). There is
>>> this configuration which allows for reducing the metrics in a single chunk
>>>
>>> metrics.reporter.dghttp.maxMetricsPerRequest: 2000
>>>
>>> We could decrease this to 1500 ( 1500 is pretty, not based on any
>>> empirical reasoning ) and see if that stabilizes the dispatch. It is
>>> inevitable that the number of requests will grow and we may hit the
>>> throttle but then we know the exception rather than the timeouts that are
>>> generally less intuitive.
>>>
>>> Any thoughts?
>>>
>>>
>>>
>>> On Mon, Mar 22, 2021 at 10:37 AM Arvid Heise  wrote:
>>>
 Hi Vishal,

 I have no experience in the Flink+DataDog setup but worked a bit with
 DataDog before.
 I'd agree that the timeout does not seem like a rate limit. It would
 also be odd that the other TMs with a similar rate still pass. So I'd
 suspect n/w issues.
 Can you log into the TM's machine and try out manually how the system
 behaves?

 On Sat, Mar 20, 2021 at 1:44 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Hello folks,
>   This is quite strange. We see a TM stop reporting
> metrics to DataDog .The logs from that specific TM  for every DataDog
> dispatch time out with* java.net.SocketTimeoutException: timeout *and
> that seems to repeat over every dispatch to DataDog. It seems it is on a 
> 10
> seconds cadence per container. The TM remains humming, so does not seem to
> be under memory/CPU distress. And the exception is *not* transient.
> It just stops dead and from there on timeout.
>
> Looking at SLA provided by DataDog any throttling exception should
> pretty much not be a SocketTimeOut, till of course the reporting the
> specific issue is off. This thus appears very much a n/w issue which
> appears weird as other TMs with the same n/w just hum along, sending their
> metrics successfully. The other issue could be just the amount of metrics
> and the current volume for the TM is prohibitive. That said the exception
> is still not helpful.
>
> Any ideas from folks who have used 

Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-24 Thread Matthias Pohl
1. yes - the same key would affect the same state variable
2. you need a join to have the same operator process both streams

Matthias

On Wed, Mar 24, 2021 at 7:29 AM vishalovercome  wrote:

> Let me make the example more concrete. Say O1 gets as input a data stream
> T1
> which it splits into two using some function and produces DataStreams of
> type T2 and T3, each of which are partitioned by the same key function TK.
> Now after O2 processes a stream, it could sometimes send the stream to O3
> (T4) using the same key function again. Now I want to know whether:
>
> 1. Data from streams T3 with key K and T4 with key K end up affecting the
> state variables for the same key K or different. I would think that would
> be
> the case but wanted a confirmation
> 2. An explicit join is needed or not, i.e. whether this will achieve what I
> want:
>
> result2 = T1.filter(fn2).keyBy(TK).map(richfn2).keyBy(TK).map(whatever O3
> does)
> result3 = T1.filter(fn3).keyBy(TK).map(whatever O3 does)
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Streaming Counter

2021-03-24 Thread Matthias Pohl
Hi Vijayendra,
what about the example from the docs you already referred to [1]?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#counter

On Tue, Mar 23, 2021 at 6:48 PM Vijayendra Yadav 
wrote:

> Hi Pohl,
>
> Thanks for getting back to me so quickly. I am looking for a sample
> example where I can increment counters on each stage #1 thru #3 for
> DATASTREAM.
> Then probably I can print it using slf4j.
>
> Thanks,
> Vijay
>
> On Tue, Mar 23, 2021 at 6:35 AM Matthias Pohl 
> wrote:
>
>> Hi Vijayendra,
>> thanks for reaching out to the Flink community. What do you mean by
>> displaying it in your local IDE? Would it be ok to log the information out
>> onto stdout? You might want to have a look at the docs about setting up a
>> slf4j metrics report [1] if that's the case.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
>>
>> On Tue, Mar 23, 2021 at 2:09 AM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Team,
>>>
>>> Could you provide a sample how to pass Flink Datastream Source and sink
>>> results to increment COUNTER and then I want to display the Counter in
>>> Local IDE.
>>> Counter to display for #1 through #3.
>>>
>>> 1) DataStream messageStream = env.addSource(Kinesis Source);
>>> 2) DataStream outputStream =
>>> messageStream.rebalance().map(CustomMapFunction());
>>> 3) outputStream.addSink(Streaming File Sink).
>>>
>>> public class MyMapper extends RichMapFunction<String, String> {
>>>   private transient Counter counter;
>>>
>>>   @Override
>>>   public void open(Configuration config) {
>>>     this.counter = getRuntimeContext()
>>>       .getMetricGroup()
>>>       .counter("myCounter");
>>>   }
>>>
>>>   @Override
>>>   public String map(String value) throws Exception {
>>>     this.counter.inc();
>>>     return value;
>>>   }
>>> }
>>>
>>>
>>> Thanks,
>>> Vijay
>>>
>>
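
One way to see these counters in a local run, tying together the docs example above: configure the Slf4jReporter on a local environment so every registered metric (including "myCounter") is logged to the IDE console at a fixed interval. This is a sketch only; it assumes the MyMapper class from the message above is on the classpath, that the flink-metrics-slf4j dependency is available, and that the socket source (fed by e.g. `nc -lk 9999`) is just a stand-in for a long-running source so the reporter gets a chance to fire.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalCounterLoggingExample {
    public static void main(String[] args) throws Exception {
        // Log all registered metrics (including user counters) every 10 seconds.
        Configuration conf = new Configuration();
        conf.setString("metrics.reporter.slf4j.class",
                "org.apache.flink.metrics.slf4j.Slf4jReporter");
        conf.setString("metrics.reporter.slf4j.interval", "10 SECONDS");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);

        // Long-running placeholder source so the reporter fires at least once.
        env.socketTextStream("localhost", 9999)
                .map(new MyMapper())   // the counter-carrying RichMapFunction from above
                .print();

        env.execute("local-counter-logging");
    }
}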


FileSystemTableSink support for writing with a custom delimiter

2021-03-24 Thread 刘医生
Hi, I have a question:
FileSystemTableSink currently supports plain-text writes in the csv and json formats.
Could it later support configuring a custom field delimiter when writing to the filesystem?


Re:Re: About Memory Spilling to Disk in Flink

2021-03-24 Thread Roc Marshal
Hi, Guowei Ma.
As far as I know, flink writes some in-memory data to disk when memory is 
running low. I noticed that flink uses ExternalSorterBuilder for batch 
operations in the org.apache.flink.runtime.operator.sort package, but I'm 
curious to confirm if this technique is also used in stream mode.
Thank you.


Best, 
Roc

At 2021-03-24 15:01:48, "Guowei Ma"  wrote:

Hi, Roc
Could you explain more about your question? 

Best,
Guowei




On Wed, Mar 24, 2021 at 2:47 PM Roc Marshal  wrote:

Hi, 


Can someone tell me where flink uses memory spilling to write to disk? 
Thank you.


Best, Roc.




 

Re: How to use processing time with interval join

2021-03-24 Thread Smile
Hi, the Interval Join in the DataStream API does not support processing time yet; see [1].
However, if strictly accurate processing time is not required, could you carry the processing time out in a field before the join and use it as the event
time?

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html#interval-join



--
Sent from: http://apache-flink.147419.n8.nabble.com/
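
A minimal sketch of Smile's workaround: stamp each record's arrival (processing) time as its event-time timestamp so that the event-time interval join approximates a processing-time one. The socket sources, ports, and the ±5 second bounds are placeholders, and the result is only an approximation of true processing-time semantics.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class ArrivalTimeIntervalJoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> left = env.socketTextStream("localhost", 9001);
        DataStream<String> right = env.socketTextStream("localhost", 9002);

        // Use the wall-clock time at which each record is seen as its "event time".
        WatermarkStrategy<String> arrivalTime = WatermarkStrategy
                .<String>forMonotonousTimestamps()
                .withTimestampAssigner((element, previousTimestamp) -> System.currentTimeMillis());

        left.assignTimestampsAndWatermarks(arrivalTime)
                .keyBy(s -> s)
                .intervalJoin(right.assignTimestampsAndWatermarks(arrivalTime).keyBy(s -> s))
                .between(Time.seconds(-5), Time.seconds(5))
                .process(new ProcessJoinFunction<String, String, String>() {
                    @Override
                    public void processElement(String l, String r, Context ctx, Collector<String> out) {
                        out.collect(l + " and " + r + " arrived within 5 seconds of each other");
                    }
                })
                .print();

        env.execute("interval join on arrival time");
    }
}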


Re: About Memory Spilling to Disk in Flink

2021-03-24 Thread Guowei Ma
Hi, Roc
Could you explain more about your question?
Best,
Guowei


On Wed, Mar 24, 2021 at 2:47 PM Roc Marshal  wrote:

> Hi,
>
> Can someone tell me where flink uses memory spilling to write to disk?
> Thank you.
>
> Best, Roc.
>
>
>
>


About Memory Spilling to Disk in Flink

2021-03-24 Thread Roc Marshal
Hi, 


Can someone tell me where flink uses memory spilling to write to disk? 
Thank you.


Best, Roc.

Re: Fault Tolerance with RocksDBStateBackend

2021-03-24 Thread Guowei Ma
Hi,
You need some persistent storage (like HDFS) for the checkpoints. This is a
prerequisite for Flink's fault tolerance. [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/checkpointing.html#prerequisites
Best,
Guowei


On Wed, Mar 24, 2021 at 1:21 PM Maminspapin  wrote:

> Hi everyone,
>
> I want to build a flink cluster with 3 machines.
> What if I choose RocksDBStateBackend with next settings:
>
>
> #==
> # Fault tolerance and checkpointing
>
> #==
>
> state.backend: rocksdb
> state.checkpoints.dir: file:///home/flink/checkpoints
> state.backend.incremental: true
> jobmanager.execution.failover-strategy: region
>
>
> Is it necessary to use distributed file systems like hdfs to provide fault
> tolerance (i.e. state.checkpoints.dir: hdfs:///home/flink/checkpoints)?
>
> There is no fault tolerance with file:// schema?
>
> Thanks,
> Yuri L.
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
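
To make the prerequisite concrete, a sketch of pointing the RocksDB backend at storage that every JobManager and TaskManager can reach (the HDFS path is a placeholder; S3 or a shared NFS mount would work the same way, and the flink-statebackend-rocksdb dependency is assumed):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental RocksDB state backend whose checkpoints go to shared,
        // persistent storage reachable from all JobManagers and TaskManagers.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds

        env.fromElements(1, 2, 3).print();
        env.execute("rocksdb checkpointing sketch");
    }
}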


Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

2021-03-24 Thread Yang Wang
Hi Alexey,

From your attached logs, I do not think the newly started JobManager will
recover from the wrong savepoint.
Because you could find the following logs to indicate that the HA related
ConfigMaps have been cleaned up successfully.

1393 {"ts":"2021-03-20T02:02:18.506Z","message":"Finished cleaning up the
high availability
data.","logger_name":"org.apache.flink.kubernetes.highavailability.KubernetesHaServices","thread_name":"AkkaRpcService-Supervisor-Termination-Future-Executor-thread-1","level":"INFO","level_value":2}


So, now your question is why the JobManager pod exited with code 2?
It seems that "clusterEntrypoint.getTerminationFuture().get()" failed with
a timeout when terminating.

Alexey Trenikhun  wrote on Sat, Mar 20, 2021 at 10:16 AM:

> Hi Yang,
> I think I've reproduced the problem - job was cancelled, however something
> went wrong, JM exited with code 2, and as result Kubernetes restarted JM
> pod. JM log is attached.
>
> Thanks,
> Alexey
> --
> *From:* Yang Wang 
> *Sent:* Monday, March 15, 2021 1:26 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: Kubernetes HA - attempting to restore from wrong
> (non-existing) savepoint
>
> Feel free to share the terminated JobManager logs if you could reproduce
> this issue again.
> Maybe "kubectl logs {pod_name} --previous" could help.
>
>
> Best,
> Yang
>
Alexey Trenikhun  wrote on Mon, Mar 15, 2021 at 2:28 PM:
>
> With 1.12.1 it happened quite often, with 1.12.2 not that much; I think I
> saw it once or twice for ~20 cancels. When it happened, the job actually
> restarted on cancel. I did not grab the log at that time, but chances are good that I
> will be able to reproduce it.
> Thanks,
> Alexey
>
> --
> *From:* Yang Wang 
> *Sent:* Sunday, March 14, 2021 7:50:21 PM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: Kubernetes HA - attempting to restore from wrong
> (non-existing) savepoint
>
> If the HA related ConfigMaps still exists, then I am afraid the data
> located on the distributed storage should also exist.
> So I suggest to delete the HA related storage as well.
>
> Delete all the HA related data manually should help in your current
> situation. After then you could recover from the new savepoint.
> However, I do not think this is a normal behavior. Since when the
> application reached the terminal state(e.g. FINISHED, FAILED, CANCELLED),
> all HA related data should be cleaned up automatically.
>
> Could you help to provide the JobManager logs when you are trying to
> cancel the job? I believe using `kubectl logs -f {pod_name}` could dump
> the logs in real time.
>
> Best,
> Yang
>
Alexey Trenikhun  wrote on Fri, Mar 12, 2021 at 12:47 AM:
>
> Hi Yang,
> Upgrade procedure uses POST /jobs/:jobid/savepoints with cancel-job=true,
> but perhaps I hit FLINK-21028. This lead to question, if normal via API
> take-savepoint-and-cancel-job fails, what steps should be done outside
> Flink to be able to resume from savepoint with new job version? Is deleting
> Kubernetes Job and HA configmaps enough, or something in persisted storage
> should be deleted as well?
>
> Thanks,
> Alexey
> --
> *From:* Yang Wang 
> *Sent:* Thursday, March 11, 2021 2:59 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: Kubernetes HA - attempting to restore from wrong
> (non-existing) savepoint
>
> Hi Alexey,
>
> From your attached logs, it seems that the leader related config map is
> reused.
> Then the Flink application is recovered instead of submitting a new one.
> This is
> the root cause: it is trying to recover from a wrong savepoint, which is
> specified in
> your last submission.
>
> > So how to fix this?
> If you want to stop the application, I strongly suggest to cancel the
> flink job with savepoint
> instead of directly deleting all the K8s resources. After then, you will
> find that the leader
> related config maps will be deleted automatically after the job is
> cancelled.
>
> Best,
> Yang
>
Alexey Trenikhun  wrote on Wed, Mar 10, 2021 at 12:16 PM:
>
> Hi Yang,
> The problem is re-occurred, full JM log is attached
>
> Thanks,
> Alexey
> --
> *From:* Yang Wang 
> *Sent:* Sunday, February 28, 2021 10:04 PM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: Kubernetes HA - attempting to restore from wrong
> (non-existing) savepoint
>
> Hi Alexey,
>
> It seems that the KubernetesHAService works well since all the checkpoints
> have been cleaned up when the job is canceled.
> And we could find related logs "Found 0 checkpoints in
> KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.".
>
> However, it is a little strange that the CheckpointCoordinator is
> recovering from a wrong savepoint path. Could you share the
> full JobManager logs? One possible reason I could guess is the application
> cluster entrypoint is not creating a new JobGraph from the specified
> arguments.
>