Querying a database in a page able manner

2024-02-02 Thread Lasse Nedergaard
Hi. 
I have a case where I consume queries from a Kafka topic. I have to execute the 
queries against a database in page able manner. I would like to send each 
result page downstream as the query can return millions of rows and I 
can’t/don’t want to wait until all rows are returned. 
Normally I would use an async operator for external call but I can’t see how it 
would be possible to complete the future many times. 
An alternative could be to implement the logic in a flat map function but then 
I’m not sure how to implement it in the best way. 

Any best practices for this kind of pattern?

Thanks in advance 

 Best regards
Lasse Nedergaard



Re: Re: Case for a Broadcast join + hint in Flink Streaming SQL

2024-02-02 Thread David Anderson
I've seen enough demand for a streaming broadcast join in the community to
justify a FLIP -- I think it's a good idea, and look forward to the
discussion.

David

On Fri, Feb 2, 2024 at 6:55 AM Feng Jin  wrote:

> +1 a FLIP for this topic.
>
>
> Best,
> Feng
>
> On Fri, Feb 2, 2024 at 10:26 PM Martijn Visser 
> wrote:
>
>> Hi,
>>
>> I would definitely expect a FLIP on this topic before moving to
>> implementation.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Fri, Feb 2, 2024 at 12:47 PM Xuyang  wrote:
>>
>>> Hi, Prabhjot.
>>>
>>> IIUC, the main reasons why the community has not previously considered
>>> supporting join hints only in batch mode are as follows:
>>> 1. In batch mode, multiple join type algorithms were implemented quite
>>> early on, and
>>> 2. Stream processing represents a long-running scenario, and it is quite
>>> difficult to determine whether a small table will become a large table
>>> after a long period of operation.
>>>
>>> However, as you mentioned, join hints do indeed have their significance
>>> in streaming. If you want to support the implementation of "join hints +
>>> broadcast join" in streaming, the changes I can currently think of include:
>>> 1. At optimizer, changing the exchange on the small table side to
>>> broadcast instead of hash (InputProperty#BROADCAST).
>>> 2. Unknown changes required at the table runtime level.
>>>
>>> You can also discuss it within the community through JIRA, FLIP, or the
>>> dev mailing list.
>>>
>>>
>>> --
>>> Best!
>>> Xuyang
>>>
>>>
>>> At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" <
>>> user@flink.apache.org> wrote:
>>>
>>> Hi Feng,
>>>
>>> Thanks for your prompt response.
>>> If we were to solve this in Flink, my higher level viewpoint is:
>>>
>>> 1. First to implement Broadcast join in Flink Streaming SQL, that works
>>> across Table api (e.g. via a `left.join(right, ,
>>> join_type="broadcast")
>>> 2. Then, support a Broadcast hint that would utilize this new join based
>>> on the hint type
>>>
>>> What do you think about this ?
>>> Would you have some pointers on how/where to start on the first part ?
>>> (I'm thinking we'd have to extend the Broadcast state pattern for this
>>> purpose)
>>>
>>> Thanks,
>>> Prabhjot
>>>
>>> On Thu, Feb 1, 2024 at 11:40 AM Feng Jin  wrote:
>>>
 Hi Prabhjot

 I think this is a reasonable scenario. If there is a large table and a
 very small table for regular join, without broadcasting the regular join,
 it can easily cause data skew.
 We have also encountered similar problems too. Currently, we can only
 copy multiple copies of the small table using the union all and append
 random values to alleviate data skewness.


 Best,
 Feng

 On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
 user@flink.apache.org> wrote:

> Hello folks,
>
>
> We have a use case where we have a few stream-stream joins, requiring
> us to join a very large table with a much smaller table, essentially
> enriching the large table with a permutation on the smaller table 
> (Consider
> deriving all orders/sessions for a new location). Given the nature of the
> dataset, if we use a typical join that uses Hash distribution to co-locate
> the records for each join key, we end up with a very skewed join (a few
> task slots getting all of the work, as against a good distribution).
>
>
> We’ve internally implemented a Salting based solution where we salt
> the smaller table and join it with the larger table. While this works in
> the POC stage, we’d like to leverage flink as much as possible to do such 
> a
> join.
>
>
> By the nature of the problem, a broadcast join seems theoretically
> helpful. We’ve done an exploration on query hints supported in Flink,
> starting with this FLIP
> 
> and this FLIP
> 
> .
>
>
> Currently, the Optimizer doesn't consider the Broadcast hint in the
> `Exchange` step of the join, when creating the physical plan (Possibly
> because the hint would require the stream-stream join to also support
> Broadcast join with SQL)
>
>
> Notice that the Query AST (Abstract Syntax Tree) has the broadcast
> hint parsed from the query:
>
>
> ```sql
>
> ...
>
> ...
>
> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0]
> options:[gpla)
>
> ...
>
> ```
>
>
> However, the Flink optimizer ignores the hint and still represents the
> join as a regular `hash` join in the `Exchange` step:
>
>
> ```sql
>
> ...
>
> ...
>
> :- Exchange(distribution=[hash[shop_id, join_key]])
>
> ...
>
> `

Re: Re: Case for a Broadcast join + hint in Flink Streaming SQL

2024-02-02 Thread Feng Jin
+1 a FLIP for this topic.


Best,
Feng

On Fri, Feb 2, 2024 at 10:26 PM Martijn Visser 
wrote:

> Hi,
>
> I would definitely expect a FLIP on this topic before moving to
> implementation.
>
> Best regards,
>
> Martijn
>
> On Fri, Feb 2, 2024 at 12:47 PM Xuyang  wrote:
>
>> Hi, Prabhjot.
>>
>> IIUC, the main reasons why the community has not previously considered
>> supporting join hints only in batch mode are as follows:
>> 1. In batch mode, multiple join type algorithms were implemented quite
>> early on, and
>> 2. Stream processing represents a long-running scenario, and it is quite
>> difficult to determine whether a small table will become a large table
>> after a long period of operation.
>>
>> However, as you mentioned, join hints do indeed have their significance
>> in streaming. If you want to support the implementation of "join hints +
>> broadcast join" in streaming, the changes I can currently think of include:
>> 1. At optimizer, changing the exchange on the small table side to
>> broadcast instead of hash (InputProperty#BROADCAST).
>> 2. Unknown changes required at the table runtime level.
>>
>> You can also discuss it within the community through JIRA, FLIP, or the
>> dev mailing list.
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" 
>> wrote:
>>
>> Hi Feng,
>>
>> Thanks for your prompt response.
>> If we were to solve this in Flink, my higher level viewpoint is:
>>
>> 1. First to implement Broadcast join in Flink Streaming SQL, that works
>> across Table api (e.g. via a `left.join(right, ,
>> join_type="broadcast")
>> 2. Then, support a Broadcast hint that would utilize this new join based
>> on the hint type
>>
>> What do you think about this ?
>> Would you have some pointers on how/where to start on the first part ?
>> (I'm thinking we'd have to extend the Broadcast state pattern for this
>> purpose)
>>
>> Thanks,
>> Prabhjot
>>
>> On Thu, Feb 1, 2024 at 11:40 AM Feng Jin  wrote:
>>
>>> Hi Prabhjot
>>>
>>> I think this is a reasonable scenario. If there is a large table and a
>>> very small table for regular join, without broadcasting the regular join,
>>> it can easily cause data skew.
>>> We have also encountered similar problems too. Currently, we can only
>>> copy multiple copies of the small table using the union all and append
>>> random values to alleviate data skewness.
>>>
>>>
>>> Best,
>>> Feng
>>>
>>> On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
>>> user@flink.apache.org> wrote:
>>>
 Hello folks,


 We have a use case where we have a few stream-stream joins, requiring
 us to join a very large table with a much smaller table, essentially
 enriching the large table with a permutation on the smaller table (Consider
 deriving all orders/sessions for a new location). Given the nature of the
 dataset, if we use a typical join that uses Hash distribution to co-locate
 the records for each join key, we end up with a very skewed join (a few
 task slots getting all of the work, as against a good distribution).


 We’ve internally implemented a Salting based solution where we salt the
 smaller table and join it with the larger table. While this works in the
 POC stage, we’d like to leverage flink as much as possible to do such a
 join.


 By the nature of the problem, a broadcast join seems theoretically
 helpful. We’ve done an exploration on query hints supported in Flink,
 starting with this FLIP
 
 and this FLIP
 
 .


 Currently, the Optimizer doesn't consider the Broadcast hint in the
 `Exchange` step of the join, when creating the physical plan (Possibly
 because the hint would require the stream-stream join to also support
 Broadcast join with SQL)


 Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint
 parsed from the query:


 ```sql

 ...

 ...

 joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0]
 options:[gpla)

 ...

 ```


 However, the Flink optimizer ignores the hint and still represents the
 join as a regular `hash` join in the `Exchange` step:


 ```sql

 ...

 ...

 :- Exchange(distribution=[hash[shop_id, join_key]])

 ...

 ```


 In Flink `StreamExecExchange`, the translation happens only via the
 `HASH` distribution type
 .
 unlike in the Flink `BatchExecExchange`, the translation can happen via a
 multitude of 

Re: Re: Case for a Broadcast join + hint in Flink Streaming SQL

2024-02-02 Thread Martijn Visser
Hi,

I would definitely expect a FLIP on this topic before moving to
implementation.

Best regards,

Martijn

On Fri, Feb 2, 2024 at 12:47 PM Xuyang  wrote:

> Hi, Prabhjot.
>
> IIUC, the main reasons why the community has not previously considered
> supporting join hints only in batch mode are as follows:
> 1. In batch mode, multiple join type algorithms were implemented quite
> early on, and
> 2. Stream processing represents a long-running scenario, and it is quite
> difficult to determine whether a small table will become a large table
> after a long period of operation.
>
> However, as you mentioned, join hints do indeed have their significance in
> streaming. If you want to support the implementation of "join hints +
> broadcast join" in streaming, the changes I can currently think of include:
> 1. At optimizer, changing the exchange on the small table side to
> broadcast instead of hash (InputProperty#BROADCAST).
> 2. Unknown changes required at the table runtime level.
>
> You can also discuss it within the community through JIRA, FLIP, or the
> dev mailing list.
>
>
> --
> Best!
> Xuyang
>
>
> At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" 
> wrote:
>
> Hi Feng,
>
> Thanks for your prompt response.
> If we were to solve this in Flink, my higher level viewpoint is:
>
> 1. First to implement Broadcast join in Flink Streaming SQL, that works
> across Table api (e.g. via a `left.join(right, ,
> join_type="broadcast")
> 2. Then, support a Broadcast hint that would utilize this new join based
> on the hint type
>
> What do you think about this ?
> Would you have some pointers on how/where to start on the first part ?
> (I'm thinking we'd have to extend the Broadcast state pattern for this
> purpose)
>
> Thanks,
> Prabhjot
>
> On Thu, Feb 1, 2024 at 11:40 AM Feng Jin  wrote:
>
>> Hi Prabhjot
>>
>> I think this is a reasonable scenario. If there is a large table and a
>> very small table for regular join, without broadcasting the regular join,
>> it can easily cause data skew.
>> We have also encountered similar problems too. Currently, we can only
>> copy multiple copies of the small table using the union all and append
>> random values to alleviate data skewness.
>>
>>
>> Best,
>> Feng
>>
>> On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
>> user@flink.apache.org> wrote:
>>
>>> Hello folks,
>>>
>>>
>>> We have a use case where we have a few stream-stream joins, requiring us
>>> to join a very large table with a much smaller table, essentially enriching
>>> the large table with a permutation on the smaller table (Consider deriving
>>> all orders/sessions for a new location). Given the nature of the dataset,
>>> if we use a typical join that uses Hash distribution to co-locate the
>>> records for each join key, we end up with a very skewed join (a few task
>>> slots getting all of the work, as against a good distribution).
>>>
>>>
>>> We’ve internally implemented a Salting based solution where we salt the
>>> smaller table and join it with the larger table. While this works in the
>>> POC stage, we’d like to leverage flink as much as possible to do such a
>>> join.
>>>
>>>
>>> By the nature of the problem, a broadcast join seems theoretically
>>> helpful. We’ve done an exploration on query hints supported in Flink,
>>> starting with this FLIP
>>> 
>>> and this FLIP
>>> 
>>> .
>>>
>>>
>>> Currently, the Optimizer doesn't consider the Broadcast hint in the
>>> `Exchange` step of the join, when creating the physical plan (Possibly
>>> because the hint would require the stream-stream join to also support
>>> Broadcast join with SQL)
>>>
>>>
>>> Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint
>>> parsed from the query:
>>>
>>>
>>> ```sql
>>>
>>> ...
>>>
>>> ...
>>>
>>> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0]
>>> options:[gpla)
>>>
>>> ...
>>>
>>> ```
>>>
>>>
>>> However, the Flink optimizer ignores the hint and still represents the
>>> join as a regular `hash` join in the `Exchange` step:
>>>
>>>
>>> ```sql
>>>
>>> ...
>>>
>>> ...
>>>
>>> :- Exchange(distribution=[hash[shop_id, join_key]])
>>>
>>> ...
>>>
>>> ```
>>>
>>>
>>> In Flink `StreamExecExchange`, the translation happens only via the
>>> `HASH` distribution type
>>> .
>>> unlike in the Flink `BatchExecExchange`, the translation can happen via a
>>> multitude of options
>>> 
>>> (`HASH/BROADCAST`).
>>>
>>>
>>>
>>> Quoting this F

Re:Re: Case for a Broadcast join + hint in Flink Streaming SQL

2024-02-02 Thread Xuyang
Hi, Prabhjot. 


IIUC, the main reasons why the community has not previously considered 
supporting join hints only in batch mode are as follows: 
1. In batch mode, multiple join type algorithms were implemented quite early 
on, and 
2. Stream processing represents a long-running scenario, and it is quite 
difficult to determine whether a small table will become a large table after a 
long period of operation. 


However, as you mentioned, join hints do indeed have their significance in 
streaming. If you want to support the implementation of "join hints + broadcast 
join" in streaming, the changes I can currently think of include: 
1. At optimizer, changing the exchange on the small table side to broadcast 
instead of hash (InputProperty#BROADCAST). 
2. Unknown changes required at the table runtime level. 


You can also discuss it within the community through JIRA, FLIP, or the dev 
mailing list.




--

Best!
Xuyang




At 2024-02-02 00:46:01, "Prabhjot Bharaj via user"  
wrote:

Hi Feng,


Thanks for your prompt response.
If we were to solve this in Flink, my higher level viewpoint is:


1. First to implement Broadcast join in Flink Streaming SQL, that works across 
Table api (e.g. via a `left.join(right, , join_type="broadcast")
2. Then, support a Broadcast hint that would utilize this new join based on the 
hint type


What do you think about this ?
Would you have some pointers on how/where to start on the first part ? (I'm 
thinking we'd have to extend the Broadcast state pattern for this purpose)


Thanks,
Prabhjot


On Thu, Feb 1, 2024 at 11:40 AM Feng Jin  wrote:

Hi Prabhjot


I think this is a reasonable scenario. If there is a large table and a very 
small table for regular join, without broadcasting the regular join, it can 
easily cause data skew.
We have also encountered similar problems too. Currently, we can only copy 
multiple copies of the small table using the union all and append random values 
to alleviate data skewness.




Best,
Feng



On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user 
 wrote:


Hello folks,




We have a use case where we have a few stream-stream joins, requiring us to 
join a very large table with a much smaller table, essentially enriching the 
large table with a permutation on the smaller table (Consider deriving all 
orders/sessions for a new location). Given the nature of the dataset, if we use 
a typical join that uses Hash distribution to co-locate the records for each 
join key, we end up with a very skewed join (a few task slots getting all of 
the work, as against a good distribution).




We’ve internally implemented a Salting based solution where we salt the smaller 
table and join it with the larger table. While this works in the POC stage, 
we’d like to leverage flink as much as possible to do such a join.




By the nature of the problem, a broadcast join seems theoretically helpful. 
We’ve done an exploration on query hints supported in Flink, starting withthis 
FLIP andthis FLIP. 




Currently, the Optimizer doesn't consider the Broadcast hint in the `Exchange` 
step of the join, when creating the physical plan (Possibly because the hint 
would require the stream-stream join to also support Broadcast join with SQL)




Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint parsed 
from the query:




```sql

...

...

joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla)

...

```




However, the Flink optimizer ignores the hint and still represents the join as 
a regular `hash` join in the `Exchange` step:




```sql

...

...

:- Exchange(distribution=[hash[shop_id, join_key]])

...

```




In Flink `StreamExecExchange`, the translation happens only viathe `HASH` 
distribution type. unlike in the Flink `BatchExecExchange`, the translation can 
happen viaa multitude of options (`HASH/BROADCAST`).





Quotingthis Flink mailing list discussion for the FLIP that implemented the 
Broadcast join hint for batch sql:




> But currently, only in batch the optimizer has different Join strategies for 
> Join and

> there is no choice of join strategies in the stream. The join hints listed in 
> the current 

> flip should be ignored (maybe can be warned) in streaming mode. When in the

> future the stream mode has the choice of join strategies, I think that's a 
> good time > to discuss that the join hint can affect the streaming SQL.




What do you folks think about the possibility of a Broadcast join for Streaming 
Sql along with its corresponding Broadcast hint, that lets the user choose the 
kind of distribution they’d want with the dataset ?

Happy to learn more about this and hopefully implement it, if it doesn’t sound 
like a terrible idea.




Thanks,

Prabhjot





Re:sink upsert materializer in SQL job

2024-02-02 Thread Xuyang
Hi, Maj.


> 1. Does the materializer support jobs containing different types of joins 
> (more specifically regular and temporal joins)? 
> 2. Does the materializer support different types of input connectors: kafka 
> with both debezium-avro-confluent and avro-confluent formats and upsert-kafka 
> with avro-confluent format? All with well defined primary key (PK)

The common answer to both questions is "no." The upsert materializer is only 
related to the sink and the node before the sink (usually a join or an 
aggregation, etc.).

By default (with table.exec.sink.upsert-materialize = AUTO), the upsert 
materializer will appear when the upsert key of the upstream node before the 
sink and the pk of the sink do not match. Usually, we do not need to manually 
set this parameter to FORCE.




Suppose we have a source table T1, with a schema of "a", "b", "c", and "a" is 
the pk. Downstream, "b" is used as the join key to join with table T2, and the 
result is written into table T3, where "a" is also the pk. The global 
parallelism is set to 2. 

The source will issue (+I, a1, b1, c1), (-U, a1, b1, c1), (+U, a1, b2, c2). 
Because the join key is "b", the upsert key for the join becomes "b", which 
does not match the sink's pk "a", hence a sink materializer is produced.

Since the join key is "b", (+I, a1, b1, c1) and (-U, a1, b1, c1) will be sent 
to the first parallel instance of the join "join1", and (+U, a1, b2, c2) will 
be sent to the second parallel instance of the join "join2". At the same time, 
since the sink's pk is "a", these three pieces of data are actually related in 
sequence at the sink.




In practice, due to different processing speeds of join1 and join2, the sink 
may receive the following three possible sequences:

(+I, a1, b1, c1), (-U, a1, b1, c1), (+U, a1, b2, c2)
(+U, a1, b2, c2), (+I, a1, b1, c1), (-U, a1, b1, c1)
(+I, a1, b1, c1), (+U, a1, b2, c2), (-U, a1, b1, c1)

(Note that the two pieces of data (+I, a1, b1, c1) and (-U, a1, b1, c1) must be 
in order because they are processed by the single parallel "join1".)




Without an upsert materializer, in cases 2 and 3, the sink would ultimately 
receive -U, leading to data deletion.

The upsert materializer is used to correctly the finally issue (+U, a1, b2, c2) 
in cases 2 and 3.




Regrettably, Flink currently does not have a means of online debugging. To 
confirm the logic related to the upsert materializer, you may need to download 
the repo from the Flink repository, build & compile it, and then run the 
SinkUpsertMaterializerTest test class to observe and test youself.




Regarding the upsert key, you can use EXPLAIN CHANGELOG_MODE ... to view them 
in the plan.




If there are any issues with the above, please correct me.







--

Best!
Xuyang




At 2024-01-31 20:24:57, "Marek Maj"  wrote:

Hello Flink Community,
In our Flink SQL job we are experiencing undesirable behavior that is related 
to events reordering (more below in background section)
I have a few questions related to sink upsert materializer, the answer to them 
should help me understand its capabilities:


1. Does the materializer support jobs containing different types of joins (more 
specifically regular and temporal joins)? 
2. Does the materializer support different types of input connectors: kafka 
with both debezium-avro-confluent and avro-confluent formats and upsert-kafka 
with avro-confluent format? All with well defined primary key (PK)
3. What is the recommended way to debug sink materializer? I outputed compiled 
plan for SQL job and I can see that upsert materializer is on, but I am not 
sure if I can extract more information about its behavior


Flink version we are using: 1.16.1


best regards
Marek




Background:
We have deployed Flink SQL job that uses multiple joins to enrich data coming 
from main table. Altogether we have 11 different joins used for enrichment: 
temporal joins as well as regular joins (both: left and inner).
All source tables and output table use kafka topics under the hood. Grain of 
the main table does not change: main table and output table are using the same 
non-nullable column for their PK. Job parallelism is 16


We are experiencing data reorder that is presented below
Data in kafka input topic for main table (correct order, all in the same 
partition):




Data in kafka output topic after enrichment (events reordered, all in the same 
partition):




Highlighted event in the source becomes the last in the output and as a 
consequence incorrectly overrides enriched values to nulls.
Reordering happens probably due to one slow join where there is a significant 
data skew and output from one parallel task of this operator is delayed 
compared to other parallel tasks. This join uses as its key value that is null 
in first event, and non-null in all other 4 events



However, my understanding is that events reordering should be corrected by 
SinkUpsertMaterializer operator. Our configuration contains:
table.exec.

Re: Jobmanager restart after it has been requested to stop

2024-02-02 Thread Yang Wang
If you could find the "Deregistering Flink Kubernetes cluster, clusterId"
in the JobManager log, then it is not the expected behavior.

Having the full logs of JobManager Pod before restarted will help a lot.



Best,
Yang

On Fri, Feb 2, 2024 at 1:26 PM Liting Liu (litiliu) via user <
user@flink.apache.org> wrote:

> Hi, community:
> I'm running a Flink 1.14.3 job with flink-Kubernetes-operator-1.6.0 on the
> AWS. I found my flink jobmananger container's thread restarted after this
> flinkdeployment has been requested to stop, here is the log of jobmanager:
>
> 2024-02-01 21:57:48,977 tn="flink-akka.actor.default-dispatcher-107478"
> INFO
>  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap
> [] - Application CANCELED:
> java.util.concurrent.CompletionException:
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
> Application Status: CANCELED
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$6(ApplicationDispatcherBootstrap.java:353)
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> ~[?:1.8.0_322]
> 2024-02-01 21:57:48,984 tn="flink-akka.actor.default-dispatcher-107484"
> INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Shutting down rest endpoint.
> 2024-02-01 21:57:49,103 tn="flink-akka.actor.default-dispatcher-107478"
> INFO
>  
> org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
> [] - Closing components.
> 2024-02-01 21:57:49,105 tn="flink-akka.actor.default-dispatcher-107484"
> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] -
> Stopped dispatcher akka.tcp://flink@
> 2024-02-01 21:57:49,112
> tn="AkkaRpcService-Supervisor-Termination-Future-Executor-thread-1" INFO
>  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Stopping
> Akka RPC service.
> 2024-02-01 21:57:49,286 tn="flink-metrics-15" INFO
>  akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting
> shut down.
> 2024-02-01 21:57:49,387 tn="main" INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> Terminating cluster entrypoint process
> KubernetesApplicationClusterEntrypoint with exit code 0.
> 2024-02-01 21:57:53,828 tn="main" INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
> 2024-02-01 21:57:54,287 tn="main" INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Starting
> KubernetesApplicationClusterEntrypoint.
>
>
> I found the JM main container's containerId remains the same, after the JM
> auto-restart.
> why did this process start to run after it had been requested to stop?
>
>