[jira] [Created] (FLINK-32050) Bump Jackson to 2.14.3

2023-05-10 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-32050:
---

 Summary: Bump Jackson to 2.14.3
 Key: FLINK-32050
 URL: https://issues.apache.org/jira/browse/FLINK-32050
 Project: Flink
  Issue Type: Technical Debt
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin


FLINK-32032 upgrades flink-shaded, where flink-shaded's Jackson is bumped to 
2.14.x.
It would make sense to also bump the transitive Jackson dependency.





[jira] [Comment Edited] (FLINK-32045) optimize task deployment performance for large-scale jobs

2023-05-10 Thread Zhilong Hong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721632#comment-17721632
 ] 

Zhilong Hong edited comment on FLINK-32045 at 5/11/23 5:18 AM:
---

Thank you for proposing these optimizations, Weihua!

??1. Add a configuration to enable the distribution of shuffle descriptors via 
the blob server according to the parallelism.??

For the threshold to enable the distribution of shuffle descriptors via the 
blob server, I was originally thinking about adding a new configuration called 
something like "blob.deployement.offload.minsize" (I forgot the original name). 
This configuration was eventually dropped, because we don't want to introduce a 
new configuration that would require users to have advanced knowledge before 
configuring it.

However, I think enabling the distribution of shuffle descriptors via the blob 
server according to the parallelism is a better solution for this situation. 
It's more understandable and easier to configure. We can also set a large 
default value for this configuration. What do you think [~zhuzh]?

??2. Introduce a cache for shuffle descriptors in the TaskManager??

We thought about introducing a cache for shuffle descriptors in the TaskManager 
earlier. Users usually won't set a large number for the configuration 
"taskmanager.numberOfTaskSlots", which means there would only be a few slots in 
a TaskManager (for example, 8). So there won't be a lot of deserialization work 
on the TaskManager side, and I'm wondering how much a cache for shuffle 
descriptors in the TaskManager would actually improve performance.

Also, another question arises for the cache: how do we update it? 
Currently, the cache in the JobManager is cleared in two scenarios: (1) a 
ConsumerPartitionGroup is released; (2) the producer of an IntermediateResult 
encounters a failover. To clear the caches in the TaskManager at the same time, 
we may need to introduce a few complicated RPC calls between the JobManager and 
the TaskManager. In my opinion, it's a bit complicated.

The third concern is about session mode. If users submit a lot of jobs to a 
session at a rapid pace, the cache would flood the heap memory in a short 
time and cause unexpected effects on users' tasks. We could use an LRUCache 
or a FIFOCache for this situation. However, it's not easy for us to decide the 
size of the cache, because we don't know how large the TaskManager would be.
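
As a rough illustration of the LRUCache idea (a minimal sketch built on a plain 
JDK LinkedHashMap; the real ShuffleDescriptor types and the invalidation RPCs 
discussed above are left out):

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch of an LRU cache for deserialized shuffle descriptors on the
// TaskManager side. "maxEntries" is a hypothetical knob; as noted above,
// picking a good value is hard because the TaskManager size is not known.
class ShuffleDescriptorLruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    ShuffleDescriptorLruCache(int maxEntries) {
        super(16, 0.75f, /* accessOrder= */ true); // access order => LRU eviction
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries; // evict the least-recently-used entry
    }
}
{code}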

In my opinion, introducing a cache for ShuffleDescriptors in the TaskManager 
may require more discussions. Please correct me if I missed anything or 
anything I said is wrong. Thank you.


was (Author: thesharing):
Thank you for proposing these optimizations, Weihua!
 # For the threshold to enable the distribution of shuffle descriptors via the 
blob server, I was originally thinking about adding a new configuration called 
something like "blob.deployement.offload.minsize" (I forgot the original name). 
This configuration was eventually dropped, because we don't want to introduce a 
new configuration that would require users to have advanced knowledge before 
configuring it.

However, I think enabling the distribution of shuffle descriptors via the blob 
server according to the parallelism is a better solution for this situation. 
It's more understandable and easier to configure. We can also set a large 
default value for this configuration. What do you think [~zhuzh]?


 # We thought about introducing a cache for shuffle descriptors in the 
TaskManager earlier. Users usually won't set a large number for the 
configuration "taskmanager.numberOfTaskSlots", which means there would only be 
a few slots in a TaskManager (for example, 8). So there won't be a lot of 
deserialization work on the TaskManager side, and I'm wondering how much a 
cache for shuffle descriptors in the TaskManager would actually improve 
performance.

Also, another question arises for the cache: how do we update it? 
Currently, the cache in the JobManager is cleared in two scenarios: (1) a 
ConsumerPartitionGroup is released; (2) the producer of an IntermediateResult 
encounters a failover. To clear the caches in the TaskManager at the same time, 
we may need to introduce a few complicated RPC calls between the JobManager and 
the TaskManager. In my opinion, it's a bit complicated.

The third concern is about session mode. If users submit a lot of jobs to a 
session at a rapid pace, the cache would flood the heap memory in a short 
time and cause unexpected effects on users' tasks. We could use an LRUCache 
or a FIFOCache for this situation. However, it's not easy for us to decide the 
size of the cache, because we don't know how large the TaskManager would be.

In my opinion, introducing a cache for ShuffleDescriptors in the TaskManager 
may require more discussions. Please correct me if I missed 

[jira] [Commented] (FLINK-32045) optimize task deployment performance for large-scale jobs

2023-05-10 Thread Zhilong Hong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721632#comment-17721632
 ] 

Zhilong Hong commented on FLINK-32045:
--

Thank you for proposing these optimizations, Weihua!
 # For the threshold to enable the distribution of shuffle descriptors via the 
blob server, I was originally thinking about adding a new configuration called 
something like "blob.deployement.offload.minsize" (I forgot the original name). 
This configuration was eventually dropped, because we don't want to introduce a 
new configuration that would require users to have advanced knowledge before 
configuring it.

However, I think enabling the distribution of shuffle descriptors via the blob 
server according to the parallelism is a better solution for this situation. 
It's more understandable and easier to configure. We can also set a large 
default value for this configuration. What do you think [~zhuzh]?


 # We thought about introducing a cache for shuffle descriptors in the 
TaskManager earlier. Users usually won't set a large number for the 
configuration "taskmanager.numberOfTaskSlots", which means there would only be 
a few slots in a TaskManager (for example, 8). So there won't be a lot of 
deserialization work on the TaskManager side, and I'm wondering how much a 
cache for shuffle descriptors in the TaskManager would actually improve 
performance.

Also, another question arises for the cache: how do we update it? 
Currently, the cache in the JobManager is cleared in two scenarios: (1) a 
ConsumerPartitionGroup is released; (2) the producer of an IntermediateResult 
encounters a failover. To clear the caches in the TaskManager at the same time, 
we may need to introduce a few complicated RPC calls between the JobManager and 
the TaskManager. In my opinion, it's a bit complicated.

The third concern is about session mode. If users submit a lot of jobs to a 
session at a rapid pace, the cache would flood the heap memory in a short 
time and cause unexpected effects on users' tasks. We could use an LRUCache 
or a FIFOCache for this situation. However, it's not easy for us to decide the 
size of the cache, because we don't know how large the TaskManager would be.

In my opinion, introducing a cache for ShuffleDescriptors in the TaskManager 
may require more discussions. Please correct me if I missed anything or 
anything I said is wrong. Thank you.

> optimize task deployment performance for large-scale jobs
> -
>
> Key: FLINK-32045
> URL: https://issues.apache.org/jira/browse/FLINK-32045
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Weihua Hu
>Priority: Major
>
> h1. Background
> In FLINK-21110, we cache shuffle descriptors on the job manager side and 
> support using blob servers to offload these descriptors in order to reduce 
> the cost of tasks deployment.
> I think there are also some improvements we could make for large-scale jobs.
>  # The default min size to enable distribution via the blob server is 1MB. But 
> for a large wordcount job with 2 parallelism, the size of the serialized 
> shuffle descriptors is only 300KB. It means users need to lower 
> "blob.offload.minsize", but the value is hard for users to decide.
>  # The task executor side still needs to load blob files and deserialize 
> shuffle descriptors for each task. Since these operations run in the 
> main thread, they may block other RPCs from the job manager.
> h1. Proposal
>  # Enable distributing shuffle descriptors via the blob server automatically. This 
> could be decided by the edge number of the current shuffle descriptor: the 
> blob offload will be enabled when the edge number exceeds an internal 
> threshold (see the sketch after this list).
>  # Introduce a cache of deserialized shuffle descriptors on the task executor 
> side. This could reduce the cost of reading from local blob files and of 
> deserialization. Of course, the cache should have a TTL to avoid occupying too 
> much memory, and the cache should have the same switch mechanism as the blob 
> server offload.
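
A minimal sketch of the switch in item 1, with a hypothetical threshold value 
and method name (the ticket only says the edge count is compared against an 
internal threshold):

{code:java}
// Hypothetical sketch of proposal item 1: decide blob-server offload by edge
// count rather than serialized size. The threshold value is assumed here.
final class ShuffleDescriptorOffloadDecider {
    private static final int EDGE_COUNT_THRESHOLD = 4096; // assumed default

    static boolean offloadViaBlobServer(int numProducers, int numConsumers) {
        // An all-to-all edge between two vertices has
        // numProducers * numConsumers channels; offload once that count
        // crosses the threshold.
        return (long) numProducers * numConsumers > EDGE_COUNT_THRESHOLD;
    }
}
{code}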





[GitHub] [flink] WencongLiu commented on pull request #22341: [FLINK-27204] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-05-10 Thread via GitHub


WencongLiu commented on PR #22341:
URL: https://github.com/apache/flink/pull/22341#issuecomment-1543301331

   @XComp Thanks for your reply! 
   I only have one small question: what should the return types of
   
   `void createDirtyResult(JobResultEntry)`
   `void markResultAsClean(JobID)`
   
   be? Currently the invokers of these two methods need synchronous behavior. If we 
give them a CompletableFuture, the invokers must call future.get(). From this 
point of view, refactoring the return types of these two methods seems a little 
meaningless.
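
   For illustration, a minimal sketch of this trade-off with hypothetical 
stand-in signatures (not the actual JobResultStore API):

   ```java
   import java.util.concurrent.CompletableFuture;

   // Hypothetical stand-ins for the two signature options being discussed.
   interface ResultStoreSketch {
       void createDirtyResult(String entry);                         // synchronous
       CompletableFuture<Void> createDirtyResultAsync(String entry); // asynchronous
   }

   class SynchronousInvoker {
       static void run(ResultStoreSketch store) throws Exception {
           store.createDirtyResult("job-1");
           // An invoker that requires synchronous behavior must block on the
           // future anyway, so the async return type gains little here:
           store.createDirtyResultAsync("job-1").get();
       }
   }
   ```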
   





[jira] [Updated] (FLINK-31635) Support writing records to the new tiered store architecture

2023-05-10 Thread Yuxin Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuxin Tan updated FLINK-31635:
--
Description: 
Support writing records to the new tiered store architecture.

To achieve this goal, the work mainly includes the following two parts:
1. Introduce the tiered storage architecture.
2. Implement the producer side of the architecture.

> Support writing records to the new tiered store architecture
> 
>
> Key: FLINK-31635
> URL: https://issues.apache.org/jira/browse/FLINK-31635
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Support writing records to the new tiered store architecture.
> To achieve this goal, the work mainly includes the following two parts:
> 1. Introduce the tiered storage architecture.
> 2. Implement the producer side of the architecture.





[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-10 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721592#comment-17721592
 ] 

Hangxiang Yu commented on FLINK-31963:
--

[~srichter] No, I haven't used side-outputs.
The problematic nodes/connection in my job are keyedProcessFunction -> sink (the 
partitioner type is rebalance).

The exception is thrown while scaling down the sink node.

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB (Generic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>Reporter: Tan Kim
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing the Autoscaler through the Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.





[jira] [Commented] (FLINK-31635) Support writing records to the new tiered store architecture

2023-05-10 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721577#comment-17721577
 ] 

Xintong Song commented on FLINK-31635:
--

master (1.18): 80a924309ce910715c4079c7e52e9d560318bd38

> Support writing records to the new tiered store architecture
> 
>
> Key: FLINK-31635
> URL: https://issues.apache.org/jira/browse/FLINK-31635
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>






[jira] [Closed] (FLINK-31635) Support writing records to the new tiered store architecture

2023-05-10 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-31635.

Fix Version/s: 1.18.0
   Resolution: Done

> Support writing records to the new tiered store architecture
> 
>
> Key: FLINK-31635
> URL: https://issues.apache.org/jira/browse/FLINK-31635
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>






[GitHub] [flink] xintongsong closed pull request #22330: [FLINK-31635][network] Support writing records to the new tiered store architecture

2023-05-10 Thread via GitHub


xintongsong closed pull request #22330: [FLINK-31635][network] Support writing 
records to the new tiered store architecture
URL: https://github.com/apache/flink/pull/22330





[jira] [Updated] (FLINK-2186) Rework CSV import to support very wide files

2023-05-10 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-2186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-2186:

Component/s: (was: Library / Machine Learning)

> Rework CSV import to support very wide files
> 
>
> Key: FLINK-2186
> URL: https://issues.apache.org/jira/browse/FLINK-2186
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Scala
>Reporter: Theodore Vasiloudis
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> auto-unassigned, pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the current readCsvFile implementation, importing CSV files with many 
> columns can range from cumbersome to impossible.
> For example, to import an 11-column file we need to write:
> {code}
> val cancer = env.readCsvFile[(String, String, String, String, String, String, 
> String, String, String, String, 
> String)]("/path/to/breast-cancer-wisconsin.data")
> {code}
> For many use cases in Machine Learning we might have CSV files with thousands 
> or millions of columns that we want to import as vectors.
> In that case using the current readCsvFile method becomes impossible.
> We therefore need to rework the current function, or create a new one that 
> will allow us to import CSV files with an arbitrary number of columns.
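
As a rough illustration (not part of the ticket), one workaround is to read 
rows as plain text and build the vector manually; the sketch below uses the 
DataSet API and assumes comma-separated, purely numeric columns:

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Sketch of a workaround: split each line into a double[] "vector",
// sidestepping the tuple-arity limit of readCsvFile.
public class WideCsvSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<double[]> vectors =
                env.readTextFile("/path/to/breast-cancer-wisconsin.data")
                        .map(new MapFunction<String, double[]>() {
                            @Override
                            public double[] map(String line) {
                                String[] fields = line.split(",");
                                double[] row = new double[fields.length];
                                for (int i = 0; i < fields.length; i++) {
                                    row[i] = Double.parseDouble(fields[i]);
                                }
                                return row;
                            }
                        });
        vectors.first(5).print();
    }
}
{code}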





[GitHub] [flink] flinkbot commented on pull request #22564: [FLINK-31891][runtime] Introduce AdaptiveScheduler per-task failure enrichment/labeling

2023-05-10 Thread via GitHub


flinkbot commented on PR #22564:
URL: https://github.com/apache/flink/pull/22564#issuecomment-1542982484

   
   ## CI report:
   
   * 1d25a737714dcd7c6d5dc1b3d306fc8fa163e601 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-31891) Introduce AdaptiveScheduler per-task failure enrichment/labeling

2023-05-10 Thread Panagiotis Garefalakis (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Panagiotis Garefalakis updated FLINK-31891:
---
Summary: Introduce AdaptiveScheduler per-task failure enrichment/labeling  
(was: Placeholder)

> Introduce AdaptiveScheduler per-task failure enrichment/labeling
> 
>
> Key: FLINK-31891
> URL: https://issues.apache.org/jira/browse/FLINK-31891
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
>






[jira] [Updated] (FLINK-31891) Placeholder

2023-05-10 Thread Panagiotis Garefalakis (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Panagiotis Garefalakis updated FLINK-31891:
---
Summary: Placeholder  (was: Introduce JobMaster TM disconnect failure 
enrichment/labeling)

> Placeholder
> ---
>
> Key: FLINK-31891
> URL: https://issues.apache.org/jira/browse/FLINK-31891
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
>






[GitHub] [flink-connector-kafka] RamanVerma commented on a diff in pull request #28: [FLINK-32019][Connector/Kafka] EARLIEST offset strategy for partitions discoveried later based on FLIP-288

2023-05-10 Thread via GitHub


RamanVerma commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-kafka/pull/28#discussion_r1190320189


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java:
##
@@ -56,54 +58,75 @@ public int getVersion() {
 
     @Override
     public byte[] serialize(KafkaSourceEnumState enumState) throws IOException {
-        return serializeTopicPartitions(enumState.assignedPartitions());
+
+        Set<TopicPartition> assignedPartitions = enumState.assignedPartitions();
+        Set<TopicPartition> unassignedInitialPartitons = enumState.unassignedInitialPartitons();
+        boolean initialDiscoveryFinished = enumState.initialDiscoveryFinished();
+        return serializeTopicPartitions(
+                assignedPartitions, unassignedInitialPartitons, initialDiscoveryFinished);
     }
 
     @Override
     public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws IOException {
-        if (version == CURRENT_VERSION) {
-            final Set<TopicPartition> assignedPartitions = deserializeTopicPartitions(serialized);
-            return new KafkaSourceEnumState(assignedPartitions);
-        }
-
-        // Backward compatibility
-        if (version == VERSION_0) {
-            Map<Integer, Set<KafkaPartitionSplit>> currentPartitionAssignment =
-                    SerdeUtils.deserializeSplitAssignments(
-                            serialized, new KafkaPartitionSplitSerializer(), HashSet::new);
-            Set<TopicPartition> currentAssignedSplits = new HashSet<>();
-            currentPartitionAssignment.forEach(
-                    (reader, splits) ->
-                            splits.forEach(
-                                    split -> currentAssignedSplits.add(split.getTopicPartition())));
-            return new KafkaSourceEnumState(currentAssignedSplits);
+        switch (version) {
+            case CURRENT_VERSION:
+                return deserializeTopicPartitionsV2(serialized);
+            case VERSION_1:
+                final Set<TopicPartition> assignedPartitions =
+                        deserializeTopicPartitionsV1(serialized);
+                return new KafkaSourceEnumState(assignedPartitions, new HashSet<>(), true);
+            case VERSION_0:
+                Map<Integer, Set<KafkaPartitionSplit>> currentPartitionAssignment =
+                        SerdeUtils.deserializeSplitAssignments(
+                                serialized, new KafkaPartitionSplitSerializer(), HashSet::new);
+                Set<TopicPartition> currentAssignedSplits = new HashSet<>();
+                currentPartitionAssignment.forEach(
+                        (reader, splits) ->
+                                splits.forEach(
+                                        split ->
+                                                currentAssignedSplits.add(
+                                                        split.getTopicPartition())));
+                return new KafkaSourceEnumState(currentAssignedSplits, new HashSet<>(), true);
+            default:
+                throw new IOException(
+                        String.format(
+                                "The bytes are serialized with version %d, "
+                                        + "while this deserializer only supports version up to %d",
+                                version, CURRENT_VERSION));
         }
-
-        throw new IOException(
-                String.format(
-                        "The bytes are serialized with version %d, "
-                                + "while this deserializer only supports version up to %d",
-                        version, CURRENT_VERSION));
     }
 
-    private static byte[] serializeTopicPartitions(Collection<TopicPartition> topicPartitions)
+    private static byte[] serializeTopicPartitions(
+            Collection<TopicPartition> assignedPartitions,
+            Collection<TopicPartition> unassignedInitialPartitons,

Review Comment:
   typo `unassignedInitialPartitons` -> `unassignedInitialPartitions`



##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java:
##
@@ -56,54 +58,75 @@ public int getVersion() {
 
     @Override
     public byte[] serialize(KafkaSourceEnumState enumState) throws IOException {
-        return serializeTopicPartitions(enumState.assignedPartitions());
+
+        Set<TopicPartition> assignedPartitions = enumState.assignedPartitions();
+        Set<TopicPartition> unassignedInitialPartitons = enumState.unassignedInitialPartitons();
+        boolean initialDiscoveryFinished = enumState.initialDiscoveryFinished();
+        return serializeTopicPartitions(

Review Comment:
   Maybe we can just get rid of the private method now. 
   We are serializing more than just topic partitions (initialDiscoveryFinished 
is a boolean), so the method name needs to change. Also, there is no other 
caller. So, let's just do everything in the serialize method itself.



##

[GitHub] [flink] snuyanzin commented on a diff in pull request #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0

2023-05-10 Thread via GitHub


snuyanzin commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1190413352


##
flink-table/flink-sql-jdbc-driver/pom.xml:
##
@@ -84,6 +84,12 @@
 			<version>${project.version}</version>
 			<scope>test</scope>
 
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
+		</dependency>

Review Comment:
   Suddenly with Calcite 1.31.0 it starts failing with guava's `ImmutableList` 
not found. 






[GitHub] [flink] snuyanzin commented on a diff in pull request #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0

2023-05-10 Thread via GitHub


snuyanzin commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1190411735


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##
@@ -469,10 +469,10 @@ private static List allTypesBasic() {
 .fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT, 539222987)
 .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123)
 .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123)
-.fromCase(FLOAT(), 9234567891.12, 644633299)
+.fromCase(FLOAT(), 9234567891.12, 2147483647)

Review Comment:
   All the changes in this file are a consequence of 
https://issues.apache.org/jira/browse/CALCITE-4861






[GitHub] [flink] snuyanzin commented on a diff in pull request #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0

2023-05-10 Thread via GitHub


snuyanzin commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1190409133


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##
@@ -526,7 +526,7 @@ private static List allTypesBasic() {
 .fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT, 
DEFAULT_NEGATIVE_BIGINT)
 .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123L)
 .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123L)
-.fromCase(FLOAT(), 9234567891.12, 9234567891L)
+.fromCase(FLOAT(), 9234567891.12, 9234568192L)

Review Comment:
   This looks a bit weird; however, after checking, the result is the same as for
   ```java
   Float.parseFloat("9234567891.12");
   ```






[GitHub] [flink] snuyanzin commented on a diff in pull request #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0

2023-05-10 Thread via GitHub


snuyanzin commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1190407800


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##
@@ -469,10 +469,10 @@ private static List allTypesBasic() {
 .fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT, 539222987)
 .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123)
 .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123)
-.fromCase(FLOAT(), 9234567891.12, 644633299)
+.fromCase(FLOAT(), 9234567891.12, 2147483647)

Review Comment:
   This looks a bit weird; however, after checking, the result is the same as for
   ```java
   Float.parseFloat("9234567891.12");
   ```
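
   For reference, both new expected values can be reproduced with plain Java 
float semantics (a quick standalone check, not taken from the PR):

   ```java
   public class FloatCastCheck {
       public static void main(String[] args) {
           float f = Float.parseFloat("9234567891.12");
           // 9234567891.12 is not exactly representable as a float; the
           // nearest float is 9234568192.
           System.out.println((long) f); // 9234568192 (new expected BIGINT value)
           // Java's narrowing conversion saturates out-of-range floats at
           // Integer.MAX_VALUE.
           System.out.println((int) f);  // 2147483647 (new expected INT value)
       }
   }
   ```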






[GitHub] [flink] snuyanzin commented on a diff in pull request #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0

2023-05-10 Thread via GitHub


snuyanzin commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1190406330


##
flink-table/flink-table-planner/pom.xml:
##
@@ -99,16 +99,10 @@ under the License.



-   
-   
-   apiguardian-api
-   org.apiguardian
-   1.1.2
-   

Review Comment:
   As mentioned above Calcite 1.31.0 now depends on 1.1.2 api.guardian (same as 
junit-jupiter) 






[GitHub] [flink] snuyanzin commented on a diff in pull request #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0

2023-05-10 Thread via GitHub


snuyanzin commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1190405680


##
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl:
##
@@ -1335,12 +1335,14 @@ SqlNode RichSqlInsert() :
     final SqlNodeList keywordList;
     final List<SqlLiteral> extendedKeywords = new ArrayList<>();
     final SqlNodeList extendedKeywordList;
-    SqlNode table;
-    SqlNodeList extendList = null;
+    final SqlIdentifier tableName;
+    SqlNode tableRef;
+    final SqlNodeList extendList;

Review Comment:
   Changes in this file are a consequence  of refactor of parser 
https://github.com/apache/calcite/commit/515f3356a6a1ab4bd570c1c20bec9a7e5d4aca5f






[jira] [Commented] (FLINK-32048) DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network buffers"

2023-05-10 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721542#comment-17721542
 ] 

Sergey Nuyanzin commented on FLINK-32048:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48885=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12654

> DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network 
> buffers"
> ---
>
> Key: FLINK-32048
> URL: https://issues.apache.org/jira/browse/FLINK-32048
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.18.0
>Reporter: Lijie Wang
>Priority: Critical
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48855=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4]
> {code:java}
> May 10 09:37:41 Caused by: java.io.IOException: Insufficient number of 
> network buffers: required 1, but only 0 available. The total number of 
> network buffers is currently set to 2048 of 32768 bytes each. You can 
> increase this number by setting the configuration keys 
> 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
> 'taskmanager.memory.network.max'.
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:495)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:456)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory.lambda$createBufferPoolFactory$3(SingleInputGateFactory.java:330)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:274)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:105)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:969)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:654)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> May 10 09:37:41   at java.lang.Thread.run(Thread.java:748)
> {code}
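
As the quoted message says, the limit comes from the configured network memory 
(here 2048 buffers x 32768 bytes = 64 MiB). A small sketch of raising the 
listed keys via Flink's Configuration API; the values are illustrative only, 
not a recommendation:

{code:java}
import org.apache.flink.configuration.Configuration;

// Illustrative only: raise the network memory bounds named in the exception.
public class NetworkBufferConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.memory.network.fraction", "0.2"); // default 0.1
        conf.setString("taskmanager.memory.network.min", "128mb");
        conf.setString("taskmanager.memory.network.max", "1gb");
        System.out.println(conf);
    }
}
{code}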





[jira] [Commented] (FLINK-29533) Add proper table style to Flink website

2023-05-10 Thread Sakshi Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721540#comment-17721540
 ] 

Sakshi Sharma commented on FLINK-29533:
---

Can I work on this issue?

> Add proper table style to Flink website
> ---
>
> Key: FLINK-29533
> URL: https://issues.apache.org/jira/browse/FLINK-29533
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: starter
> Attachments: Screenshot from 2022-10-07 08-23-01.png
>
>
> Tables can be created using simple markdown syntax. But the corresponding 
> rendered table lacks proper styling: !Screenshot from 2022-10-07 
> 08-23-01.png!
> Several blog posts work around that by adding a custom style:
>  * [Apache Flink Kubernetes Operator 1.0.0 Release 
> Announcement|https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html]
>  * [Improving speed and stability of checkpointing with generic log-based 
> incremental 
> checkpoints|https://flink.apache.org/2022/05/30/changelog-state-backend.html]
> What about coming up with a common style that doesn't require people to define 
> their own custom style per post?





[GitHub] [flink-connector-kafka] RamanVerma commented on a diff in pull request #28: [FLINK-32019][Connector/Kafka] EARLIEST offset strategy for partitions discoveried later based on FLIP-288

2023-05-10 Thread via GitHub


RamanVerma commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-kafka/pull/28#discussion_r1190292883


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java:
##
@@ -22,18 +22,69 @@
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** The state of Kafka source enumerator. */
 @Internal
 public class KafkaSourceEnumState {
-    private final Set<TopicPartition> assignedPartitions;
+    /** Partitions with status: ASSIGNED or UNASSIGNED_INITIAL. */
+    private final Set<TopicPartitionWithAssignStatus> partitions;
+    /**
+     * This flag will be marked as true if initial partitions are discovered after the
+     * enumerator starts.
+     */
+    private final boolean initialDiscoveryFinished;
 
-    KafkaSourceEnumState(Set<TopicPartition> assignedPartitions) {
-        this.assignedPartitions = assignedPartitions;
+    KafkaSourceEnumState(
+            Set<TopicPartition> assignPartitions,

Review Comment:
   please change the parameter names to `assignedPartitions` and 
`unassignedInitialPartitions`



##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/TopicPartitionWithAssignStatus.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.kafka.common.TopicPartition;
+
+/** Kafka partition with assign status. */
+@Internal
+public class TopicPartitionWithAssignStatus {
+private final TopicPartition topicPartition;
+private final long assignStatus;

Review Comment:
   `assignmentStatus` would convey the meaning better than `assignStatus`
   Also, I would prefer `TopicPartitionAndAssignmentStatus` over 
`TopicPartitionWithAssignStatus`



##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java:
##
@@ -22,18 +22,69 @@
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** The state of Kafka source enumerator. */
 @Internal
 public class KafkaSourceEnumState {
-    private final Set<TopicPartition> assignedPartitions;
+    /** Partitions with status: ASSIGNED or UNASSIGNED_INITIAL. */
+    private final Set<TopicPartitionWithAssignStatus> partitions;
+    /**
+     * This flag will be marked as true if initial partitions are discovered after the
+     * enumerator starts.
+     */
+    private final boolean initialDiscoveryFinished;
 
-    KafkaSourceEnumState(Set<TopicPartition> assignedPartitions) {
-        this.assignedPartitions = assignedPartitions;
+    KafkaSourceEnumState(
+            Set<TopicPartition> assignPartitions,
+            Set<TopicPartition> unAssignInitialPartitions,
+            boolean initialDiscoveryFinished) {
+        this.partitions = new HashSet<>();
+        partitions.addAll(
+                assignPartitions.stream()
+                        .map(
+                                topicPartition ->
+                                        new TopicPartitionWithAssignStatus(
+                                                topicPartition,
+                                                TopicPartitionWithAssignStatus.ASSIGNED))
+                        .collect(Collectors.toSet()));
+        partitions.addAll(
+                unAssignInitialPartitions.stream()
+                        .map(
+                                topicPartition ->
+                                        new TopicPartitionWithAssignStatus(
+                                                topicPartition,
+                                                TopicPartitionWithAssignStatus.UNASSIGNED_INITIAL))
+                        .collect(Collectors.toSet()));
+        this.initialDiscoveryFinished = initialDiscoveryFinished;
+    }
+
+    public Set<TopicPartitionWithAssignStatus> partitions() {
+        return partitions;
     }
 
     public Set<TopicPartition> assignedPartitions() {
-        return assignedPartitions;
+        return partitions.stream()

Review Comment:
   Lines 68-74 and 78-84 duplicate the code a bit. 
   Maybe you can define a private method to abstract the common code and call 
it from 

[GitHub] [flink] architgyl commented on pull request #22509: [FLINK-31983] Add yarn Acls capability to Flink containers

2023-05-10 Thread via GitHub


architgyl commented on PR #22509:
URL: https://github.com/apache/flink/pull/22509#issuecomment-1542652921

   > Thanks for the patch. Left some comments.
   > 
   > Ideally we should have a test case verifying that the ACL actually works. 
Have you verified the ACL actually works with a real deployment in YARN?
   
   @becketqin I have verified locally the possible scenarios for both use cases:
   - viewing the logs
   - killing the application
   I also verified the wildcard behavior, updated the tests related to it, and 
added the post-change behavior to the Testing part of the PR.





[jira] [Created] (FLINK-32049) CoordinatedSourceRescaleITCase.testDownscaling fails on AZP

2023-05-10 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-32049:
---

 Summary: CoordinatedSourceRescaleITCase.testDownscaling fails on 
AZP
 Key: FLINK-32049
 URL: https://issues.apache.org/jira/browse/FLINK-32049
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.17.1
Reporter: Sergey Nuyanzin


CoordinatedSourceRescaleITCase.testDownscaling fails with
{noformat}
May 08 03:19:14 [ERROR] Failures: 
May 08 03:19:14 [ERROR]   
CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107 
May 08 03:19:14 Multiple Failures (1 failure)
May 08 03:19:14 -- failure 1 --
May 08 03:19:14 [Any cause contains message 'successfully restored checkpoint'] 
May 08 03:19:14 Expecting any element of:
May 08 03:19:14   [org.apache.flink.runtime.client.JobExecutionException: Job 
execution failed.
May 08 03:19:14 at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
May 08 03:19:14 at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
May 08 03:19:14 at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
May 08 03:19:14 ...(45 remaining lines not displayed - this can be 
changed with Assertions.setMaxStackTraceElementsDisplayed),
May 08 03:19:14 org.apache.flink.runtime.JobException: Recovery is 
suppressed by NoRestartBackoffTimeStrategy
May 08 03:19:14 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
May 08 03:19:14 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
May 08 03:19:14 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
May 08 03:19:14 ...(35 remaining lines not displayed - this can be 
changed with Assertions.setMaxStackTraceElementsDisplayed),
May 08 03:19:14 java.lang.IllegalStateException: This executor has been 
registered.
May 08 03:19:14 at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
May 08 03:19:14 at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341)
May 08 03:19:14 at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63)
May 08 03:19:14 ...(17 remaining lines not displayed - this can be 
changed with Assertions.setMaxStackTraceElementsDisplayed)]
May 08 03:19:14 to satisfy the given assertions requirements but none did:
May 08 03:19:14 
May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job 
execution failed.
May 08 03:19:14 at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
May 08 03:19:14 at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
May 08 03:19:14 at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
May 08 03:19:14 ...(45 remaining lines not displayed - this can be 
changed with Assertions.setMaxStackTraceElementsDisplayed)
May 08 03:19:14 error: 
May 08 03:19:14 Expecting throwable message:
May 08 03:19:14   "Job execution failed."
May 08 03:19:14 to contain:
May 08 03:19:14   "successfully restored checkpoint"
May 08 03:19:14 but did not.
May 08 03:19:14 

{noformat}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772=logs=fc7981dc-d266-55b0-5fff-f0d0a2294e36=1a9b228a-3e0e-598f-fc81-c321539dfdbf=7191





[jira] [Commented] (FLINK-32048) DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network buffers"

2023-05-10 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721505#comment-17721505
 ] 

Sergey Nuyanzin commented on FLINK-32048:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48844=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12658

> DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network 
> buffers"
> ---
>
> Key: FLINK-32048
> URL: https://issues.apache.org/jira/browse/FLINK-32048
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.18.0
>Reporter: Lijie Wang
>Priority: Critical
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48855=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4]
> {code:java}
> May 10 09:37:41 Caused by: java.io.IOException: Insufficient number of 
> network buffers: required 1, but only 0 available. The total number of 
> network buffers is currently set to 2048 of 32768 bytes each. You can 
> increase this number by setting the configuration keys 
> 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
> 'taskmanager.memory.network.max'.
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:495)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:456)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory.lambda$createBufferPoolFactory$3(SingleInputGateFactory.java:330)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:274)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:105)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:969)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:654)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> May 10 09:37:41   at java.lang.Thread.run(Thread.java:748)
> {code}





[jira] [Commented] (FLINK-32048) DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network buffers"

2023-05-10 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721504#comment-17721504
 ] 

Sergey Nuyanzin commented on FLINK-32048:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48863=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12370

> DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network 
> buffers"
> ---
>
> Key: FLINK-32048
> URL: https://issues.apache.org/jira/browse/FLINK-32048
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.18.0
>Reporter: Lijie Wang
>Priority: Critical
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48855=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4]
> {code:java}
> May 10 09:37:41 Caused by: java.io.IOException: Insufficient number of 
> network buffers: required 1, but only 0 available. The total number of 
> network buffers is currently set to 2048 of 32768 bytes each. You can 
> increase this number by setting the configuration keys 
> 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
> 'taskmanager.memory.network.max'.
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:495)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:456)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory.lambda$createBufferPoolFactory$3(SingleInputGateFactory.java:330)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:274)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:105)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:969)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:654)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> May 10 09:37:41   at java.lang.Thread.run(Thread.java:748)
> {code}





[jira] [Resolved] (FLINK-31982) Build image from source Dockerfile error in main

2023-05-10 Thread James Busche (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Busche resolved FLINK-31982.
--
Fix Version/s: kubernetes-operator-1.5.0
 Release Note: Can successfully build the 1.5.0 container using podman 
while on Red Hat 9.0 and podman version 4.4.1
   Resolution: Workaround

> Build image from source Dockerfile error in main
> 
>
> Key: FLINK-31982
> URL: https://issues.apache.org/jira/browse/FLINK-31982
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: James Busche
>Priority: Major
> Fix For: kubernetes-operator-1.5.0
>
>
> I'm noticing a problem trying to build the Debian Flink Operator image from 
> the Dockerfile in the main branch.
>  
> podman build -f Dockerfile -t debian-release:1.5.0-rc1
> 
> [INFO] Compiling 9 source files to 
> /app/flink-kubernetes-operator-autoscaler/target/test-classes
> 
> [ERROR] COMPILATION ERROR : 
> [INFO] -
> [ERROR] 
> /app/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java:[59,8]
>  error while writing 
> org.apache.flink.kubernetes.operator.autoscaler.ScalingMetricEvaluatorTest: 
> /app/flink-kubernetes-operator-autoscaler/target/test-classes/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.class:
>  Too many open files
> [ERROR] 
> /app/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java:[78,29]
>  cannot access org.apache.flink.kubernetes.operator.autoscaler.ScalingSummary
>   bad class file: 
> /app/flink-kubernetes-operator-autoscaler/target/classes/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.class
>     unable to access file: java.nio.file.FileSystemException: 
> /app/flink-kubernetes-operator-autoscaler/target/classes/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.class:
>  Too many open files
>     Please remove or make sure it appears in the correct subdirectory of the 
> classpath.
> [ERROR] 
> /app/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java:[84,29]
>  incompatible types: inferred type does not conform to equality constraint(s)
>  
> I've tried increasing my nofiles to unlimited, but still see the error.
> I tried building the release 1.4.0 and it built fine, so not certain what's 
> recently changed in 1.5.0. Maybe it builds fine in Docker instead of podman?





[jira] [Commented] (FLINK-31982) Build image from source Dockerfile error in main

2023-05-10 Thread James Busche (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721498#comment-17721498
 ] 

James Busche commented on FLINK-31982:
--

I'm going to mark this as resolved: the resolution is to use a later 
version of Red Hat (Red Hat 9.0) for building the 1.5.0 release of the Flink 
Operator.

> Build image from source Dockerfile error in main
> 
>
> Key: FLINK-31982
> URL: https://issues.apache.org/jira/browse/FLINK-31982
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: James Busche
>Priority: Major
>
> I'm noticing a problem trying to build the Debian Flink Operator image from 
> the Dockerfile in the main branch.
>  
> podman build -f Dockerfile -t debian-release:1.5.0-rc1
> 
> [INFO] Compiling 9 source files to 
> /app/flink-kubernetes-operator-autoscaler/target/test-classes
> 
> [ERROR] COMPILATION ERROR : 
> [INFO] -
> [ERROR] 
> /app/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java:[59,8]
>  error while writing 
> org.apache.flink.kubernetes.operator.autoscaler.ScalingMetricEvaluatorTest: 
> /app/flink-kubernetes-operator-autoscaler/target/test-classes/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.class:
>  Too many open files
> [ERROR] 
> /app/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java:[78,29]
>  cannot access org.apache.flink.kubernetes.operator.autoscaler.ScalingSummary
>   bad class file: 
> /app/flink-kubernetes-operator-autoscaler/target/classes/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.class
>     unable to access file: java.nio.file.FileSystemException: 
> /app/flink-kubernetes-operator-autoscaler/target/classes/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.class:
>  Too many open files
>     Please remove or make sure it appears in the correct subdirectory of the 
> classpath.
> [ERROR] 
> /app/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java:[84,29]
>  incompatible types: inferred type does not conform to equality constraint(s)
>  
> I've tried increasing my nofiles to unlimited, but still see the error.
> I tried building the release 1.4.0 and it built fine, so not certain what's 
> recently changed in 1.5.0. Maybe it builds fine in Docker instead of podman?





[jira] [Commented] (FLINK-31982) Build image from source Dockerfile error in main

2023-05-10 Thread James Busche (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721495#comment-17721495
 ] 

James Busche commented on FLINK-31982:
--

I'm not sure why the 1.5.x build fails with podman on my Red Hat 8.x servers 
while 1.4.0 built fine.

The podman version there is a bit older:

_podman --version_

_podman version 4.2.0_

I just tried it on a Red Hat 9.0 server and it built great. The podman version 
there is:

_podman --version_

_podman version 4.4.1_

> Build image from source Dockerfile error in main
> 
>
> Key: FLINK-31982
> URL: https://issues.apache.org/jira/browse/FLINK-31982
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: James Busche
>Priority: Major
>
> I'm noticing a problem trying to build the Debian Flink Operator image from 
> the Dockerfile in the main branch.
>  
> podman build -f Dockerfile -t debian-release:1.5.0-rc1
> 
> [INFO] Compiling 9 source files to 
> /app/flink-kubernetes-operator-autoscaler/target/test-classes
> 
> [ERROR] COMPILATION ERROR : 
> [INFO] -
> [ERROR] 
> /app/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java:[59,8]
>  error while writing 
> org.apache.flink.kubernetes.operator.autoscaler.ScalingMetricEvaluatorTest: 
> /app/flink-kubernetes-operator-autoscaler/target/test-classes/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.class:
>  Too many open files
> [ERROR] 
> /app/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java:[78,29]
>  cannot access org.apache.flink.kubernetes.operator.autoscaler.ScalingSummary
>   bad class file: 
> /app/flink-kubernetes-operator-autoscaler/target/classes/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.class
>     unable to access file: java.nio.file.FileSystemException: 
> /app/flink-kubernetes-operator-autoscaler/target/classes/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.class:
>  Too many open files
>     Please remove or make sure it appears in the correct subdirectory of the 
> classpath.
> [ERROR] 
> /app/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java:[84,29]
>  incompatible types: inferred type does not conform to equality constraint(s)
>  
> I've tried increasing my nofiles to unlimited, but still see the error.
> I tried building the release 1.4.0 and it built fine, so not certain what's 
> recently changed in 1.5.0. Maybe it builds fine in Docker instead of podman?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] JTaky commented on pull request #22394: [FLINK-31780][component=Runtime/Coordination] Allow users to disable 'Ensemble tracking' feature for ZooKeeper Curator framework

2023-05-10 Thread via GitHub


JTaky commented on PR #22394:
URL: https://github.com/apache/flink/pull/22394#issuecomment-1542593976

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #22546: [FLINK-32032] Upgrade to flink-shaded 17.0

2023-05-10 Thread via GitHub


XComp commented on code in PR #22546:
URL: https://github.com/apache/flink/pull/22546#discussion_r1190164832


##
pom.xml:
##
@@ -376,7 +376,7 @@ under the License.

org.apache.flink
flink-shaded-netty
-   
4.1.90.Final-${flink.shaded.version}

Review Comment:
   The commit message doesn't match the change? :thinking: 



##
pom.xml:
##
@@ -122,8 +122,8 @@ under the License.
4
true
-XX:+UseG1GC 
-Xms256m
-   16.1
-   
2.13.4

Review Comment:
   There's also a `` property further down in the file that is used for test 
code. I suggest that we keep these two dependencies in sync. We would have to 
change some parameter initialization (see 
[CURATOR-663](https://issues.apache.org/jira/browse/CURATOR-663)) if we decide 
to upgrade.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] clownxc commented on a diff in pull request #22555: [FLINK-31706] [runtime] The default source parallelism should be the same as ex…

2023-05-10 Thread via GitHub


clownxc commented on code in PR #22555:
URL: https://github.com/apache/flink/pull/22555#discussion_r1190156245


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##
@@ -546,7 +553,7 @@ static DefaultVertexParallelismAndInputInfosDecider from(
 configuration.get(
 
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK),
 configuration.get(
-BatchExecutionOptions
-
.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM));
+
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM),
+configuration.get(CoreOptions.DEFAULT_PARALLELISM));
 }

Review Comment:
   > parallelism.default should be obtained from ExecutionConfig instead of 
jobmaster Configuration
   
   I'm not sure whether my idea is correct; looking forward to your reply.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on pull request #22380: [FLINK-31773][runtime] Separate DefaultLeaderElectionService.start(LeaderContender) into two separate methods for starting the driver and regis

2023-05-10 Thread via GitHub


XComp commented on PR #22380:
URL: https://github.com/apache/flink/pull/22380#issuecomment-1542484791

   > Are we maybe missing some start call somewhere?
   > In one of the tests the job submission never went through, and another HA 
e2e test is also failing.
   
   Just to have this documented: the problem was (again) that a grant call is 
triggered from within the `MultipleComponentLeaderElectionDriverAdapter` while 
the driver is being instantiated. Initially, I only allowed runnables to be 
pushed to the leader event thread if the driver was initialized. This prevented 
the initial grant call (if leadership was already acquired) from being handled 
properly.
   
   I'm gonna reorganize/rebase the branch and make it ready to be reviewed 
again.
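   For illustration, a toy sketch of the ordering problem described above; none 
of these names are Flink's, and the real adapter/event-thread wiring is more 
involved. If events that arrive before initialization are rejected rather than 
buffered, an initial leadership grant fired during driver construction is 
silently lost:
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   
   // Illustrative only: buffer leadership events that arrive while the
   // driver is still being instantiated, instead of dropping them.
   final class LeaderEventBuffer {
       private final Queue<Runnable> pending = new ArrayDeque<>();
       private boolean driverInitialized;
   
       synchronized void submit(Runnable leaderEvent) {
           if (driverInitialized) {
               leaderEvent.run(); // in real code: hand off to the leader event thread
           } else {
               pending.add(leaderEvent); // keeps the initial grant instead of losing it
           }
       }
   
       synchronized void markInitialized() {
           driverInitialized = true;
           while (!pending.isEmpty()) {
               pending.poll().run(); // replay everything that arrived early
           }
       }
   }
   ```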


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-32029) FutureUtils.handleUncaughtException swallows exceptions that are caused by the exception handler code

2023-05-10 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-32029.
---
Fix Version/s: 1.16.2
   1.18.0
   1.17.1
   Resolution: Fixed

> FutureUtils.handleUncaughtException swallows exceptions that are caused by 
> the exception handler code
> -
>
> Key: FLINK-32029
> URL: https://issues.apache.org/jira/browse/FLINK-32029
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
>
> I ran into an issue in FLINK-31773 where the passed exception handler relies 
> on the {{leaderContender}} field of the {{DefaultLeaderElectionService}}. 
> This field can become {{null}} with the changes of FLINK-31773. But the 
> {{null}} check was missed in the error handling code. This bug wasn't exposed 
> because {{FutureUtils.handleUncaughtException(..)}} expects the passed error 
> handler callback to be bug-free.
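For illustration, a minimal sketch of the failure mode and the fix described 
above; the helper below is hypothetical and not Flink's actual implementation:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public final class UncaughtExceptionDemo {

    // If errorHandler itself throws (e.g. an NPE on a null field), a plain
    // whenComplete would leave that secondary exception unobserved, i.e.
    // silently swallowed. The try/catch adds the fallback handling.
    static void handleUncaughtException(
            CompletableFuture<?> future, Consumer<Throwable> errorHandler) {
        future.whenComplete(
                (ignored, throwable) -> {
                    if (throwable != null) {
                        try {
                            errorHandler.accept(throwable);
                        } catch (Throwable handlerFailure) {
                            // Fallback: keep the original error attached and
                            // report instead of dropping both exceptions.
                            handlerFailure.addSuppressed(throwable);
                            handlerFailure.printStackTrace(System.err);
                        }
                    }
                });
    }
}
{code}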



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32029) FutureUtils.handleUncaughtException swallows exceptions that are caused by the exception handler code

2023-05-10 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721280#comment-17721280
 ] 

Matthias Pohl edited comment on FLINK-32029 at 5/10/23 4:15 PM:


master: 88b3432a2845952543a8396aeb8e2cddab77b509
1.17: 1d85f6ffc00789e0239f8ed7164af03b81e8dfae
1.16: 55c748647fdf68050312c08bfaba574a423a8464


was (Author: mapohl):
master: 88b3432a2845952543a8396aeb8e2cddab77b509
1.17: tba
1.16: tba

> FutureUtils.handleUncaughtException swallows exceptions that are caused by 
> the exception handler code
> -
>
> Key: FLINK-32029
> URL: https://issues.apache.org/jira/browse/FLINK-32029
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
>
> I ran into an issue in FLINK-31773 where the passed exception handler relies 
> on the {{leaderContender}} field of the {{DefaultLeaderElectionService}}. 
> This field can become {{null}} with the changes of FLINK-31773. But the 
> {{null}} check was missed in the error handling code. This bug wasn't exposed 
> because {{FutureUtils.handleUncaughtException(..)}} expects the passed error 
> handler callback to be bug-free.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] XComp merged pull request #22562: [BP-1.16][FLINK-32029][core] Adds fallback error handling to FutureUtils.handleUncaughtException.

2023-05-10 Thread via GitHub


XComp merged PR #22562:
URL: https://github.com/apache/flink/pull/22562


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on pull request #22562: [BP-1.16][FLINK-32029][core] Adds fallback error handling to FutureUtils.handleUncaughtException.

2023-05-10 Thread via GitHub


XComp commented on PR #22562:
URL: https://github.com/apache/flink/pull/22562#issuecomment-1542478232

   PR #22561 was approved.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp merged pull request #22561: [BP-1.17][FLINK-32029][core] Adds fallback error handling to FutureUtils.handleUncaughtException.

2023-05-10 Thread via GitHub


XComp merged PR #22561:
URL: https://github.com/apache/flink/pull/22561


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #21736: [FLINK-27518][tests] Refactor migration tests to support version update automatically

2023-05-10 Thread via GitHub


XComp commented on code in PR #21736:
URL: https://github.com/apache/flink/pull/21736#discussion_r1190087803


##
flink-test-utils-parent/flink-migration-test-utils/README.md:
##
@@ -0,0 +1,99 @@
+# Add State Migration Tests
+
+This module collects tools that help to generate test data for the state 
migration tests.
+
+The following dependency needs to be added to the module's Maven config in case a
+migration test is meant to be added to that module:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-migration-test-utils</artifactId>
+    <version>${project.version}</version>
+    <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+    <id>generate-migration-test-data</id>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-migration-test-data</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <!-- ... -->
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</profile>
+```
+
+To show the log during generation, add
+
+```
+logger.migration.name = org.apache.flink.test.migration
+logger.migration.level = INFO
+```
+
+to the `log4j2-test.properties` of this module.
+
+The state migration tests should satisfy the following:
+
+1. The tests are named like `*(Test|ITCase).(java|scala)`.
+2. The test class name is the same as the file name.
+3. The test implements `org.apache.flink.test.util.MigrationTest` and the 
snapshots generator methods are labeled
+   with `@SnapshotsGenerator` or `@ParameterizedSnapshotsGenerator`.
+
+# Generating Snapshots
+
+To generate the snapshots for all the tests,
+execute from within the target version's release branch:
+
+```shell
+mvn clean package -Pgenerate-migration-test-data -Dgenerate.version=1.17 -nsu 
-Dfast -DskipTests
+```
+
+The version (`1.17` in the command above) should be replaced with the target 
one.
+
+By default, it will search for the migration tests under `src/test/java` and 
`src/test/scala`. It is also supported
+to change the default search paths or only generating for specific classes:

Review Comment:
   ```suggestion
   to change the default search paths or only generate for specific classes:
   ```
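   For context, a rough sketch of a test class satisfying the three requirements 
quoted above; the class body, the generator's exact signature, and whether the 
annotation is a nested type of `MigrationTest` are assumptions here:
   
   ```java
   import org.apache.flink.FlinkVersion;
   import org.apache.flink.test.util.MigrationTest;
   
   // Requirements 1 and 2: the class name ends in "Test" and matches the file name.
   public class MyOperatorMigrationTest implements MigrationTest {
   
       // Requirement 3: label the snapshot generator so that the
       // generate-migration-test-data profile can discover and invoke it.
       @SnapshotsGenerator
       public void generateSnapshots(FlinkVersion targetVersion) throws Exception {
           // Hypothetical: run the job against targetVersion and write the
           // snapshot files that the restore path of the test reads back.
       }
   }
   ```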



##
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java:
##
@@ -104,16 +121,33 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger {
 .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
 .build());
 
-private final boolean allowNonRestoredState;
 private final ScheduledExecutor scheduledExecutor =
 new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
 
-protected AbstractOperatorRestoreTestBase() {
-this(true);
+protected AbstractOperatorRestoreTestBase(FlinkVersion flinkVersion) {
+this.flinkVersion = flinkVersion;
 }
 
-protected AbstractOperatorRestoreTestBase(boolean allowNonRestoredState) {
-this.allowNonRestoredState = allowNonRestoredState;
+protected void internalGenerateSnapshots(FlinkVersion targetVersion) 
throws Exception {
+ClusterClient clusterClient = cluster.getClusterClient();
+final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
+
+// Submit job with old version savepoint and create a migrated 
savepoint in the new version.
+// Any old version is ok, and we choose 1.8 directly.
+String savepointPath = migrateJob(FlinkVersion.v1_8, clusterClient, 
deadline);
+
+Path targetPath = getSavepointPath(targetVersion);
+Files.createDirectories(targetPath);
+
+// copy the savepoint to the give directory

Review Comment:
   ```suggestion
   // copy the savepoint to the given directory
   ```
   nit



##
flink-annotations/src/main/java/org/apache/flink/FlinkVersion.java:
##
@@ -87,7 +87,11 @@ public static Optional<FlinkVersion> byCode(String code) {
 return Optional.ofNullable(CODE_MAP.get(code));
 }
 
-/** Returns the current version. */
+public static FlinkVersion valueOf(int majorVersion, int minorVersion) {
+return FlinkVersion.valueOf("v" + majorVersion + "_" + minorVersion);
+}
+
+/** Returns the current master version. */

Review Comment:
   

[GitHub] [flink-kubernetes-operator] ashangit commented on a diff in pull request #590: [FLINK-32012] Rely on savepoint mechanism when performing rollback with saveoint upgrade mode

2023-05-10 Thread via GitHub


ashangit commented on code in PR #590:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/590#discussion_r1190128820


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -270,17 +270,13 @@ protected void rollback(FlinkResourceContext ctx) 
throws Exception {
 
 UpgradeMode upgradeMode = resource.getSpec().getJob().getUpgradeMode();
 
-cancelJob(
-ctx,
-upgradeMode == UpgradeMode.STATELESS
-? UpgradeMode.STATELESS
-: UpgradeMode.LAST_STATE);
+cancelJob(ctx, upgradeMode);

Review Comment:
   Indeed, it would be better. I will look into this and contact you through the 
original FLINK ticket.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] clownxc commented on a diff in pull request #22555: [FLINK-31706] [runtime] The default source parallelism should be the same as ex…

2023-05-10 Thread via GitHub


clownxc commented on code in PR #22555:
URL: https://github.com/apache/flink/pull/22555#discussion_r1190126807


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##
@@ -546,7 +553,7 @@ static DefaultVertexParallelismAndInputInfosDecider from(
 configuration.get(
 
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK),
 configuration.get(
-BatchExecutionOptions
-
.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM));
+
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM),
+configuration.get(CoreOptions.DEFAULT_PARALLELISM));
 }

Review Comment:
   > parallelism.default should be obtained from ExecutionConfig instead of 
jobmaster Configuration
   
   The value of `ExecutionConfig#PARALLELISM_DEFAULT` is always `-1`:
   ```java
   public static final int PARALLELISM_DEFAULT = -1;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] clownxc commented on a diff in pull request #22555: [FLINK-31706] [runtime] The default source parallelism should be the same as ex…

2023-05-10 Thread via GitHub


clownxc commented on code in PR #22555:
URL: https://github.com/apache/flink/pull/22555#discussion_r1190120652


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##
@@ -546,7 +553,7 @@ static DefaultVertexParallelismAndInputInfosDecider from(
 configuration.get(
 
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK),
 configuration.get(
-BatchExecutionOptions
-
.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM));
+
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM),
+configuration.get(CoreOptions.DEFAULT_PARALLELISM));
 }

Review Comment:
   > parallelism.default should be obtained from ExecutionConfig instead of 
jobmaster Configuration
   
   Thank you very much for the review. I want to use the `parallelism` of 
`ExecutionConfig` as the default value:
   ```java
   public int getParallelism() {
       return configuration.get(CoreOptions.DEFAULT_PARALLELISM);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #590: [FLINK-32012] Rely on savepoint mechanism when performing rollback with saveoint upgrade mode

2023-05-10 Thread via GitHub


gyfora commented on code in PR #590:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/590#discussion_r1190109107


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -384,6 +384,12 @@ private boolean shouldRollBack(
 return false;
 }
 
+if (resource.getSpec().getJob() != null
+&& resource.getSpec().getJob().getUpgradeMode() == 
UpgradeMode.SAVEPOINT) {
+// HA data is not available during rollback for savepoint upgrade 
mode
+return true;
+}

Review Comment:
   This is not that simple. The operator only observes the job periodically 
(let's say every 30 seconds). It is possible that the job started, checkpointed, 
and failed within that 30-second window, so we cannot simply ignore the HA 
metadata. If it's there, we have to use it; we cannot delete it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] clownxc commented on a diff in pull request #22555: [FLINK-31706] [runtime] The default source parallelism should be the same as ex…

2023-05-10 Thread via GitHub


clownxc commented on code in PR #22555:
URL: https://github.com/apache/flink/pull/22555#discussion_r1190107593


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##
@@ -546,7 +553,7 @@ static DefaultVertexParallelismAndInputInfosDecider from(
 configuration.get(
 
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK),
 configuration.get(
-BatchExecutionOptions
-
.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM));
+
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM),
+configuration.get(CoreOptions.DEFAULT_PARALLELISM));
 }

Review Comment:
   > parallelism.default should be obtained from ExecutionConfig instead of 
jobmaster Configuration
   
   Sorry for my mistake; I have changed the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] ashangit commented on a diff in pull request #590: [FLINK-32012] Rely on savepoint mechanism when performing rollback with saveoint upgrade mode

2023-05-10 Thread via GitHub


ashangit commented on code in PR #590:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/590#discussion_r1190103548


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -384,6 +384,12 @@ private boolean shouldRollBack(
 return false;
 }
 
+if (resource.getSpec().getJob() != null
+&& resource.getSpec().getJob().getUpgradeMode() == 
UpgradeMode.SAVEPOINT) {
+// HA data is not available during rollback for savepoint upgrade 
mode
+return true;
+}

Review Comment:
   Sorry, but I'm not sure I see the issue if we just check for the savepoint 
upgrade mode.
   If the HA metadata has indeed been created by the jobmanager but the job has 
not recovered, we should not have any new checkpoint, am I wrong?
   As the rollback section ensures that HA metadata is deleted (in `cancelJob`), 
it should still be safe to 'not care' about this HA metadata, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #590: [FLINK-32012] Rely on savepoint mechanism when performing rollback with saveoint upgrade mode

2023-05-10 Thread via GitHub


gyfora commented on code in PR #590:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/590#discussion_r1190101168


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -270,17 +270,13 @@ protected void rollback(FlinkResourceContext ctx) 
throws Exception {
 
 UpgradeMode upgradeMode = resource.getSpec().getJob().getUpgradeMode();
 
-cancelJob(
-ctx,
-upgradeMode == UpgradeMode.STATELESS
-? UpgradeMode.STATELESS
-: UpgradeMode.LAST_STATE);
+cancelJob(ctx, upgradeMode);

Review Comment:
   It would be great if somehow we could reuse the upgrade logic completely for 
rollbacks. I know this is a fairly complex change; I am happy to discuss it 
offline with you as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #590: [FLINK-32012] Rely on savepoint mechanism when performing rollback with saveoint upgrade mode

2023-05-10 Thread via GitHub


gyfora commented on code in PR #590:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/590#discussion_r1190090026


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -270,17 +270,13 @@ protected void rollback(FlinkResourceContext ctx) 
throws Exception {
 
 UpgradeMode upgradeMode = resource.getSpec().getJob().getUpgradeMode();
 
-cancelJob(
-ctx,
-upgradeMode == UpgradeMode.STATELESS
-? UpgradeMode.STATELESS
-: UpgradeMode.LAST_STATE);
+cancelJob(ctx, upgradeMode);

Review Comment:
   Yes, the rollback should be similar to the upgrade, I completely agree. The 
problem with the current code is that the upgrade mechanism has evolved quite a 
bit while the rollback stayed pretty basic (with strong restrictions), as you 
can see :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] clownxc commented on pull request #22555: [FLINK-31706] [runtime] The default source parallelism should be the same as ex…

2023-05-10 Thread via GitHub


clownxc commented on PR #22555:
URL: https://github.com/apache/flink/pull/22555#issuecomment-1542417626

   > @clownxc I've left some comments, PTAL!
   
   Thank you very much for your review. I have modified the code; could you 
re-review it when you are free and leave some comments?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] echauchot merged pull request #14: [FLINK-32014][hotfix][javadoc] Fix incorrect javadoc regarding maxSplitMemorySize and add numSplits computation information

2023-05-10 Thread via GitHub


echauchot merged PR #14:
URL: https://github.com/apache/flink-connector-cassandra/pull/14


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] ashangit commented on a diff in pull request #590: [FLINK-32012] Rely on savepoint mechanism when performing rollback with saveoint upgrade mode

2023-05-10 Thread via GitHub


ashangit commented on code in PR #590:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/590#discussion_r1190079103


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -270,17 +270,13 @@ protected void rollback(FlinkResourceContext ctx) 
throws Exception {
 
 UpgradeMode upgradeMode = resource.getSpec().getJob().getUpgradeMode();
 
-cancelJob(
-ctx,
-upgradeMode == UpgradeMode.STATELESS
-? UpgradeMode.STATELESS
-: UpgradeMode.LAST_STATE);
+cancelJob(ctx, upgradeMode);

Review Comment:
   I wanted to ensure that HA data is well deleted before restoreJob for 
rollbacks as well.
   
   It looks to me that we should apply the same pattern for upgrade and 
rollback, relying only on the savepoint and ensuring the HA data is cleaned up.
   For example, if the upgrade was linked to a bump of the Flink version with 
changes in HA data that are not compatible with the rolled-back Flink version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32048) DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network buffers"

2023-05-10 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-32048:

Priority: Critical  (was: Major)

> DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network 
> buffers"
> ---
>
> Key: FLINK-32048
> URL: https://issues.apache.org/jira/browse/FLINK-32048
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.18.0
>Reporter: Lijie Wang
>Priority: Critical
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48855=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4]
> {code:java}
> May 10 09:37:41 Caused by: java.io.IOException: Insufficient number of 
> network buffers: required 1, but only 0 available. The total number of 
> network buffers is currently set to 2048 of 32768 bytes each. You can 
> increase this number by setting the configuration keys 
> 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
> 'taskmanager.memory.network.max'.
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:495)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:456)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory.lambda$createBufferPoolFactory$3(SingleInputGateFactory.java:330)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:274)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:105)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:969)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:654)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> May 10 09:37:41   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32048) DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network buffers"

2023-05-10 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721406#comment-17721406
 ] 

Sergey Nuyanzin commented on FLINK-32048:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48874=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12367

> DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network 
> buffers"
> ---
>
> Key: FLINK-32048
> URL: https://issues.apache.org/jira/browse/FLINK-32048
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.18.0
>Reporter: Lijie Wang
>Priority: Critical
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48855=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4]
> {code:java}
> May 10 09:37:41 Caused by: java.io.IOException: Insufficient number of 
> network buffers: required 1, but only 0 available. The total number of 
> network buffers is currently set to 2048 of 32768 bytes each. You can 
> increase this number by setting the configuration keys 
> 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
> 'taskmanager.memory.network.max'.
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:495)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:456)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory.lambda$createBufferPoolFactory$3(SingleInputGateFactory.java:330)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:274)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:105)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:969)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:654)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> May 10 09:37:41   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32048) DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network buffers"

2023-05-10 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-32048:

Labels: test-stability  (was: )

> DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network 
> buffers"
> ---
>
> Key: FLINK-32048
> URL: https://issues.apache.org/jira/browse/FLINK-32048
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.18.0
>Reporter: Lijie Wang
>Priority: Major
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48855=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4]
> {code:java}
> May 10 09:37:41 Caused by: java.io.IOException: Insufficient number of 
> network buffers: required 1, but only 0 available. The total number of 
> network buffers is currently set to 2048 of 32768 bytes each. You can 
> increase this number by setting the configuration keys 
> 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
> 'taskmanager.memory.network.max'.
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:495)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:456)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory.lambda$createBufferPoolFactory$3(SingleInputGateFactory.java:330)
> May 10 09:37:41   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:274)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:105)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:969)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:654)
> May 10 09:37:41   at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> May 10 09:37:41   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32014) Cassandra source documentation is missing and javadoc is out of sync

2023-05-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32014:
---
Labels: pull-request-available  (was: )

> Cassandra source documentation is missing and javadoc is out of sync
> 
>
> Key: FLINK-32014
> URL: https://issues.apache.org/jira/browse/FLINK-32014
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Cassandra, Documentation
>Reporter: Etienne Chauchot
>Assignee: Etienne Chauchot
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-cassandra] echauchot opened a new pull request, #14: [FLINK-32014][hotfix][javadoc] Fix incorrect javadoc regarding maxSplitMemorySize and add numSplits computation informati

2023-05-10 Thread via GitHub


echauchot opened a new pull request, #14:
URL: https://github.com/apache/flink-connector-cassandra/pull/14

   Opening a PR to have a link in the ticket, but self-merging as it is a hotfix.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-32014) Cassandra source documentation is missing and javadoc is out of sync

2023-05-10 Thread Etienne Chauchot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Etienne Chauchot reassigned FLINK-32014:


Assignee: Etienne Chauchot

> Cassandra source documentation is missing and javadoc is out of sync
> 
>
> Key: FLINK-32014
> URL: https://issues.apache.org/jira/browse/FLINK-32014
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Cassandra, Documentation
>Reporter: Etienne Chauchot
>Assignee: Etienne Chauchot
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] clownxc commented on pull request #22555: [FLINK-31706] [runtime] The default source parallelism should be the same as ex…

2023-05-10 Thread via GitHub


clownxc commented on PR #22555:
URL: https://github.com/apache/flink/pull/22555#issuecomment-1542370118

   > @clownxc Thanks for creating this pr! I'm curious why you don't fallback 
the value of globalDefaultSourceParallelism to parallelism.default if necessary 
outside the constructor of DefaultVertexParallelismAndInputInfosDecider? This 
change will be much cleaner. In addition, we do not require that 
parallelism.default must be a positive number. On the contrary, before version 
1.17, users need to configure parallelism.default = -1 to enable parallelism 
derivation. WDYT?
   
   Thank you very much for the review; I will try to modify the code as you 
suggest. A rough sketch of that fallback is below.
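   A minimal sketch of the fallback the reviewer describes, resolved outside the 
decider's constructor; the helper and its names are hypothetical, only 
`CoreOptions.DEFAULT_PARALLELISM` (i.e. `parallelism.default`) is Flink's:
   
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.configuration.CoreOptions;
   
   final class SourceParallelismFallback {
   
       // Values <= 0 (e.g. -1) mean "not configured"; fall back to
       // parallelism.default instead of requiring a positive setting.
       static int resolveDefaultSourceParallelism(
               Configuration configuration, int configuredSourceParallelism) {
           return configuredSourceParallelism > 0
                   ? configuredSourceParallelism
                   : configuration.get(CoreOptions.DEFAULT_PARALLELISM);
       }
   }
   ```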


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] clownxc commented on pull request #22555: [FLINK-31706] [runtime] The default source parallelism should be the same as ex…

2023-05-10 Thread via GitHub


clownxc commented on PR #22555:
URL: https://github.com/apache/flink/pull/22555#issuecomment-1542368896

   > @clownxc Thanks for creating this pr! I'm curious why you don't fallback 
the value of globalDefaultSourceParallelism to parallelism.default if necessary 
outside the constructor of DefaultVertexParallelismAndInputInfosDecider? This 
change will be much cleaner. In addition, we do not require that 
parallelism.default must be a positive number. On the contrary, before version 
1.17, users need to configure parallelism.default = -1 to enable parallelism 
derivation. WDYT?
   Thank you; I will try to modify the code as you suggest.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-10 Thread Stefan Richter (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721387#comment-17721387
 ] 

Stefan Richter commented on FLINK-31963:


[~masteryhx] Did your job also make use of side-outputs? Just fishing among 
things that are potentially "unusual" about the jobs.

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB(Genetic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>Reporter: Tan Kim
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] afedulov commented on pull request #22556: [FLINK-32030][sql-client] Add URLs support for SQL Client gateway mode

2023-05-10 Thread via GitHub


afedulov commented on PR #22556:
URL: https://github.com/apache/flink/pull/22556#issuecomment-1542314676

   @mxm thanks for the feedback! 
   SSL support will be added in a separate PR (tracked by 
https://issues.apache.org/jira/browse/FLINK-32035). This one solely expands the 
functionality to support submitting URLs, including intermediary paths, and 
does not aim to extend the existing hostname:port parameter functionality with 
encryption.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32014) Cassandra source documentation is missing and javadoc is out of sync

2023-05-10 Thread Etienne Chauchot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Etienne Chauchot updated FLINK-32014:
-
Summary: Cassandra source documentation is missing and javadoc is out of 
sync  (was: Cassandra source documentation is missing)

> Cassandra source documentation is missing and javadoc is out of sync
> 
>
> Key: FLINK-32014
> URL: https://issues.apache.org/jira/browse/FLINK-32014
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Cassandra, Documentation
>Reporter: Etienne Chauchot
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] afedulov commented on a diff in pull request #22556: [FLINK-32030][sql-client] Add URLs support for SQL Client gateway mode

2023-05-10 Thread via GitHub


afedulov commented on code in PR #22556:
URL: https://github.com/apache/flink/pull/22556#discussion_r1189996733


##
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java:
##
@@ -156,6 +156,23 @@ void testGatewayMode() throws Exception {
 assertThat(actual).contains("execution.target", "yarn-session");
 }
 
+@Test
+void testGatewayModeUrl() throws Exception {
+String[] args =
+new String[] {
+"gateway",
+"-e",
+new URL(
+"http",
+
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
+
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
+"")
+.toString()
+};
+String actual = runSqlClient(args, String.join("\n", "SET;", "QUIT;"), 
false);
+assertThat(actual).contains("execution.target", "yarn-session");

Review Comment:
   It does; it has to do with how the existing tests are set up:
   
https://github.com/apache/flink/pull/22556/files#diff-f5e15c60fc56dbc6376b39004c12e41ab6501d130f788ecf421e6f09d597d3daR145-R157
   The new one is essentially the same test; it just verifies that passing a 
URL instead of hostname:port works without changing anything else.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32048) DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network buffers"

2023-05-10 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-32048:
--

 Summary: DecimalITCase.testAggMinGroupBy fails with "Insufficient 
number of network buffers"
 Key: FLINK-32048
 URL: https://issues.apache.org/jira/browse/FLINK-32048
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner, Tests
Affects Versions: 1.18.0
Reporter: Lijie Wang


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48855=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4]
{code:java}
May 10 09:37:41 Caused by: java.io.IOException: Insufficient number of network 
buffers: required 1, but only 0 available. The total number of network buffers 
is currently set to 2048 of 32768 bytes each. You can increase this number by 
setting the configuration keys 'taskmanager.memory.network.fraction', 
'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
May 10 09:37:41 at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:495)
May 10 09:37:41 at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:456)
May 10 09:37:41 at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory.lambda$createBufferPoolFactory$3(SingleInputGateFactory.java:330)
May 10 09:37:41 at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:274)
May 10 09:37:41 at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:105)
May 10 09:37:41 at 
org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:969)
May 10 09:37:41 at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:654)
May 10 09:37:41 at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
May 10 09:37:41 at java.lang.Thread.run(Thread.java:748)
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] XComp commented on a diff in pull request #22548: [FLINK-32039][test] Adds graceful shutdown to TestExecutorExtension and TestExecutorResource

2023-05-10 Thread via GitHub


XComp commented on code in PR #22548:
URL: https://github.com/apache/flink/pull/22548#discussion_r1189977494


##
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorExtension.java:
##
@@ -47,8 +58,26 @@ public T getExecutor() {
 
 @Override
 public void afterAll(ExtensionContext context) throws Exception {
+gracefulShutdown(executorService, LOG);
+}
+
+static void gracefulShutdown(@Nullable ExecutorService executorService, 
Logger logger) {

Review Comment:
   That's true. But we could make this an opt-out feature. :thinking: 
   
   But more generally: don't you think that outstanding tasks that are left 
hanging might reveal some misbehavior in the test? I'm not passionate about 
this one and would be OK with using `shutdownNow()`. My only intention was to 
reveal issues in the code base that are missed right now.
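   For reference, a minimal sketch of the graceful-shutdown compromise being 
discussed: grace period first, forced shutdown second; the helper name, the 
timeout value, and the reporting are assumptions:
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.TimeUnit;
   
   final class ExecutorShutdownUtil {
   
       // Give outstanding tasks a grace period, then force the shutdown and
       // surface what was still pending (possible test misbehavior).
       static void gracefulShutdown(ExecutorService executorService)
               throws InterruptedException {
           executorService.shutdown(); // stop accepting new tasks
           if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
               int dropped = executorService.shutdownNow().size();
               System.err.println(
                       "Executor did not terminate in time; dropped "
                               + dropped + " queued task(s).");
           }
       }
   }
   ```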



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] mxm commented on a diff in pull request #22556: [FLINK-32030][sql-client] Add URLs support for SQL Client gateway mode

2023-05-10 Thread via GitHub


mxm commented on code in PR #22556:
URL: https://github.com/apache/flink/pull/22556#discussion_r1189971406


##
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java:
##
@@ -156,6 +156,23 @@ void testGatewayMode() throws Exception {
 assertThat(actual).contains("execution.target", "yarn-session");
 }
 
+@Test
+void testGatewayModeUrl() throws Exception {
+String[] args =
+new String[] {
+"gateway",
+"-e",
+new URL(
+"http",
+
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
+
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
+"")
+.toString()
+};
+String actual = runSqlClient(args, String.join("\n", "SET;", "QUIT;"), 
false);
+assertThat(actual).contains("execution.target", "yarn-session");

Review Comment:
   Does this verify that the SQL client comes up? Kind of odd to have the 
yarn-session in there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on pull request #22561: [BP-1.17][FLINK-32029][core] Adds fallback error handling to FutureUtils.handleUncaughtException.

2023-05-10 Thread via GitHub


XComp commented on PR #22561:
URL: https://github.com/apache/flink/pull/22561#issuecomment-1542262592

   Initially, I thought of backporting the JUnit 5 migration as well, but it 
looks like we didn't do that in the past, either: `assertThatFuture` isn't 
backported. I backed off because it felt like quite a big change to the 
`FutureUtilsTest` class just to include this kind-of cosmetic change (talking 
about the JUnit 5 migration). I didn't want to open the rabbit hole of other 
JUnit 5 migrations as well. WDYT? :thinking: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] zentol commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-10 Thread via GitHub


zentol commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1189925470


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/HashShardAssigner.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Experimental;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.util.Preconditions;
+
+/** An implementation of the {@link KinesisShardAssigner} that assigns splits 
by hashcode. */
+@Experimental

Review Comment:
   Consider not exposing these implementations directly, but hide them behind a 
factory method that only returns a `KinesisShardAssigner`.
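   A minimal sketch of that suggestion; the factory class and method names are 
assumptions, only `KinesisShardAssigner` and `HashShardAssigner` come from the 
PR:
   
   ```java
   import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
   
   // Lives in the same package as HashShardAssigner, which can then stay
   // package-private instead of being exposed as @Experimental API.
   public final class ShardAssigners {
   
       private ShardAssigners() {}
   
       /** Returns the hash-based assigner without exposing its concrete type. */
       public static KinesisShardAssigner hashShardAssigner() {
           return new HashShardAssigner();
       }
   }
   ```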



##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/SourceConfigConstants.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.config;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import 
org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
+
+import java.time.Duration;
+
+/** Constants to be used with the KinesisStreamsSource. */
+@Experimental
+public class SourceConfigConstants extends AWSConfigConstants {
+/** Marks the initial position to use when reading from the Kinesis 
stream. */
+public enum InitialPosition {
+LATEST,
+TRIM_HORIZON,
+AT_TIMESTAMP
+}
+
+/** The record publisher type represents the record-consume style. */
+public enum RecordPublisherType {
+
+/** Consume the Kinesis records using AWS SDK v2 with the enhanced 
fan-out consumer. */
+EFO,
+/** Consume the Kinesis records using AWS SDK v1 with the get-records 
method. */
+POLLING
+}
+
+/** The EFO registration type represents how we are going to de-/register 
efo consumer. */
+public enum EFORegistrationType {
+
+/**
+ * Delay the registration of efo consumer for taskmanager to execute. 
De-register the efo
+ * consumer for taskmanager to execute when task is shut down.
+ */
+LAZY,
+/**
+ * Register the efo consumer eagerly for jobmanager to execute. 
De-register the efo consumer
+ * the same way as lazy does.
+ */
+EAGER,
+/** Do not register efo consumer programmatically. Do not de-register 
either. */
+NONE
+}
+
+/** The RecordPublisher type (EFO|POLLING). */
+public static final String RECORD_PUBLISHER_TYPE = 
"flink.stream.recordpublisher";

Review Comment:
   What are these constants / how are they used?



##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the 

[GitHub] [flink] XComp commented on pull request #22341: [FLINK-27204] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-05-10 Thread via GitHub


XComp commented on PR #22341:
URL: https://github.com/apache/flink/pull/22341#issuecomment-1542239945

   Hi @WencongLiu, sorry for the late response. I found time to look into your 
proposal now. The initial intention of FLINK-27204 was to have the async 
functionality hidden in `FileSystemBasedJobResultStore`. The 
`FileSystemBasedJobResultStore` performs IO operations for most of the 
interface's methods, which is considered blocking and should not run in the 
`Dispatcher`'s main thread (as it does right now). To achieve that, we would 
have to migrate all `JobResultStore` methods to become asynchronous:
   | pre-FLINK-27204 JobResultStore | post-FLINK-27204 JobResultStore |
   |-|---|
   | void createDirtyResult(JobResultEntry) throws ... | CompletableFuture<Void> createDirtyResultAsync(JobResultEntry) |
   | void markResultAsClean(JobID) throws ... | CompletableFuture<Void> markResultAsCleanAsync(JobID) |
   | boolean hasJobResultEntry(JobID) throws ... | CompletableFuture<Boolean> hasJobResultEntryAsync(JobID) |
   | boolean hasDirtyJobResultEntry(JobID) throws ... | CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(JobID) |
   | boolean hasCleanJobResultEntry(JobID) throws ... | CompletableFuture<Boolean> hasCleanJobResultEntryAsync(JobID) |
   | Set<JobResult> getDirtyResults(JobID) throws ... | CompletableFuture<Set<JobResult>> getDirtyResultsAsync() |
   
   The `FileSystemBasedJobResultStore` would get a constructor parameter 
`ioExecutor` which would then be used to run the async calls. Your current 
proposal doesn't specify an executor service to run the CompletableFutures on. 
Additionally, we need to utilize the CompletableFutures wherever possible 
(instead of calling `.get()` right away). Calling `.get()` on the 
`CompletableFuture` makes the call blocking again (which is what we want to 
avoid).
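
   To illustrate, a minimal sketch of how one of these methods could run its 
blocking work on an injected `ioExecutor` (a sketch only, assuming a blocking 
helper method exists; not the actual Flink code):

   ```java
   import org.apache.flink.api.common.JobID;

   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executor;

   class FileSystemJobResultStoreSketch {
       private final Executor ioExecutor;

       FileSystemJobResultStoreSketch(Executor ioExecutor) {
           this.ioExecutor = ioExecutor;
       }

       // The blocking filesystem access runs on the ioExecutor, not on the
       // caller's (e.g. the Dispatcher's main) thread.
       CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId) {
           return CompletableFuture.supplyAsync(() -> hasJobResultEntryBlocking(jobId), ioExecutor);
       }

       private boolean hasJobResultEntryBlocking(JobID jobId) {
           return false; // placeholder for the actual filesystem check
       }
   }
   ```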
   
   Does that sound reasonable to you? Let me know if you have more questions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0

2023-05-10 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721353#comment-17721353
 ] 

Gyula Fora commented on FLINK-32047:


Does this still affect the current main branch? Could you please check?

> Fix args in JobSpec not being passed through to Flink in Standalone mode - 
> 1.4.0
> 
>
> Key: FLINK-32047
> URL: https://issues.apache.org/jira/browse/FLINK-32047
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Gil Shmaya
>Priority: Major
> Attachments: image-2023-04-30-18-54-22-291.png, 
> image-2023-04-30-19-56-30-150.png, image-2023-04-30-19-56-57-680.png
>
>
> This issue is related to a previously fixed bug in version 1.2.0 -  
> FLINK-29388
> I have noticed that while the args are successfully being passed when using 
> version 1.2.0, this is not the case with version 1.4.0.
> {+}Scenario{+}:
> I added a log that prints the argument array length at the beginning of the 
> main  function of the flink job:
> !image-2023-04-30-18-54-22-291.png!
> The result when running with 1.2.0:
> !image-2023-04-30-19-56-30-150.png!
> The result when running with 1.4.0:
> !image-2023-04-30-19-56-57-680.png!
> h4.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0

2023-05-10 Thread Gil Shmaya (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gil Shmaya updated FLINK-32047:
---
Attachment: image-2023-04-30-19-56-30-150.png

> Fix args in JobSpec not being passed through to Flink in Standalone mode - 
> 1.4.0
> 
>
> Key: FLINK-32047
> URL: https://issues.apache.org/jira/browse/FLINK-32047
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Gil Shmaya
>Priority: Major
> Attachments: image-2023-04-30-18-54-22-291.png, 
> image-2023-04-30-19-56-30-150.png, image-2023-04-30-19-56-57-680.png
>
>
> This issue is related to a previously fixed bug in version 1.2.0 -  
> FLINK-29388
> I have noticed that while the args are successfully being passed when using 
> version 1.2.0, this is not the case with version 1.4.0.
> {+}Scenario{+}:
> I added a log that prints the argument array length at the beginning of the 
> main  function of the flink job:
> !image-2023-04-30-18-54-22-291.png!
> The result when running with 1.2.0:
> !image-2023-04-30-19-56-30-150.png!
> The result when running with 1.4.0:
> !image-2023-04-30-19-56-57-680.png!
> h4.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0

2023-05-10 Thread Gil Shmaya (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gil Shmaya updated FLINK-32047:
---
Attachment: image-2023-04-30-18-54-22-291.png

> Fix args in JobSpec not being passed through to Flink in Standalone mode - 
> 1.4.0
> 
>
> Key: FLINK-32047
> URL: https://issues.apache.org/jira/browse/FLINK-32047
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Gil Shmaya
>Priority: Major
> Attachments: image-2023-04-30-18-54-22-291.png, 
> image-2023-04-30-19-56-30-150.png, image-2023-04-30-19-56-57-680.png
>
>
> This issue is related to a previously fixed bug in version 1.2.0 -  
> FLINK-29388
> I have noticed that while the args are successfully being passed when using 
> version 1.2.0, this is not the case with version 1.4.0.
> {+}Scenario{+}:
> I added a log that prints the argument array length at the beginning of the 
> main  function of the flink job:
> !image-2023-04-30-18-54-22-291.png!
> The result when running with 1.2.0:
> !image-2023-04-30-19-56-30-150.png!
> The result when running with 1.4.0:
> !image-2023-04-30-19-56-57-680.png!
> h4.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0

2023-05-10 Thread Gil Shmaya (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gil Shmaya updated FLINK-32047:
---
Attachment: image-2023-04-30-19-56-57-680.png

> Fix args in JobSpec not being passed through to Flink in Standalone mode - 
> 1.4.0
> 
>
> Key: FLINK-32047
> URL: https://issues.apache.org/jira/browse/FLINK-32047
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Gil Shmaya
>Priority: Major
> Attachments: image-2023-04-30-18-54-22-291.png, 
> image-2023-04-30-19-56-30-150.png, image-2023-04-30-19-56-57-680.png
>
>
> This issue is related to a previously fixed bug in version 1.2.0 -  
> FLINK-29388
> I have noticed that while the args are successfully being passed when using 
> version 1.2.0, this is not the case with version 1.4.0.
> {+}Scenario{+}:
> I added a log that prints the argument array length at the beginning of the 
> main  function of the flink job:
> !image-2023-04-30-18-54-22-291.png!
> The result when running with 1.2.0:
> !image-2023-04-30-19-56-30-150.png!
> The result when running with 1.4.0:
> !image-2023-04-30-19-56-57-680.png!
> h4.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0

2023-05-10 Thread Gil Shmaya (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gil Shmaya updated FLINK-32047:
---
Description: 
This issue is related to a previously fixed bug in version 1.2.0 -  FLINK-29388

I have noticed that while the args are successfully being passed when using 
version 1.2.0, this is not the case with version 1.4.0.

{+}Scenario{+}:

I added a log that prints the argument array length at the beginning of the 
main  function of the flink job:
!image-2023-04-30-18-54-22-291.png!

The result when running with 1.2.0:
!image-2023-04-30-19-56-30-150.png!

The result when running with 1.4.0:
!image-2023-04-30-19-56-57-680.png!
h4.  

  was:
This issue is related to a previously fixed bug in version 1.2.0 -  
[FLINK-29388|https://issues.apache.org/jira/browse/FLINK-29388]

I have noticed that while the args are successfully being passed when using 
version 1.2.0, this is not the case with version 1.4.0.

{+}Scenario{+}:

I added a log that prints the argument array length at the beginning of the 
main  function of the flink job:
!image-2023-04-30-18-54-22-291.png|width=659,height=102!

The result when running with 1.2.0:
!image-2023-04-30-19-56-30-150.png!

The result when running with 1.4.0:
!image-2023-04-30-19-56-57-680.png!
h4.


> Fix args in JobSpec not being passed through to Flink in Standalone mode - 
> 1.4.0
> 
>
> Key: FLINK-32047
> URL: https://issues.apache.org/jira/browse/FLINK-32047
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Gil Shmaya
>Priority: Major
>
> This issue is related to a previously fixed bug in version 1.2.0 -  
> FLINK-29388
> I have noticed that while the args are successfully being passed when using 
> version 1.2.0, this is not the case with version 1.4.0.
> {+}Scenario{+}:
> I added a log that prints the argument array length at the beginning of the 
> main  function of the flink job:
> !image-2023-04-30-18-54-22-291.png!
> The result when running with 1.2.0:
> !image-2023-04-30-19-56-30-150.png!
> The result when running with 1.4.0:
> !image-2023-04-30-19-56-57-680.png!
> h4.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32047) Fix args in JobSpec not being passed through to Flink in Standalone mode - 1.4.0

2023-05-10 Thread Gil Shmaya (Jira)
Gil Shmaya created FLINK-32047:
--

 Summary: Fix args in JobSpec not being passed through to Flink in 
Standalone mode - 1.4.0
 Key: FLINK-32047
 URL: https://issues.apache.org/jira/browse/FLINK-32047
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Reporter: Gil Shmaya


This issue is related to a previously fixed bug in version 1.2.0 -  
[FLINK-29388|https://issues.apache.org/jira/browse/FLINK-29388]

I have noticed that while the args are successfully being passed when using 
version 1.2.0, this is not the case with version 1.4.0.

{+}Scenario{+}:

I added a log that prints the argument array length at the beginning of the 
main  function of the flink job:
!image-2023-04-30-18-54-22-291.png|width=659,height=102!

The result when running with 1.2.0:
!image-2023-04-30-19-56-30-150.png!

The result when running with 1.4.0:
!image-2023-04-30-19-56-57-680.png!
h4.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] tamirsagi commented on pull request #585: [FLINK-32005] Add a per-deployment error metric to signal about potential issues

2023-05-10 Thread via GitHub


tamirsagi commented on PR #585:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/585#issuecomment-1542196243

   > Do you think having a "scaling" counter would be useful as well?
   
   Hey @mxm
   
   I'd like to suggest an improvement here, let me know what you think.
   
   The operator supports a pluggable `FlinkResourceListener` which provides the 
events & deployment status. However, this interface does not expose the 
MetricManager or any other way to create custom meters. 
   This means that if users would like to create custom metrics per deployment, 
there is no way to attach them via the operator's metric manager.
   
   I see there are some basic metrics created per namespace.
   
   My suggestion is to expose either the operator's metric manager directly via 
`FlinkResourceListener`, or another entity which provides a way to create 
meters and internally registers them via the MetricManager.
   
   WDYT?
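
   To make the idea concrete, a hypothetical sketch of such an extension point 
(the interface and method names are made up for illustration, not the 
operator's actual API):

   ```java
   import org.apache.flink.metrics.MetricGroup;

   // Hypothetical: let listeners register custom per-deployment meters; the
   // group handed in would internally be backed by the operator's MetricManager.
   public interface CustomMetricAwareListener extends FlinkResourceListener {

       void onDeploymentMetricGroupAvailable(MetricGroup deploymentMetricGroup);
   }
   ```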
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #22548: [FLINK-32039][test] Adds graceful shutdown to TestExecutorExtension and TestExecutorResource

2023-05-10 Thread via GitHub


zentol commented on code in PR #22548:
URL: https://github.com/apache/flink/pull/22548#discussion_r1189890956


##
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorExtension.java:
##
@@ -47,8 +58,26 @@ public T getExecutor() {
 
 @Override
 public void afterAll(ExtensionContext context) throws Exception {
+gracefulShutdown(executorService, LOG);
+}
+
+static void gracefulShutdown(@Nullable ExecutorService executorService, 
Logger logger) {

Review Comment:
   > failing with an AssertionError if there are still Runnables that haven't 
been processed by the executor
   
   That'd be quite annoying when using a scheduled executor service, no? That 
will send a runnable at some point in the future.
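
   The caveat is easy to demonstrate; a small sketch (plain JDK, no Flink 
involved) of why a scheduled executor would nearly always have a pending 
runnable at shutdown:

   ```java
   import java.util.List;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;

   public class ScheduledShutdownExample {
       public static void main(String[] args) {
           ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
           executor.scheduleAtFixedRate(() -> {}, 0, 10, TimeUnit.MILLISECONDS);

           // A fixed-rate task is re-queued after every run, so shutdownNow()
           // almost always reports an unfinished runnable here.
           List<Runnable> neverRun = executor.shutdownNow();
           System.out.println("pending runnables: " + neverRun.size());
       }
   }
   ```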
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #22548: [FLINK-32039][test] Adds graceful shutdown to TestExecutorExtension and TestExecutorResource

2023-05-10 Thread via GitHub


zentol commented on code in PR #22548:
URL: https://github.com/apache/flink/pull/22548#discussion_r1189890956


##
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorExtension.java:
##
@@ -47,8 +58,26 @@ public T getExecutor() {
 
 @Override
 public void afterAll(ExtensionContext context) throws Exception {
+gracefulShutdown(executorService, LOG);
+}
+
+static void gracefulShutdown(@Nullable ExecutorService executorService, 
Logger logger) {

Review Comment:
   > failing with an AssertionError if there are still Runnables that haven't 
been processed by the executor
   
   That'd be quite annoying when using a scheduled executor service, no?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout

2023-05-10 Thread via GitHub


1996fanrui commented on PR #22560:
URL: https://github.com/apache/flink/pull/22560#issuecomment-1542186653

   Thanks for @zentol's feedback here.
   
   > What about adding a boolean config named execution.buffer-timeout.enabled 
which default value is true. When it is false, flushing only when the output 
buffer is full. At the same time, the execution.buffer-timeout's value will be 
ignored. 
   
   That makes sense.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

2023-05-10 Thread via GitHub


reswqa commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1542165300

   Thanks @pnowojski for the patient review and very helpful suggestions! All 
fix-up commits have been squashed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JunRuiLee commented on a diff in pull request #22555: [FLINK-31706] [runtime] The default source parallelism should be the same as ex…

2023-05-10 Thread via GitHub


JunRuiLee commented on code in PR #22555:
URL: https://github.com/apache/flink/pull/22555#discussion_r1189846990


##
flink-core/src/main/java/org/apache/flink/configuration/BatchExecutionOptions.java:
##
@@ -99,7 +99,7 @@ public class BatchExecutionOptions {
 public static final ConfigOption 
ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM =
 
key("execution.batch.adaptive.auto-parallelism.default-source-parallelism")
 .intType()
-.defaultValue(1)
+.defaultValue(0)
 .withDeprecatedKeys(

Review Comment:
   Why do you need to change the default value? I think a better way is to use 
`configuration.getOptional(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM)`
   to determine whether the configuration item has been explicitly set.
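
   A minimal sketch of that suggestion (assuming the surrounding code has the 
`Configuration` at hand):

   ```java
   // Fall back to parallelism.default only when the source-specific option
   // was not explicitly configured.
   static int resolveDefaultSourceParallelism(Configuration configuration) {
       return configuration
               .getOptional(
                       BatchExecutionOptions
                               .ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM)
               .orElse(configuration.get(CoreOptions.DEFAULT_PARALLELISM));
   }
   ```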



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##
@@ -84,21 +85,28 @@ private DefaultVertexParallelismAndInputInfosDecider(
 int globalMaxParallelism,
 int globalMinParallelism,
 MemorySize dataVolumePerTask,
-int globalDefaultSourceParallelism) {
+int globalDefaultSourceParallelism,
+int defaultExecutionParallelism) {
 
 checkArgument(globalMinParallelism > 0, "The minimum parallelism must 
be larger than 0.");
 checkArgument(
 globalMaxParallelism >= globalMinParallelism,
 "Maximum parallelism should be greater than or equal to the 
minimum parallelism.");
 checkArgument(
-globalDefaultSourceParallelism > 0,
-"The default source parallelism must be larger than 0.");
+globalDefaultSourceParallelism >= 0,
+"The default source parallelism must be greater than or equal 
to 0.");
 checkNotNull(dataVolumePerTask);
+checkArgument(
+defaultExecutionParallelism > 0,
+"The default execution parallelism must be larger than 0.");
 

Review Comment:
   We do not require the default parallelism to be greater than 0.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##
@@ -546,7 +553,7 @@ static DefaultVertexParallelismAndInputInfosDecider from(
 configuration.get(
 
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK),
 configuration.get(
-BatchExecutionOptions
-
.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM));
+
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM),
+configuration.get(CoreOptions.DEFAULT_PARALLELISM));
 }

Review Comment:
   `parallelism.default` should be obtained from the ExecutionConfig instead of 
the JobMaster Configuration.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##
@@ -84,21 +85,28 @@ private DefaultVertexParallelismAndInputInfosDecider(
 int globalMaxParallelism,
 int globalMinParallelism,
 MemorySize dataVolumePerTask,
-int globalDefaultSourceParallelism) {
+int globalDefaultSourceParallelism,
+int defaultExecutionParallelism) {
 

Review Comment:
   Maybe we can determine the value of source parallelism externally without 
touching this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora merged pull request #589: [hotfix][docs] Fix flink version in doc and yaml

2023-05-10 Thread via GitHub


gyfora merged PR #589:
URL: https://github.com/apache/flink-kubernetes-operator/pull/589


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #590: [FLINK-32012] Rely on savepoint mechanism when performing rollback with saveoint upgrade mode

2023-05-10 Thread via GitHub


gyfora commented on code in PR #590:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/590#discussion_r1189847280


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -384,6 +384,12 @@ private boolean shouldRollBack(
 return false;
 }
 
+if (resource.getSpec().getJob() != null
+&& resource.getSpec().getJob().getUpgradeMode() == 
UpgradeMode.SAVEPOINT) {
+// HA data is not available during rollback for savepoint upgrade 
mode
+return true;
+}

Review Comment:
   This check is not fully correct. Even though we started from a savepoint, if 
the JobManager started successfully, HA metadata would be available even if the 
job could not recover after the upgrade. 
   
   Instead of simply checking for savepoint upgrade mode we also have to check 
`FlinkUtils.jmPodNeverStarted(ctx)` to be completely safe.
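
   In other words, something along these lines (a sketch of the combined 
condition, not the final code):

   ```java
   // Only treat rollback as possible without HA metadata when the upgrade used
   // savepoint mode AND the new JobManager pod never came up.
   boolean savepointUpgrade =
           resource.getSpec().getJob() != null
                   && resource.getSpec().getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT;
   if (savepointUpgrade && FlinkUtils.jmPodNeverStarted(ctx)) {
       return true;
   }
   ```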



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -270,17 +270,13 @@ protected void rollback(FlinkResourceContext ctx) 
throws Exception {
 
 UpgradeMode upgradeMode = resource.getSpec().getJob().getUpgradeMode();
 
-cancelJob(
-ctx,
-upgradeMode == UpgradeMode.STATELESS
-? UpgradeMode.STATELESS
-: UpgradeMode.LAST_STATE);
+cancelJob(ctx, upgradeMode);

Review Comment:
   Why did you change this? We don't want to use a savepoint cancel during 
rollback.



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -338,6 +339,10 @@ protected void cancelJob(
 } else if 
(ReconciliationUtils.isJobInTerminalState(deploymentStatus)) {
 LOG.info(
 "Job is already in terminal state skipping 
cancel-with-savepoint operation.");
+} else if 
(deployment.getStatus().getReconciliationStatus().getState()
+== ReconciliationState.ROLLING_BACK) {
+LOG.info(
+"Job reconciliation status is in rolling back 
state. Job is expecting to not be in running or terminal state");

Review Comment:
   We can't simply ignore errors here because the cluster may not have shut 
down. Please see my other comment; I don't think we should ever use savepoint 
cancel during rollback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] MartijnVisser merged pull request #637: Add GCP Pubsub v3.0.1 for Flink 1.16.x and Flink 1.17.x

2023-05-10 Thread via GitHub


MartijnVisser merged PR #637:
URL: https://github.com/apache/flink-web/pull/637


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on pull request #22394: [FLINK-31780][component=Runtime/Coordination] Allow users to disable 'Ensemble tracking' feature for ZooKeeper Curator framework

2023-05-10 Thread via GitHub


XComp commented on PR #22394:
URL: https://github.com/apache/flink/pull/22394#issuecomment-1542112096

   CI is still failing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #22548: [FLINK-32039][test] Adds graceful shutdown to TestExecutorExtension and TestExecutorResource

2023-05-10 Thread via GitHub


XComp commented on code in PR #22548:
URL: https://github.com/apache/flink/pull/22548#discussion_r1189818630


##
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorExtension.java:
##
@@ -47,8 +58,26 @@ public T getExecutor() {
 
 @Override
 public void afterAll(ExtensionContext context) throws Exception {
+gracefulShutdown(executorService, LOG);
+}
+
+static void gracefulShutdown(@Nullable ExecutorService executorService, 
Logger logger) {

Review Comment:
   Good point. The rationale behind this change was to make sure that any 
resources (that are handled by the threads) are actually cleaned up. What do 
you think about calling `shutdownNow` and failing with an `AssertionError` if 
there are still `Runnables` that haven't been processed by the executor? That 
way, we force everyone to handle hanging threads within the test implementation 
:thinking: 
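
   A sketch of that idea (plain JDK; the exact error message and placement are 
illustrative):

   ```java
   import java.util.List;
   import java.util.concurrent.ExecutorService;

   final class StrictShutdown {
       private StrictShutdown() {}

       // Runnables returned by shutdownNow() never started; surfacing them
       // forces the test to clean up its own hanging work.
       static void shutdownOrFail(ExecutorService executorService) {
           List<Runnable> neverStarted = executorService.shutdownNow();
           if (!neverStarted.isEmpty()) {
               throw new AssertionError(
                       neverStarted.size() + " runnable(s) were still queued on shutdown");
           }
       }
   }
   ```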



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JunRuiLee commented on pull request #22555: [FLINK-31706] [runtime] The default source parallelism should be the same as ex…

2023-05-10 Thread via GitHub


JunRuiLee commented on PR #22555:
URL: https://github.com/apache/flink/pull/22555#issuecomment-1542095484

   @clownxc Thanks for creating this PR! I'm curious why you don't fall back 
the value of globalDefaultSourceParallelism to parallelism.default, if 
necessary, outside the constructor of 
DefaultVertexParallelismAndInputInfosDecider? That would make the change much 
cleaner.
   In addition, we do not require that parallelism.default be a positive 
number. On the contrary, before version 1.17, users needed to configure 
parallelism.default = -1 to enable parallelism derivation. WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Myracle commented on pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout

2023-05-10 Thread via GitHub


Myracle commented on PR #22560:
URL: https://github.com/apache/flink/pull/22560#issuecomment-1542091391

   @zentol I agree with you that the change may affect other usages. What about 
adding a boolean config named execution.buffer-timeout.enabled whose default 
value is true? When it is false, flushing happens only when the output buffer 
is full. At the same time, the execution.buffer-timeout value will be ignored. 
cc @1996fanrui 
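
   A sketch of what such an option could look like (hypothetical key and 
description, following Flink's `ConfigOptions` pattern):

   ```java
   // Hypothetical option: when false, execution.buffer-timeout is ignored and
   // output buffers are flushed only when they are full.
   public static final ConfigOption<Boolean> BUFFER_TIMEOUT_ENABLED =
           ConfigOptions.key("execution.buffer-timeout.enabled")
                   .booleanType()
                   .defaultValue(true)
                   .withDescription(
                           "When disabled, output buffers are flushed only when full.");
   ```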


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] ashangit commented on pull request #590: [FLINK-32012] Rely on savepoint mechanism when performing rollback with saveoint upgrade mode

2023-05-10 Thread via GitHub


ashangit commented on PR #590:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/590#issuecomment-1542063039

   @gyfora please review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32012) Operator failed to rollback due to missing HA metadata

2023-05-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32012:
---
Labels: pull-request-available  (was: )

> Operator failed to rollback due to missing HA metadata
> --
>
> Key: FLINK-32012
> URL: https://issues.apache.org/jira/browse/FLINK-32012
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Nicolas Fraison
>Priority: Major
>  Labels: pull-request-available
>
> The operator correctly detected that the job was failing and initiated the 
> rollback, but the rollback failed due to `Rollback is not possible due to 
> missing HA metadata`.
> We are relying on savepoint upgrade mode and ZooKeeper HA.
> The operator is performing a set of action to also delete this HA data in 
> savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L346]
>  : Suspend job with savepoint and deleteClusterDeployment
>  * [flink-kubernetes-operator/StandaloneFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java#L158]
>  : Remove JM + TM deployment and delete HA data
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L1008]
>  : Wait cluster shutdown and delete zookeeper HA data
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L155]
>  : Remove all child znode
> Then when running the rollback, the operator looks for HA data even though we 
> rely on savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L164]
>  Perform reconcile of rollback if it should rollback
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L387]
>  Rollback failed as HA data is not available
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L220]
>  Check if some child znodes are available
> For both steps the pattern looks to be the same for Kubernetes HA, so it 
> doesn't look to be linked to a bug with ZooKeeper.
>  
> From https://issues.apache.org/jira/browse/FLINK-30305 it looks to be 
> expected that the HA data has been deleted (as it is also performed by flink 
> when relying on savepoint upgrade mode).
> Still the use case seems to differ from 
> https://issues.apache.org/jira/browse/FLINK-30305 as the operator is aware of 
> the failure and treat a specific rollback event.
> So I'm wondering why we enforce such a check when performing a rollback if we 
> rely on savepoint upgrade mode. Would it be fine to not rely on the HA data 
> and roll back from the last savepoint (the one we used in the deployment step)?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] ashangit opened a new pull request, #590: [FLINK-32012] Rely on savepoint mechanism when performing rollback with saveoint upgrade mode

2023-05-10 Thread via GitHub


ashangit opened a new pull request, #590:
URL: https://github.com/apache/flink-kubernetes-operator/pull/590

   
   
   ## What is the purpose of the change
   
   The operator failed to roll back due to missing HA data.
   This is caused by the HA data being removed by the Flink job when relying on 
savepoint upgrade mode.
   
   As with the upgrade mechanism, we should rely on the savepoint to do the 
rollback, not on the HA data.
   
   ## Brief change log
   
   During a savepoint rollback the HA data is not mandatory. Do not require it 
to be available when rolling back:
   - during check if shouldRollback
   - during restore of the job
   
   Also enforce deletion of the HA data as performed during upgrade mechanism 
for savepoint upgrade mode
   
   ## Verifying this change
   
   
   - This change is covered by the change performed in 
RollbackTest.testStatefulRollback.SAVEPOINT.
   The flinkService now returns false for haDataAvailable, as if the HA data 
had been deleted during the upgrade phase and not recreated due to a failure 
during that phase.
   Still, the rollback reconciliation works fine and the job correctly takes 
the last savepoint into account.
   
   - Also tested on one of our FlinkDeployments:
 - redeploying the FlinkDeployment with increased parallelism works fine
 - then redeploying the FlinkDeployment with a service account which didn't 
have access to the S3 savepoint path: the operator successfully rolled back to 
the previous working state
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: yes for 
rollback event with savepoint upgrade mode
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32046) OOM caused by SplitAssignmentTracker.uncheckpointedAssignments

2023-05-10 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-32046:
---
Component/s: Connectors / Common
 (was: API / Core)

> OOM caused by SplitAssignmentTracker.uncheckpointedAssignments
> --
>
> Key: FLINK-32046
> URL: https://issues.apache.org/jira/browse/FLINK-32046
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Peter Vary
>Priority: Major
>
> If the checkpointing is turned off then the 
> {{SplitAssignmentTracker.uncheckpointedAssignments}} is never cleared and 
> grows indefinitely. Eventually leading to OOM.
> The only other place which would remove elements from this map is 
> {{{}getAndRemoveUncheckpointedAssignment{}}}, but it is only for failure 
> scenarios.
> By my understanding this problem exists since the introduction of the new 
> {{Source}} implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-05-10 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1189670987


##
flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AwsClientFactory.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+import software.amazon.awssdk.services.glue.GlueClient;
+
+/**
+ * Interface to customize AWS clients used by Flink. A custom factory must 
have a no-arg.
+ * constructor, and use {@link #initialize(ReadableConfig)} to initialize the 
factory.
+ */
+public interface AwsClientFactory {

Review Comment:
   No , removed entire class and file



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zhuzhurk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce SchedulerBase per-task failure enrichment/labeling

2023-05-10 Thread via GitHub


zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1189766188


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -473,26 +500,50 @@ public CompletableFuture cancel(Time 
timeout) {
 @Override
 public CompletableFuture updateTaskExecutionState(
 final TaskExecutionState taskExecutionState) {
-FlinkException taskExecutionException;
+checkNotNull(taskExecutionState, "taskExecutionState");
+// Use the main/caller thread for all updates to make sure they are 
processed in order.
+// (MainThreadExecutor i.e., the akka thread pool does not guarantee 
that)
+// Only detach for a FAILED state update that is terminal and may 
perform io heavy labeling.
+if 
(ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+return labelFailure(taskExecutionState)
+.thenApplyAsync(
+taskStateWithLabels -> {
+try {
+return 
doUpdateTaskExecutionState(taskStateWithLabels);
+} catch (FlinkException e) {
+throw new CompletionException(e);
+}
+},
+getMainThreadExecutor());
+}
 try {
-checkNotNull(taskExecutionState, "taskExecutionState");
+return CompletableFuture.completedFuture(
+doUpdateTaskExecutionState(taskExecutionState));
+} catch (FlinkException e) {
+return FutureUtils.completedExceptionally(e);
+}
+}
 
+private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState 
taskExecutionState)
+throws FlinkException {
+@Nullable FlinkException taskExecutionException;
+try {
 if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review Comment:
   Thanks for updating the PR, @pgaref.
   I'm actually not against restart strategies using failure labels; I just 
hope that we can avoid some complication if possible. Totally +1 to the concern 
about the risk of changing some synchronous processes into asynchronous ones. 
But given that it is not avoidable when labeling failures from within the JM, I 
prefer to do it in a common path. We will absolutely need to carefully review 
and test it later.
   Sorry that I did not have time to review it right now. I may take a look 
tomorrow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

2023-05-10 Thread via GitHub


reswqa commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1542002743

   Thanks for the quick reply. I have squashed all previous fix-up commits and 
pushed a new commit to address latest comment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31383) Add support for documenting additionProperties of the REST API payloads.

2023-05-10 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-31383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Morávek updated FLINK-31383:
--
Description: 
For implementing the request and response body of the resource requirements 
endpoint, we need to be able to document "additionalProperties" because these 
payloads have only top-level dynamic properties of the same type.

This affects both classic (HTML docs) and OpenAPI generators.

An example of what we want to be able to document is:
{code:java}
@JsonAnySetter
@JsonAnyGetter
@JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class)
private final Map<JobVertexID, JobVertexResourceRequirements> 
jobVertexResourceRequirements;{code}

  was:
For implementing the request and response body of the resource requirements 
endpoint, we need to be able to document "additionalProperties" because these 
payloads have only top-level dynamic properties of the same type.

 

An example of what we want to be able to document is:
{code:java}
@JsonAnySetter
@JsonAnyGetter
@JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class)
private final Map<JobVertexID, JobVertexResourceRequirements> 
jobVertexResourceRequirements;{code}


> Add support for documenting additionProperties of the REST API payloads.
> 
>
> Key: FLINK-31383
> URL: https://issues.apache.org/jira/browse/FLINK-31383
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Runtime / REST
>Reporter: David Morávek
>Assignee: David Morávek
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> For implementing the request and response body of the resource requirements 
> endpoint, we need to be able to document "additionalProperties" because these 
> payloads have only top-level dynamic properties of the same type.
> This affects both classic (HTML docs) and OpenAPI generators.
> An example of what we want to be able to document is:
> {code:java}
> @JsonAnySetter
> @JsonAnyGetter
> @JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
> @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class)
> private final Map<JobVertexID, JobVertexResourceRequirements> 
> jobVertexResourceRequirements;{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

2023-05-10 Thread via GitHub


reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1189760303


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
 }
 }
 
+@Test
+public void testTaskSideOutputStatistics() throws Exception {
+TaskMetricGroup taskMetricGroup =
+UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+ResultPartitionWriter[] partitionWriters = new 
ResultPartitionWriter[3];
+for (int i = 0; i < partitionWriters.length; ++i) {
+partitionWriters[i] =
+new RecordOrEventCollectingResultPartitionWriter<>(
+new ArrayDeque<>(),
+new StreamElementSerializer<>(
+
BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+new ExecutionConfig(;
+partitionWriters[i].setup();
+}
+
+try (StreamTaskMailboxTestHarness testHarness =
+new StreamTaskMailboxTestHarnessBuilder<>(
+OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
+.addInput(BasicTypeInfo.INT_TYPE_INFO)
+.addAdditionalOutput(partitionWriters)
+.setupOperatorChain(new OperatorID(), new 
OddEvenOperator())
+
.chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+.setOperatorFactory(SimpleOperatorFactory.of(new 
OddEvenOperator()))
+.addNonChainedOutputsCount(
+new OutputTag<>("odd", 
BasicTypeInfo.INT_TYPE_INFO), 2)
+.addNonChainedOutputsCount(1)
+.build()
+
.chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+.setOperatorFactory(SimpleOperatorFactory.of(new 
DuplicatingOperator()))
+.addNonChainedOutputsCount(1)
+.build()
+.finish()
+.setTaskMetricGroup(taskMetricGroup)
+.build()) {
+Counter numRecordsInCounter =
+
taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+Counter numRecordsOutCounter =
+
taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+final int numEvenRecords = 5;
+final int numOddRecords = 3;
+
+for (int x = 0; x < numEvenRecords; x++) {
+testHarness.processElement(new StreamRecord<>(2 * x));
+}
+
+for (int x = 0; x < numOddRecords; x++) {
+testHarness.processElement(new StreamRecord<>(2 * x + 1));
+}
+assertEquals(numOddRecords + numEvenRecords, 
numRecordsInCounter.getCount());
+assertEquals(
+numOddRecords
++ (numOddRecords + numEvenRecords)
++ (numOddRecords + numEvenRecords) * 2,
+numRecordsOutCounter.getCount());

Review Comment:
   > If the above is correct, is this construct officially supported behaviour 
of Flink? Can user create such pipelines? If not, I would either remove the 
first operator, or replace it with just a no-op/pass-through operator that just 
forwards input records. 
   
   Fair enough! This is indeed not a conventional pipeline, I have replaced the 
first operator with a `PassThroughOperator`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

2023-05-10 Thread via GitHub


reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1189760303


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
 }
 }
 
+@Test
+public void testTaskSideOutputStatistics() throws Exception {
+TaskMetricGroup taskMetricGroup =
+UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+ResultPartitionWriter[] partitionWriters = new 
ResultPartitionWriter[3];
+for (int i = 0; i < partitionWriters.length; ++i) {
+partitionWriters[i] =
+new RecordOrEventCollectingResultPartitionWriter<>(
+new ArrayDeque<>(),
+new StreamElementSerializer<>(
+
BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+new ExecutionConfig(;
+partitionWriters[i].setup();
+}
+
+try (StreamTaskMailboxTestHarness testHarness =
+new StreamTaskMailboxTestHarnessBuilder<>(
+OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
+.addInput(BasicTypeInfo.INT_TYPE_INFO)
+.addAdditionalOutput(partitionWriters)
+.setupOperatorChain(new OperatorID(), new 
OddEvenOperator())
+
.chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+.setOperatorFactory(SimpleOperatorFactory.of(new 
OddEvenOperator()))
+.addNonChainedOutputsCount(
+new OutputTag<>("odd", 
BasicTypeInfo.INT_TYPE_INFO), 2)
+.addNonChainedOutputsCount(1)
+.build()
+
.chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+.setOperatorFactory(SimpleOperatorFactory.of(new 
DuplicatingOperator()))
+.addNonChainedOutputsCount(1)
+.build()
+.finish()
+.setTaskMetricGroup(taskMetricGroup)
+.build()) {
+Counter numRecordsInCounter =
+
taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+Counter numRecordsOutCounter =
+
taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+final int numEvenRecords = 5;
+final int numOddRecords = 3;
+
+for (int x = 0; x < numEvenRecords; x++) {
+testHarness.processElement(new StreamRecord<>(2 * x));
+}
+
+for (int x = 0; x < numOddRecords; x++) {
+testHarness.processElement(new StreamRecord<>(2 * x + 1));
+}
+assertEquals(numOddRecords + numEvenRecords, 
numRecordsInCounter.getCount());
+assertEquals(
+numOddRecords
++ (numOddRecords + numEvenRecords)
++ (numOddRecords + numEvenRecords) * 2,
+numRecordsOutCounter.getCount());

Review Comment:
   > If the above is correct, is this construct officially supported behaviour 
of Flink? Can user create such pipelines? 
   
   Fair enough! This is indeed not a conventional pipeline, I have replaced the 
first operator with a `PassThroughOperator`.
   



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
 }
 }
 
+@Test
+public void testTaskSideOutputStatistics() throws Exception {
+TaskMetricGroup taskMetricGroup =
+UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+ResultPartitionWriter[] partitionWriters = new 
ResultPartitionWriter[3];
+for (int i = 0; i < partitionWriters.length; ++i) {
+partitionWriters[i] =
+new RecordOrEventCollectingResultPartitionWriter<>(
+new ArrayDeque<>(),
+new StreamElementSerializer<>(
+
BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+new ExecutionConfig(;
+partitionWriters[i].setup();
+}
+
+try (StreamTaskMailboxTestHarness testHarness =
+new StreamTaskMailboxTestHarnessBuilder<>(
+OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
+.addInput(BasicTypeInfo.INT_TYPE_INFO)
+.addAdditionalOutput(partitionWriters)
+

[jira] [Updated] (FLINK-32046) OOM caused by SplitAssignmentTracker.uncheckpointedAssignments

2023-05-10 Thread Peter Vary (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Vary updated FLINK-32046:
---
Description: 
If the checkpointing is turned off then the 
{{SplitAssignmentTracker.uncheckpointedAssignments}} is never cleared and grows 
indefinitely. Eventually leading to OOM.

The only other place which would remove elements from this map is 
{{{}getAndRemoveUncheckpointedAssignment{}}}, but it is only for failure 
scenarios.

By my understanding this problem exists since the introduction of the new 
{{Source}} implementation.

  was:
If the checkpointing is turned off then the 
{{SplitAssignmentTracker.uncheckpointedAssignments}} is never cleared and grows 
indefinitely. Eventually leading to OOM.

The only other place which would remove elements from this map is 
{{{}getAndRemoveUncheckpointedAssignment{}}}, but it is only for failure 
scenarios.

By my understanding this problem exists since the introduction of the new 
source code.


> OOM caused by SplitAssignmentTracker.uncheckpointedAssignments
> --
>
> Key: FLINK-32046
> URL: https://issues.apache.org/jira/browse/FLINK-32046
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Reporter: Peter Vary
>Priority: Major
>
> If the checkpointing is turned off then the 
> {{SplitAssignmentTracker.uncheckpointedAssignments}} is never cleared and 
> grows indefinitely. Eventually leading to OOM.
> The only other place which would remove elements from this map is 
> {{{}getAndRemoveUncheckpointedAssignment{}}}, but it is only for failure 
> scenarios.
> By my understanding this problem exists since the introduction of the new 
> {{Source}} implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

