Re: [PR] [FLINK-33776] [Test] only close the cluster once to make the test `ClientHeartbeatTest` faster [flink]

2023-12-11 Thread via GitHub


TestBoost commented on PR #23910:
URL: https://github.com/apache/flink/pull/23910#issuecomment-1851472434

   Hi reswqa,
   
   Thank you so much for your reply! I have just changed the title, but I am not sure whether it is suitable.
   
   Yes, right now the test cluster is started before every test method. However, there is no need to start it per test, because the tests do not affect each other and can all run against the same cluster.
   
   Thanks,
   TestBoost


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33057] Add options to disable creating job-id subdirectories under the checkpoint directory [flink]

2023-12-11 Thread via GitHub


Myasuka commented on code in PR #23509:
URL: https://github.com/apache/flink/pull/23509#discussion_r1423537299


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java:
##
@@ -72,6 +73,7 @@ public MemoryBackendCheckpointStorageAccess(
 JobID jobId,
 @Nullable Path checkpointsBaseDirectory,
 @Nullable Path defaultSavepointLocation,
+boolean createCheckpointSubDirs,

Review Comment:
   How about introducing another constructor that defaults `createCheckpointSubDirs` to `true`? That way we could avoid touching so much test code in this PR.
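   
   A generic sketch of the suggested pattern (illustrative only, not the actual Flink class; the parameter list is simplified here):
   
   ~~~java
   // Keep the existing public constructor signature and let it delegate with the
   // new flag defaulted to true, so existing call sites and tests stay untouched.
   class StorageAccess {
       private final String checkpointsBaseDirectory;
       private final boolean createCheckpointSubDirs;
   
       // existing signature, unchanged for callers
       StorageAccess(String checkpointsBaseDirectory) {
           this(checkpointsBaseDirectory, true); // default: create job-id sub-directories
       }
   
       // new signature with the explicit flag
       StorageAccess(String checkpointsBaseDirectory, boolean createCheckpointSubDirs) {
           this.checkpointsBaseDirectory = checkpointsBaseDirectory;
           this.createCheckpointSubDirs = createCheckpointSubDirs;
       }
   }
   ~~~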



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java:
##
@@ -141,6 +141,12 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
  */
 private final int writeBufferSize;
 
+/**
+ * Switch to create checkpoint sub-directory with name of jobId. A value 
of 'undefined' means
+ * not yet configured, in which case the default will be used.
+ */
+private TernaryBoolean createCheckpointSubDirs = TernaryBoolean.UNDEFINED;

Review Comment:
   I wonder whether we really need to introduce an undefined Boolean here in this way.
   Currently, we only have one way to set this value, within a private constructor:
   ~~~java
   this.createCheckpointSubDirs =
   original.createCheckpointSubDirs.resolveUndefined(
   configuration.get(CheckpointingOptions.CREATE_CHECKPOINT_SUB_DIS));
   ~~~
   
   However, there is no public constructor or setter that sets `createCheckpointSubDirs` on the `original` instance, which means `original.createCheckpointSubDirs` will always be `TernaryBoolean.UNDEFINED`.
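   
   For reference, a small sketch of the `resolveUndefined` semantics the snippet above relies on (assuming the usual behaviour of `org.apache.flink.util.TernaryBoolean`; treat this as an assumption, not a quote of the API docs):
   
   ~~~java
   // UNDEFINED falls back to the given value, an explicit TRUE/FALSE is kept.
   TernaryBoolean a = TernaryBoolean.UNDEFINED.resolveUndefined(true); // -> TernaryBoolean.TRUE
   TernaryBoolean b = TernaryBoolean.FALSE.resolveUndefined(true);     // -> TernaryBoolean.FALSE
   ~~~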
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33736][Scheduler] Update default value of exponential-delay.max-backoff and exponential-delay.backoff-multiplier [flink]

2023-12-11 Thread via GitHub


flinkbot commented on PR #23911:
URL: https://github.com/apache/flink/pull/23911#issuecomment-1851449909

   
   ## CI report:
   
   * 4f20956497f2d5b1255419eb5f31d03737c9a597 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]

2023-12-11 Thread via GitHub


1996fanrui commented on code in PR #23247:
URL: https://github.com/apache/flink/pull/23247#discussion_r1423553344


##
flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java:
##
@@ -184,7 +184,7 @@ public class RestartStrategyOptions {
 public static final ConfigOption<Double> RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER =
 
ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier")
 .doubleType()
-.defaultValue(2.0)
+.defaultValue(1.2)

Review Comment:
   The default-value commit has been moved to a new [separate PR](https://github.com/apache/flink/pull/23911); we can follow up on it there, thanks~
   
   Also, if this PR (FLINK-32895) receives no other comments within 72 hours, I will merge it, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33736) Update default value of exponential-delay.max-backoff and exponential-delay.backoff-multiplier

2023-12-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33736:
---
Labels: pull-request-available  (was: )

> Update default value of exponential-delay.max-backoff and 
> exponential-delay.backoff-multiplier
> --
>
> Key: FLINK-33736
> URL: https://issues.apache.org/jira/browse/FLINK-33736
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Update default value of exponential-delay.max-backoff from 5min to 1min.
> Update default value of exponential-delay.backoff-multiplier from 2.0 to 1.2.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33714) Update documentation about the usage of RuntimeContext#getExecutionConfig

2023-12-11 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-33714.
--
Resolution: Fixed

master (1.19) via 3531998adad20f3904d1421a35a69fef44b2b69f.

> Update documentation about the usage of RuntimeContext#getExecutionConfig
> -
>
> Key: FLINK-33714
> URL: https://issues.apache.org/jira/browse/FLINK-33714
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Affects Versions: 1.19.0
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33736][Scheduler] Update default value of exponential-delay.max-backoff and exponential-delay.backoff-multiplier [flink]

2023-12-11 Thread via GitHub


1996fanrui opened a new pull request, #23911:
URL: https://github.com/apache/flink/pull/23911

   ## What is the purpose of the change
   
   See [FLIP-364: Improve the exponential-delay restart-strategy](https://cwiki.apache.org/confluence/x/uJqzDw). This PR implements subtask 1 of FLIP-364.
   
   ## Brief change log
   
   - [FLINK-33736][Scheduler] Update default value of 
exponential-delay.max-backoff and exponential-delay.backoff-multiplier
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? The Javadoc and config docs are updated, and the official web documentation related to `restart-strategy.exponential-delay.attempts-before-reset-backoff` is updated slightly.
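   
   For anyone who prefers the previous behaviour, a minimal sketch of overriding the new defaults explicitly (option keys as discussed in this FLIP; the snippet is an example, not a recommendation):
   
   ~~~java
   Configuration conf = new Configuration();
   // restore the previous defaults: 5 min max backoff, 2.0 multiplier
   conf.setString("restart-strategy.exponential-delay.max-backoff", "5 min");
   conf.setString("restart-strategy.exponential-delay.backoff-multiplier", "2.0");
   ~~~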
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33714) Update documentation about the usage of RuntimeContext#getExecutionConfig

2023-12-11 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-33714:
---
Affects Version/s: 1.19.0

> Update documentation about the usage of RuntimeContext#getExecutionConfig
> -
>
> Key: FLINK-33714
> URL: https://issues.apache.org/jira/browse/FLINK-33714
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Affects Versions: 1.19.0
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33714) Update documentation about the usage of RuntimeContext#getExecutionConfig

2023-12-11 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-33714:
---
Fix Version/s: 1.19.0

> Update documentation about the usage of RuntimeContext#getExecutionConfig
> -
>
> Key: FLINK-33714
> URL: https://issues.apache.org/jira/browse/FLINK-33714
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33714][doc] Update the documentation about the usage of RuntimeContext#getExecutionConfig. [flink]

2023-12-11 Thread via GitHub


reswqa merged PR #23906:
URL: https://github.com/apache/flink/pull/23906


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]

2023-12-11 Thread via GitHub


1996fanrui commented on code in PR #23247:
URL: https://github.com/apache/flink/pull/23247#discussion_r1423542955


##
flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java:
##
@@ -184,7 +184,7 @@ public class RestartStrategyOptions {
 public static final ConfigOption<Double> RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER =
 
ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier")
 .doubleType()
-.defaultValue(2.0)
+.defaultValue(1.2)

Review Comment:
   Thanks @zhuzhurk for the quick feedback!
   
   I have posted the update to the `dev` mailing list [1]. Let's wait a while for more feedback. Also, I will pause FLINK-33736 for now. Therefore, I will remove the first commit from this PR and continue working on the remaining JIRA tickets of FLIP-364.
   
   [1] https://lists.apache.org/thread/6glz0d57r8gtpzq4f71vf9066c5x6nyw



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-12-11 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795627#comment-17795627
 ] 

Hangxiang Yu commented on FLINK-26585:
--

[~jingge] I saw that 1.18.1 is being released.

Could this PR be included in 1.18.1 or in the next 1.18.2?

It would help a lot in the case [~czchen] mentioned.

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>
> * When loading a state, MultiStateKeyIterator loads and buffers the whole 
> state in memory before it even processes a single data point 
>  ** This is absolutely no problem for small state (hence the unit tests work 
> fine)
>  ** The MultiStateKeyIterator ctor sets up a Java Stream that iterates all state 
> descriptors and flattens all datapoints contained within
>  ** The java.util.stream.Stream#flatMap function causes the buffering of the 
> whole data set when it is enumerated later on
>  ** See call stack [1] 
>  *** In our case this is 150e6 data points (> 1GiB just for the pointers to 
> the data, let alone the data itself, ~30GiB)
>  ** I'm not aware of any way to instrument Stream to avoid the 
> problem, hence
>  ** I coded an alternative implementation of MultiStateKeyIterator that 
> avoids using Java Streams,
>  ** I can contribute our implementation (MultiStateKeyIteratorNoStreams)
> [1]
> Streams call stack:
> hasNext:77, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> next:82, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> forEachRemaining:116, Iterator (java.util)
> forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
> forEach:580, ReferencePipeline$Head (java.util.stream)
> accept:270, ReferencePipeline$7$1 (java.util.stream)  
>  #  Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
> accept:373, ReferencePipeline$11$1 (java.util.stream) 
>  # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
> accept:193, ReferencePipeline$3$1 (java.util.stream)      
>  #  Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
> tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
> lambda$initPartialTraversalState$0:294, 
> StreamSpliterators$WrappingSpliterator (java.util.stream)
> getAsBoolean:-1, 1528195520 
> (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
> fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
> hasNext:681, Spliterators$1Adapter (java.util)
> hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
> hasNext:162, KeyedStateReaderOperator$NamespaceDecorator 
> (org.apache.flink.state.api.input.operator)
> reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
> invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
> doRun:776, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
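
A minimal, self-contained illustration of the flatMap buffering described above (plain JDK streams, not the Flink code): when a flat-mapped stream is consumed through its iterator, the whole inner stream of the current outer element is pushed into a buffer before the first element is handed out.

~~~java
import java.util.Iterator;
import java.util.stream.Stream;

public class FlatMapBufferingDemo {
    public static void main(String[] args) {
        Iterator<Integer> it =
                Stream.of("stateDescriptor")
                        .flatMap(d -> Stream.of(1, 2, 3)
                                .peek(i -> System.out.println("buffered " + i)))
                        .iterator();
        // Prints "buffered 1", "buffered 2", "buffered 3" and only then "first: 1":
        // the inner stream is fully consumed into a buffer before the first element
        // is returned. With 150e6 data points per descriptor, that buffer is huge.
        System.out.println("first: " + it.next());
    }
}
~~~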



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423504221


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Processing rate increase threshold for detecting 
ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% 
of the desired capacity increase with scaling, the action is marked 
ineffective.");
 
+public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+autoScalerConfig("memory.gc-pressure.threshold")
+.doubleType()
+.defaultValue(0.3)
+.withDescription("Max allowed GC pressure during scaling 
operations");
+
+public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+autoScalerConfig("memory.heap-usage.threshold")
+.doubleType()
+.defaultValue(0.9)

Review Comment:
   I see your point about the heap usage with a large number of TMs, and it makes sense. I don't think most jobs would be affected by this issue, but we could increase the default threshold to 95% to be on the safe side.
   
   Regarding the follow-up, please go ahead with the ticket :) and feel free to work on it!
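   
   For context, a hypothetical sketch of how such thresholds typically gate a scaling decision (illustrative only, not the actual autoscaler code; the observed metric values are made up):
   
   ~~~java
   public class MemoryPressureCheck {
       public static void main(String[] args) {
           double observedGcPressure = 0.12; // fraction of time spent in GC (made-up value)
           double observedHeapUsage = 0.96;  // max observed heap utilisation (made-up value)
   
           double gcPressureThreshold = 0.3; // memory.gc-pressure.threshold
           double heapUsageThreshold = 0.95; // memory.heap-usage.threshold (proposed default)
   
           boolean memoryPressure =
                   observedGcPressure > gcPressureThreshold
                           || observedHeapUsage > heapUsageThreshold;
           System.out.println("block scaling due to memory pressure: " + memoryPressure);
       }
   }
   ~~~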



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30702] Add Elasticsearch dialect [flink-connector-jdbc]

2023-12-11 Thread via GitHub


grzegorz8 commented on code in PR #67:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/67#discussion_r1420783379


##
flink-connector-jdbc/pom.xml:
##
@@ -38,10 +38,12 @@ under the License.
2.12
2.12.7
3.23.1
+   2.15.2
42.5.1
21.8.0.0
418
1.12.10
+   8.11.1

Review Comment:
   @MartijnVisser Hi! What should be done to confirm whether adding this dependency (even as "provided") is allowed? The same question applies to enabling the Elastic trial for testing purposes.
   Should I create a ticket somewhere? Write to elastic_lice...@elastic.co? What do you suggest? Or should we close the PR and the corresponding ticket now because of the incompatible license?
   
   The license is not listed in Category X: https://www.apache.org/legal/resolved.html#category-x (nor in any other category). Even assuming the Elastic License is "Category X", I think we are still safe, since we comply with the rules mentioned in the document: _"They may not be distributed"_ and _"You may rely on them when they support an optional feature"_.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32993][table] Datagen connector handles length-constrained fields according to the schema definition by default [flink]

2023-12-11 Thread via GitHub


liyubin117 commented on code in PR #23678:
URL: https://github.com/apache/flink/pull/23678#discussion_r1423393030


##
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##
@@ -297,10 +295,104 @@ void testVariableLengthDataGeneration() throws Exception 
{
 anyCauseMatches(
 ValidationException.class,
 String.format(
-"Only supports specifying '%s' option 
for variable-length types (varchar, string, varbinary, bytes). The type of 
field %s is not within this range.",
+"Only supports specifying '%s' option 
for variable-length types (VARCHAR/STRING/VARBINARY/BYTES). The type of field 
'%s' is not within this range.",
 
DataGenConnectorOptions.FIELD_VAR_LEN.key(), "f4")));
 }
 
+@Test
+void testVariableLengthDataType() throws Exception {
+DescriptorProperties descriptor = new DescriptorProperties();
+final int rowsNumber = 200;
+descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), 
rowsNumber);
+
+List results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, 
descriptor);
+assertThat(results).hasSize(rowsNumber);
+
+for (RowData row : results) {
+assertThat(row.getString(2).toString()).hasSize(30);
+assertThat(row.getBinary(3)).hasSize(20);
+assertThat(row.getString(4).toString())
+
.hasSize(RandomGeneratorVisitor.RANDOM_STRING_LENGTH_DEFAULT);
+}
+
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS + ".f2." + 
DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.RANDOM);
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS + ".f2." + 
DataGenConnectorOptionsUtil.LENGTH,
+25);
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS + ".f4." + 
DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.RANDOM);
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS + ".f4." + 
DataGenConnectorOptionsUtil.LENGTH,
+);
+
+results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor);
+
+for (RowData row : results) {
+assertThat(row.getString(2).toString()).hasSize(25);
+assertThat(row.getString(4).toString()).hasSize();
+}
+
+assertThatThrownBy(
+() -> {
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS
++ ".f3."
++ DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.RANDOM);
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS
++ ".f3."
++ 
DataGenConnectorOptionsUtil.LENGTH,
+21);
+
+runGenerator(LENGTH_CONSTRAINED_SCHEMA, 
descriptor);
+})
+.satisfies(
+anyCauseMatches(
+ValidationException.class,
+"Custom length '21' for variable-length type 
(VARCHAR/STRING/VARBINARY/BYTES) field 'f3' should be shorter than '20' defined 
in the schema."));
+}
+
+@Test
+void testFixedLengthDataType() throws Exception {
+DescriptorProperties descriptor = new DescriptorProperties();
+final int rowsNumber = 200;
+descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), 
rowsNumber);
+
+List results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, 
descriptor);
+assertThat(results).hasSize(rowsNumber);
+
+for (RowData row : results) {
+assertThat(row.getString(0).toString()).hasSize(50);
+assertThat(row.getBinary(1)).hasSize(40);
+}
+
+assertThatThrownBy(
+() -> {
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS
++ ".f0."
++ DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.RANDOM);
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS
+ 

Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]

2023-12-11 Thread via GitHub


zhuzhurk commented on code in PR #23247:
URL: https://github.com/apache/flink/pull/23247#discussion_r1423468479


##
flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java:
##
@@ -184,7 +184,7 @@ public class RestartStrategyOptions {
 public static final ConfigOption<Double> RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER =
 
ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier")
 .doubleType()
-.defaultValue(2.0)
+.defaultValue(1.2)

Review Comment:
   1.5 sounds good to me. Yet I think we need to send the updates to the FLIP discussion ML and open another vote for it, so that it does not come as a surprise to those who have reviewed and voted on the FLIP.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]

2023-12-11 Thread via GitHub


zhuzhurk commented on code in PR #23247:
URL: https://github.com/apache/flink/pull/23247#discussion_r1423468479


##
flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java:
##
@@ -184,7 +184,7 @@ public class RestartStrategyOptions {
 public static final ConfigOption<Double> RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER =
 
ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier")
 .doubleType()
-.defaultValue(2.0)
+.defaultValue(1.2)

Review Comment:
   1.5 sounds good to me. Yet I think we need to send the updates to the FLIP discussion ML and open another vote for it, so that it does not come as a surprise to those who have reviewed and voted on the FLIP.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]

2023-12-11 Thread via GitHub


xuyangzhong commented on code in PR #23886:
URL: https://github.com/apache/flink/pull/23886#discussion_r1423461099


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateEventTimeRestoreTest.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecWindowAggregate}. */
+public class WindowAggregateEventTimeRestoreTest extends RestoreTestBase {
+
+public WindowAggregateEventTimeRestoreTest() {
+super(StreamExecWindowAggregate.class);
+}
+
+@Override
+public List programs() {
+return Arrays.asList(
+WindowAggregateTestPrograms.GROUP_TUMBLE_WINDOW_EVENT_TIME,

Review Comment:
   Just wondering why the 'GROUP_' prefix is added to each test?



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateTestPrograms.java:
##
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecWindowAggregate}. */
+public class WindowAggregateTestPrograms {
+
+static final Row[] BEFORE_DATA = {
+Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", 
"a"),
+Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:03", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:04", 5, 5d, 5f, new BigDecimal("5.55"), null, 
"a"),
+Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"),
+// out of order
+Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), 
"Comment#2", "a"),
+// late event
+Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), 
"Hi", "a"),
+Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:32", 7, 7d, 7f, new BigDecimal("7.77"), null, 
null),
+Row.of("2020-10-10 00:00:34", 1, 3d, 3f, new BigDecimal("3.33"), 
"Comment#3", "b")
+};
+
+static final Row[] AFTER_DATA = {
+Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), 
"Comment#4", "a"),
+Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), 
"Comment#5", "d"),
+Row.of("2020-10-10 00:00:43", 12, 5d, 5f, new BigDecimal("6.44"), 
"Comment#6", "c"),
+Row.of("2020-10-10 00:00:44", 13, 6d, 6f, new BigDecimal("7.44"), 
"Comment#7", "d")
+};
+
+static final SourceTestStep SOURCE =
+  

[jira] [Comment Edited] (FLINK-33780) Support to store default catalog in CatalogStore

2023-12-11 Thread Yubin Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795574#comment-17795574
 ] 

Yubin Li edited comment on FLINK-33780 at 12/12/23 5:42 AM:


[~hackergin] Hi, we could make modifications in the `CatalogManager` constructor as follows:

!image-2023-12-12-11-09-53-075.png|width=798,height=406!

The default catalog is the current catalog of a session (including in the SQL Gateway). `default_catalog` is just like `d2` in the context of `use catalog d2`; the only difference is that it is created implicitly by Flink. We should treat them equally for semantic consistency, WDYT?

!image-2023-12-12-13-42-04-762.png|width=422,height=184!


was (Author: liyubin117):
[~hackergin] Hi, we could make modifications in the `CatalogManager` constructor as follows:

!image-2023-12-12-11-09-53-075.png|width=1165,height=593!

The default catalog is the current catalog of a session (including in the SQL Gateway). `default_catalog` is just like `d2` in the context of `use catalog d2`; the only difference is that it is created implicitly by Flink. We should treat them equally for semantic consistency, WDYT?

> Support to store default catalog in CatalogStore
> 
>
> Key: FLINK-33780
> URL: https://issues.apache.org/jira/browse/FLINK-33780
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: Yubin Li
>Priority: Major
> Attachments: image-2023-12-11-13-47-29-623.png, 
> image-2023-12-11-14-14-10-002.png, image-2023-12-12-11-09-53-075.png, 
> image-2023-12-12-13-42-04-762.png
>
>
> Flink initially creates a default catalog which is included in the 
> `Map<String, Catalog> catalogs`, but is not stored in the CatalogStore.
> After conducting a thorough investigation, I've determined that the necessary 
> modification can be made within the `CatalogManager`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] only close the cluster once to make the tests faster [flink]

2023-12-11 Thread via GitHub


flinkbot commented on PR #23910:
URL: https://github.com/apache/flink/pull/23910#issuecomment-1851322851

   
   ## CI report:
   
   * b0fd64a2a24dc1cd50ea2fb5ae4f3d4036e2b85a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] only close the cluster once to make the tests faster [flink]

2023-12-11 Thread via GitHub


TestBoost opened a new pull request, #23910:
URL: https://github.com/apache/flink/pull/23910

   
   
   ## What is the purpose of the change
   
   This change makes `miniCluster.close()` run only once after all tests finish, making the tests in the test class `ClientHeartbeatTest` run faster. There are three tests in this class. They submit different jobs through the cluster and assert the running status of the `miniCluster` or the job status of the submitted job. The jobs they submit are unrelated to each other, and running the tests in different orders always passes.
   
   Besides, the test class `ClientHeartbeatTest` is similar to the test class `org.apache.flink.runtime.leaderelection.LeaderChangeClusterComponentsTest`. However, `LeaderChangeClusterComponentsTest` only closes the `miniCluster` after all tests finish. It is a good choice to apply a similar approach to `ClientHeartbeatTest`, as sketched below.
   
   The test runtime drops from `9.9113 s` to `8.9771 s` after applying this change.
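   
   A minimal sketch of the per-class lifecycle pattern (JUnit 5 annotations assumed; the class and resource names are illustrative, not the actual `ClientHeartbeatTest` code):
   
   ~~~java
   import org.junit.jupiter.api.AfterAll;
   import org.junit.jupiter.api.BeforeAll;
   import org.junit.jupiter.api.Test;
   
   class SharedClusterStyleTest {
       // stand-in for the shared MiniCluster
       static AutoCloseable sharedCluster;
   
       @BeforeAll
       static void startClusterOnce() {
           sharedCluster = () -> System.out.println("cluster closed");
       }
   
       @Test
       void testA() { /* submit one job against the shared cluster */ }
   
       @Test
       void testB() { /* submit another, independent job */ }
   
       @AfterAll
       static void closeClusterOnce() throws Exception {
           sharedCluster.close(); // runs exactly once, after all tests
       }
   }
   ~~~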
   
   ## Brief change log
   
   - This pull request changes the code such that `miniCluster.close()` runs once after all tests finish rather than after every test.
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   This pull request is just modifying tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]

2023-12-11 Thread via GitHub


1996fanrui commented on code in PR #23247:
URL: https://github.com/apache/flink/pull/23247#discussion_r1423360642


##
flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java:
##
@@ -184,7 +184,7 @@ public class RestartStrategyOptions {
 public static final ConfigOption<Double> RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER =
 
ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier")
 .doubleType()
-.defaultValue(2.0)
+.defaultValue(1.2)

Review Comment:
   Thanks @mxm for the feedback!
   
   `1.5` is fine for me, and I'd like to cc @zhuzhurk, who proposed changing these 2 default values as well!
   
   Max and Mason gave some feedback after voting in the [user mail list](https://lists.apache.org/thread/6glz0d57r8gtpzq4f71vf9066c5x6nyw), and Max and I had an offline discussion yesterday. Max thinks 1.2 is a little too small, i.e. aggressive (the delay time is too short). Here is the reason:
   
   - Every time the job restarts, it makes a bunch of calls to the Kubernetes API, e.g. reading/writing config maps and creating task managers.
   - When his production environment had the default fixed-delay (1s) restart strategy turned on, a Kubernetes cluster became unstable.
   
   The following is the relationship between restart attempts and retry delay time (see the calculation sketch below the image):
   
   - The `delay-time` will reach 1 min after 12 attempts when `backoff-multiplier` is 1.5
   - The `delay-time` will reach 1 min after 24 attempts when `backoff-multiplier` is 1.2
   
   Hey @zhuzhurk, what do you think about setting `1.5` as the default value? If you agree, I will update the FLIP, report back to the [user mail list](https://lists.apache.org/thread/6glz0d57r8gtpzq4f71vf9066c5x6nyw), and go ahead with this PR.
   
   
![image](https://github.com/apache/flink/assets/38427477/642c57e0-b415-4326-af05-8b506c5fbb3a)
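   
   A small, self-contained calculation sketch for the attempt counts above (assuming the default initial backoff of 1s, i.e. delay(n) = 1s * multiplier^(n-1); the initial backoff value is an assumption here):
   
   ~~~java
   public class BackoffAttempts {
       public static void main(String[] args) {
           for (double multiplier : new double[] {1.2, 1.5}) {
               int attempt = 1;
               double delaySeconds = 1.0; // assumed initial backoff
               while (delaySeconds < 60.0) {
                   delaySeconds *= multiplier;
                   attempt++;
               }
               System.out.println(
                       "multiplier " + multiplier + ": delay reaches 1 min at attempt " + attempt);
           }
           // multiplier 1.2 -> attempt 24, multiplier 1.5 -> attempt 12
       }
   }
   ~~~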
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32993][table] Datagen connector handles length-constrained fields according to the schema definition by default [flink]

2023-12-11 Thread via GitHub


liyubin117 commented on code in PR #23678:
URL: https://github.com/apache/flink/pull/23678#discussion_r1423393030


##
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##
@@ -297,10 +295,104 @@ void testVariableLengthDataGeneration() throws Exception 
{
 anyCauseMatches(
 ValidationException.class,
 String.format(
-"Only supports specifying '%s' option 
for variable-length types (varchar, string, varbinary, bytes). The type of 
field %s is not within this range.",
+"Only supports specifying '%s' option 
for variable-length types (VARCHAR/STRING/VARBINARY/BYTES). The type of field 
'%s' is not within this range.",
 
DataGenConnectorOptions.FIELD_VAR_LEN.key(), "f4")));
 }
 
+@Test
+void testVariableLengthDataType() throws Exception {
+DescriptorProperties descriptor = new DescriptorProperties();
+final int rowsNumber = 200;
+descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), 
rowsNumber);
+
+List results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, 
descriptor);
+assertThat(results).hasSize(rowsNumber);
+
+for (RowData row : results) {
+assertThat(row.getString(2).toString()).hasSize(30);
+assertThat(row.getBinary(3)).hasSize(20);
+assertThat(row.getString(4).toString())
+
.hasSize(RandomGeneratorVisitor.RANDOM_STRING_LENGTH_DEFAULT);
+}
+
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS + ".f2." + 
DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.RANDOM);
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS + ".f2." + 
DataGenConnectorOptionsUtil.LENGTH,
+25);
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS + ".f4." + 
DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.RANDOM);
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS + ".f4." + 
DataGenConnectorOptionsUtil.LENGTH,
+);
+
+results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor);
+
+for (RowData row : results) {
+assertThat(row.getString(2).toString()).hasSize(25);
+assertThat(row.getString(4).toString()).hasSize();
+}
+
+assertThatThrownBy(
+() -> {
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS
++ ".f3."
++ DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.RANDOM);
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS
++ ".f3."
++ 
DataGenConnectorOptionsUtil.LENGTH,
+21);
+
+runGenerator(LENGTH_CONSTRAINED_SCHEMA, 
descriptor);
+})
+.satisfies(
+anyCauseMatches(
+ValidationException.class,
+"Custom length '21' for variable-length type 
(VARCHAR/STRING/VARBINARY/BYTES) field 'f3' should be shorter than '20' defined 
in the schema."));
+}
+
+@Test
+void testFixedLengthDataType() throws Exception {
+DescriptorProperties descriptor = new DescriptorProperties();
+final int rowsNumber = 200;
+descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), 
rowsNumber);
+
+List results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, 
descriptor);
+assertThat(results).hasSize(rowsNumber);
+
+for (RowData row : results) {
+assertThat(row.getString(0).toString()).hasSize(50);
+assertThat(row.getBinary(1)).hasSize(40);
+}
+
+assertThatThrownBy(
+() -> {
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS
++ ".f0."
++ DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.RANDOM);
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS
+ 

Re: [PR] [hotfix][build] Release branch should use SNAPSHOT version [flink-connector-pulsar]

2023-12-11 Thread via GitHub


leonardBang merged PR #66:
URL: https://github.com/apache/flink-connector-pulsar/pull/66


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][build] Release branch should use SNAPSHOT version [flink-connector-pulsar]

2023-12-11 Thread via GitHub


leonardBang commented on PR #66:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/66#issuecomment-1851280119

   CI passed, merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32993][table] Datagen connector handles length-constrained fields according to the schema definition by default [flink]

2023-12-11 Thread via GitHub


liyubin117 commented on code in PR #23678:
URL: https://github.com/apache/flink/pull/23678#discussion_r1423385053


##
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java:
##
@@ -141,15 +143,15 @@ private DataGeneratorContainer createContainer(
 }
 
 private void validateFieldOptions(String name, DataType type, 
ReadableConfig options) {
-ConfigOption lenOption =
+ConfigOption varLenOption =
 key(DataGenConnectorOptionsUtil.FIELDS
 + "."
 + name
 + "."
 + DataGenConnectorOptionsUtil.VAR_LEN)
 .booleanType()
 .defaultValue(false);
-options.getOptional(lenOption)
+options.getOptional(varLenOption)

Review Comment:
   The prior name is not clear enough, so I used a new one instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32993][table] Datagen connector handles length-constrained fields according to the schema definition by default [flink]

2023-12-11 Thread via GitHub


LadyForest commented on code in PR #23678:
URL: https://github.com/apache/flink/pull/23678#discussion_r1423376727


##
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##
@@ -297,10 +295,104 @@ void testVariableLengthDataGeneration() throws Exception 
{
 anyCauseMatches(
 ValidationException.class,
 String.format(
-"Only supports specifying '%s' option 
for variable-length types (varchar, string, varbinary, bytes). The type of 
field %s is not within this range.",
+"Only supports specifying '%s' option 
for variable-length types (VARCHAR/STRING/VARBINARY/BYTES). The type of field 
'%s' is not within this range.",
 
DataGenConnectorOptions.FIELD_VAR_LEN.key(), "f4")));
 }
 
+@Test
+void testVariableLengthDataType() throws Exception {
+DescriptorProperties descriptor = new DescriptorProperties();
+final int rowsNumber = 200;
+descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), 
rowsNumber);
+
+List results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, 
descriptor);
+assertThat(results).hasSize(rowsNumber);
+
+for (RowData row : results) {
+assertThat(row.getString(2).toString()).hasSize(30);
+assertThat(row.getBinary(3)).hasSize(20);
+assertThat(row.getString(4).toString())
+
.hasSize(RandomGeneratorVisitor.RANDOM_STRING_LENGTH_DEFAULT);
+}
+
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS + ".f2." + 
DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.RANDOM);
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS + ".f2." + 
DataGenConnectorOptionsUtil.LENGTH,
+25);
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS + ".f4." + 
DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.RANDOM);
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS + ".f4." + 
DataGenConnectorOptionsUtil.LENGTH,
+);
+
+results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor);
+
+for (RowData row : results) {
+assertThat(row.getString(2).toString()).hasSize(25);
+assertThat(row.getString(4).toString()).hasSize();
+}
+
+assertThatThrownBy(
+() -> {
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS
++ ".f3."
++ DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.RANDOM);
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS
++ ".f3."
++ 
DataGenConnectorOptionsUtil.LENGTH,
+21);
+
+runGenerator(LENGTH_CONSTRAINED_SCHEMA, 
descriptor);
+})
+.satisfies(
+anyCauseMatches(
+ValidationException.class,
+"Custom length '21' for variable-length type 
(VARCHAR/STRING/VARBINARY/BYTES) field 'f3' should be shorter than '20' defined 
in the schema."));
+}
+
+@Test
+void testFixedLengthDataType() throws Exception {
+DescriptorProperties descriptor = new DescriptorProperties();
+final int rowsNumber = 200;
+descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), 
rowsNumber);
+
+List results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, 
descriptor);
+assertThat(results).hasSize(rowsNumber);
+
+for (RowData row : results) {
+assertThat(row.getString(0).toString()).hasSize(50);
+assertThat(row.getBinary(1)).hasSize(40);
+}
+
+assertThatThrownBy(
+() -> {
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS
++ ".f0."
++ DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.RANDOM);
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS
+ 

Re: [PR] [FLINK-33781][table] Cleanup usage of deprecated TableConfig#ctor [flink]

2023-12-11 Thread via GitHub


liuyongvs commented on PR #23897:
URL: https://github.com/apache/flink/pull/23897#issuecomment-1851260132

   > LGTM, I have a minor comment: do we need to remove the deprecated 
constructor method `public TableConfig() {}` now?
   
   I think we should not remove it now; some users or connectors may still use it. We can remove it in Flink 2.0, what do you think? @lsyldliu 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]

2023-12-11 Thread via GitHub


1996fanrui commented on code in PR #23247:
URL: https://github.com/apache/flink/pull/23247#discussion_r1423367354


##
flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java:
##
@@ -222,6 +223,19 @@ public class RestartStrategyOptions {
 code(RESTART_STRATEGY.key()), 
code("exponential-delay"))
 .build());
 
+@Documentation.OverrideDefault("infinite")
+public static final ConfigOption<Integer> RESTART_STRATEGY_EXPONENTIAL_DELAY_ATTEMPTS =
+
ConfigOptions.key("restart-strategy.exponential-delay.attempts-before-reset-backoff")
+.intType()
+.defaultValue(Integer.MAX_VALUE)
+.withDescription(
+Description.builder()
+.text(
+"The number of times that Flink 
retries the execution before the job is declared as failed "
++ "before reset the 
backoff to its initial value if %s has been set to %s.",

Review Comment:
   Thanks @RocMarshal for the review! Updated~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-11 Thread via GitHub


reswqa commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1851249351

   We should wait for https://github.com/apache/flink/pull/23876 to be merged (this won't take long, as it has already been approved with 3 votes). After that, CI should be able to pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33672] Use MapState.entries() instead of keys() and get() in over window [flink]

2023-12-11 Thread via GitHub


Zakelly commented on PR #23855:
URL: https://github.com/apache/flink/pull/23855#issuecomment-1851243905

   Thanks @fsk119 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-26586) FileSystem uses unbuffered read I/O

2023-12-11 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795577#comment-17795577
 ] 

Hangxiang Yu commented on FLINK-26586:
--

[~Matthias Schwalbe] Sure. Just feel free to contribute it.

Already assigned to you, please go ahead.

> FileSystem uses unbuffered read I/O
> ---
>
> Key: FLINK-26586
> URL: https://issues.apache.org/jira/browse/FLINK-26586
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor, Connectors / FileSystem, Runtime 
> / Checkpointing
>Affects Versions: 1.13.0, 1.14.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
> Attachments: BufferedFSDataInputStreamWrapper.java, 
> BufferedLocalFileSystem.java
>
>
> - I found out that, at least when using LocalFileSystem on a Windows system, 
> read I/O to load a savepoint is unbuffered,
>  - See example stack [1]
>  - i.e. in order to load only a long in a serializer, it needs to go into 
> kernel mode 8 times and load the 8 bytes one by one
>  - I coded a BufferedFSDataInputStreamWrapper that allows opting in to buffered 
> reads on any FileSystem implementation
>  - In our setting, savepoint load is now 30 times faster
>  - I once saw a Jira ticket about improving savepoint load time in general 
> (lost the link unfortunately); maybe this approach can help with it
>  - not sure if HDFS has got the same problem
>  - I can contribute my implementation of a BufferedFSDataInputStreamWrapper 
> which can be integrated in any 
> [1] unbuffered reads stack:
> read:207, FileInputStream (java.io)
> read:68, LocalDataInputStream (org.apache.flink.core.fs.local)
> read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs)
> read:42, ForwardingInputStream (org.apache.flink.runtime.util)
> readInt:390, DataInputStream (java.io)
> deserialize:80, BytePrimitiveArraySerializer 
> (org.apache.flink.api.common.typeutils.base.array)
> next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> restoreKVStateData:147, RocksDBFullRestoreOperation 
> (org.apache.flink.contrib.streaming.state.restore)
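
A minimal, self-contained illustration of the unbuffered-read cost described above (plain JDK I/O, not the attached wrapper): `DataInputStream#readInt` issues four single-byte `read()` calls, so each int costs four kernel round trips on an unbuffered `FileInputStream`, while a `BufferedInputStream` serves them from one bulk read.

~~~java
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class UnbufferedReadDemo {
    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("unbuffered-demo", ".bin");
        f.deleteOnExit();
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f))) {
            out.writeInt(42);
        }
        // Unbuffered: readInt() results in four separate reads against the OS.
        try (DataInputStream in = new DataInputStream(new FileInputStream(f))) {
            System.out.println(in.readInt());
        }
        // Buffered: one bulk read fills the buffer, readInt() is served from memory.
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(new FileInputStream(f), 64 * 1024))) {
            System.out.println(in.readInt());
        }
    }
}
~~~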



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-26586) FileSystem uses unbuffered read I/O

2023-12-11 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-26586:


Assignee: Matthias Schwalbe

> FileSystem uses unbuffered read I/O
> ---
>
> Key: FLINK-26586
> URL: https://issues.apache.org/jira/browse/FLINK-26586
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor, Connectors / FileSystem, Runtime 
> / Checkpointing
>Affects Versions: 1.13.0, 1.14.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
> Attachments: BufferedFSDataInputStreamWrapper.java, 
> BufferedLocalFileSystem.java
>
>
> - I found out that, at least when using LocalFileSystem on a Windows system, 
> read I/O to load a savepoint is unbuffered,
>  - See example stack [1]
>  - i.e. in order to load only a long in a serializer, it needs to go into 
> kernel mode 8 times and load the 8 bytes one by one
>  - I coded a BufferedFSDataInputStreamWrapper that allows opting in to buffered 
> reads on any FileSystem implementation
>  - In our setting, savepoint load is now 30 times faster
>  - I once saw a Jira ticket about improving savepoint load time in general 
> (lost the link unfortunately); maybe this approach can help with it
>  - not sure if HDFS has got the same problem
>  - I can contribute my implementation of a BufferedFSDataInputStreamWrapper 
> which can be integrated in any 
> [1] unbuffered reads stack:
> read:207, FileInputStream (java.io)
> read:68, LocalDataInputStream (org.apache.flink.core.fs.local)
> read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs)
> read:42, ForwardingInputStream (org.apache.flink.runtime.util)
> readInt:390, DataInputStream (java.io)
> deserialize:80, BytePrimitiveArraySerializer 
> (org.apache.flink.api.common.typeutils.base.array)
> next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> restoreKVStateData:147, RocksDBFullRestoreOperation 
> (org.apache.flink.contrib.streaming.state.restore)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33798) Automatically clean up rocksdb logs when the task failover.

2023-12-11 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795576#comment-17795576
 ] 

Hangxiang Yu commented on FLINK-33798:
--

Thanks [~fanrui] for pinging me here.

I think you are right. The behaviour after relocating is not consistent with the behaviour before.

We could make this change.

Thanks [~liming] for reporting this; we can go ahead.

 

> Automatically clean up rocksdb logs when the task failover.
> ---
>
> Key: FLINK-33798
> URL: https://issues.apache.org/jira/browse/FLINK-33798
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Ming Li
>Priority: Major
>
> Since FLINK-24785 relocates rocksdb log, multiple rocksdb logs will be 
> created under the flink log directory, but they are not cleaned up during 
> task failover, resulting in a large number of rocksdb logs under the flink 
> log directory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]

2023-12-11 Thread via GitHub


1996fanrui commented on code in PR #23247:
URL: https://github.com/apache/flink/pull/23247#discussion_r1423360642


##
flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java:
##
@@ -184,7 +184,7 @@ public class RestartStrategyOptions {
 public static final ConfigOption<Double> 
RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER =
 
ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier")
 .doubleType()
-.defaultValue(2.0)
+.defaultValue(1.2)

Review Comment:
   Thanks @mxm for the feedback!
   
   `1.5` is fine for me, and I'd like to cc @zhuzhurk who proposed changing these 
2 default values as well!
   
   Max and Mason had a little feedback after voting in the [user mail 
list](https://lists.apache.org/thread/6glz0d57r8gtpzq4f71vf9066c5x6nyw), and 
Max and I had an offline discussion yesterday. Max thinks 1.2 is a little too 
small, i.e. too aggressive. Here is the reason:
   
   - Every time the job restarts, it makes a bunch of calls to the 
Kubernetes API, e.g. read/write to config maps, create task managers.
   - When his production jobs had the default fixed-delay (1s) restart strategy 
turned on, a Kubernetes cluster became unstable.
   
   Following is the relationship between restart-attempts and retry-delay-time:
   
   - The `delay-time` will reach 1 min after 12 attempts when 
`backoff-multiplier` is 1.5
   - The `delay-time` will reach 1 min after 24 attempts when 
`backoff-multiplier` is 1.2
   
   Hey @zhuzhurk , what do you think about setting `1.5` as the default value? 
If you agree, I will update the FLIP, report it back to the [user mail 
list](https://lists.apache.org/thread/6glz0d57r8gtpzq4f71vf9066c5x6nyw), and move 
ahead with this PR.
   
   
![image](https://github.com/apache/flink/assets/38427477/642c57e0-b415-4326-af05-8b506c5fbb3a)
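   
   As a quick sanity check of the attempt counts above, a tiny back-of-the-envelope calculation (assuming an initial backoff of 1s and ignoring jitter and the max-backoff cap):
   
   ```java
   public class BackoffAttempts {
       public static void main(String[] args) {
           for (double multiplier : new double[] {1.5, 1.2}) {
               double delaySeconds = 1.0; // assumed initial backoff of 1s
               int attempts = 1;
               while (delaySeconds < 60.0) {
                   delaySeconds *= multiplier;
                   attempts++;
               }
               // prints ~12 attempts for 1.5 and ~24 attempts for 1.2
               System.out.printf("multiplier %.1f reaches 1 min after ~%d attempts%n", multiplier, attempts);
           }
       }
   }
   ```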
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-33780) Support to store default catalog in CatalogStore

2023-12-11 Thread Yubin Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795574#comment-17795574
 ] 

Yubin Li edited comment on FLINK-33780 at 12/12/23 3:22 AM:


[~hackergin] Hi, we could make modifications in the `CatalogManager` constructor 
as follows:

!image-2023-12-12-11-09-53-075.png|width=1165,height=593!

DefaultCatalog is the current catalog of a session (including SQL Gateway); 
`default_catalog` is just like `d2` under the context `use catalog d2`. The only 
difference is that it is created implicitly by Flink, so we should treat them 
equally for semantic consistency, WDYT?


was (Author: liyubin117):
[~hackergin] Hi, we could make modifitions in the `CatalogManager` construct 
function as follows:

!image-2023-12-12-11-09-53-075.png|width=1165,height=593!

DefaultCatalog is the current catalog of a session including SqlGateway, 
`default_catalog` is like `d2` under the context `use catalog d2`, only the 
difference is that it is created implicitly by Flink, we should treat them 
equally for semantic consistency, WDYT?

> Support to store default catalog in CatalogStore
> 
>
> Key: FLINK-33780
> URL: https://issues.apache.org/jira/browse/FLINK-33780
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: Yubin Li
>Priority: Major
> Attachments: image-2023-12-11-13-47-29-623.png, 
> image-2023-12-11-14-14-10-002.png, image-2023-12-12-11-09-53-075.png
>
>
> Flink initially creates a default catalog which is included in the 
> `Map<String, Catalog> catalogs`, but is not stored in the CatalogStore.
> After conducting thorough investigation, I've determined that the necessary 
> modification can be made within the `CatalogManager`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33780) Support to store default catalog in CatalogStore

2023-12-11 Thread Yubin Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795574#comment-17795574
 ] 

Yubin Li edited comment on FLINK-33780 at 12/12/23 3:21 AM:


[~hackergin] Hi, we could make modifications in the `CatalogManager` constructor 
as follows:

!image-2023-12-12-11-09-53-075.png|width=1165,height=593!

DefaultCatalog is the current catalog of a session including SqlGateway, 
`default_catalog` is like `d2` under the context `use catalog d2`, only the 
difference is that it is created implicitly by Flink, we should treat them 
equally for semantic consistency, WDYT?


was (Author: liyubin117):
[~hackergin] Hi, we could make modifitions in the `CatalogManager` construct 
function as follows:

!image-2023-12-12-11-09-53-075.png|width=1165,height=593!

DefaultCatalog is the current catalog of a session including SqlGateway, 
`default_catalog` is like `d2`, only the difference is that it is created 
implicitly by Flink, we should treat them equally for semantic consistency, 
WDYT?

!image-2023-12-11-14-14-10-002.png|width=805,height=565!

> Support to store default catalog in CatalogStore
> 
>
> Key: FLINK-33780
> URL: https://issues.apache.org/jira/browse/FLINK-33780
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: Yubin Li
>Priority: Major
> Attachments: image-2023-12-11-13-47-29-623.png, 
> image-2023-12-11-14-14-10-002.png, image-2023-12-12-11-09-53-075.png
>
>
> Flink initially creates a default catalog which is included in the 
> `Map<String, Catalog> catalogs`, but is not stored in the CatalogStore.
> After conducting thorough investigation, I've determined that the necessary 
> modification can be made within the `CatalogManager`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33780) Support to store default catalog in CatalogStore

2023-12-11 Thread Yubin Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795574#comment-17795574
 ] 

Yubin Li commented on FLINK-33780:
--

[~hackergin] Hi, we could make modifications in the `CatalogManager` constructor 
as follows:

!image-2023-12-12-11-09-53-075.png|width=1165,height=593!

DefaultCatalog is the current catalog of a session including SqlGateway, 
`default_catalog` is like `d2`, only the difference is that it is created 
implicitly by Flink, we should treat them equally for semantic consistency, 
WDYT?

!image-2023-12-11-14-14-10-002.png|width=805,height=565!

> Support to store default catalog in CatalogStore
> 
>
> Key: FLINK-33780
> URL: https://issues.apache.org/jira/browse/FLINK-33780
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: Yubin Li
>Priority: Major
> Attachments: image-2023-12-11-13-47-29-623.png, 
> image-2023-12-11-14-14-10-002.png, image-2023-12-12-11-09-53-075.png
>
>
> Flink initially creates a default catalog which is included in the 
> `Map<String, Catalog> catalogs`, but is not stored in the CatalogStore.
> After conducting thorough investigation, I've determined that the necessary 
> modification can be made within the `CatalogManager`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33716) Cleanup the usage of deprecated StreamTableEnvironment#createTemporaryView

2023-12-11 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan reassigned FLINK-33716:
-

Assignee: Jacky Lau

> Cleanup the usage of deprecated StreamTableEnvironment#createTemporaryView
> --
>
> Key: FLINK-33716
> URL: https://issues.apache.org/jira/browse/FLINK-33716
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jane Chan
>Assignee: Jacky Lau
>Priority: Major
>
> {code:java}
> ExpressionTestBase
> HiveTableSinkITCase
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33716) Cleanup the usage of deprecated StreamTableEnvironment#createTemporaryView

2023-12-11 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795573#comment-17795573
 ] 

Jane Chan commented on FLINK-33716:
---

[~jackylau] assigned to you.

> Cleanup the usage of deprecated StreamTableEnvironment#createTemporaryView
> --
>
> Key: FLINK-33716
> URL: https://issues.apache.org/jira/browse/FLINK-33716
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jane Chan
>Assignee: Jacky Lau
>Priority: Major
>
> {code:java}
> ExpressionTestBase
> HiveTableSinkITCase
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33717) Cleanup the usage of deprecated StreamTableEnvironment#fromDataStream

2023-12-11 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan reassigned FLINK-33717:
-

Assignee: Jacky Lau

> Cleanup the usage of deprecated StreamTableEnvironment#fromDataStream
> -
>
> Key: FLINK-33717
> URL: https://issues.apache.org/jira/browse/FLINK-33717
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jane Chan
>Assignee: Jacky Lau
>Priority: Major
>
> {code:java}
> PythonScalarFunctionOperatorTestBase
> AvroTypesITCase {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33717) Cleanup the usage of deprecated StreamTableEnvironment#fromDataStream

2023-12-11 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795572#comment-17795572
 ] 

Jane Chan commented on FLINK-33717:
---

Hi, [~jackylau]. Thanks for the interest, assigned to you.

> Cleanup the usage of deprecated StreamTableEnvironment#fromDataStream
> -
>
> Key: FLINK-33717
> URL: https://issues.apache.org/jira/browse/FLINK-33717
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jane Chan
>Priority: Major
>
> {code:java}
> PythonScalarFunctionOperatorTestBase
> AvroTypesITCase {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33780) Support to store default catalog in CatalogStore

2023-12-11 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-33780:
-
Attachment: image-2023-12-12-11-09-53-075.png

> Support to store default catalog in CatalogStore
> 
>
> Key: FLINK-33780
> URL: https://issues.apache.org/jira/browse/FLINK-33780
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: Yubin Li
>Priority: Major
> Attachments: image-2023-12-11-13-47-29-623.png, 
> image-2023-12-11-14-14-10-002.png, image-2023-12-12-11-09-53-075.png
>
>
> Flink initially creates a default catalog which is included in the 
> `Map<String, Catalog> catalogs`, but is not stored in the CatalogStore.
> After conducting thorough investigation, I've determined that the necessary 
> modification can be made within the `CatalogManager`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


1996fanrui commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423346604


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Processing rate increase threshold for detecting 
ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% 
of the desired capacity increase with scaling, the action is marked 
ineffective.");
 
+public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+autoScalerConfig("memory.gc-pressure.threshold")
+.doubleType()
+.defaultValue(0.3)
+.withDescription("Max allowed GC pressure during scaling 
operations");
+
+public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+autoScalerConfig("memory.heap-usage.threshold")
+.doubleType()
+.defaultValue(0.9)

Review Comment:
   > Also keep in mind that this is the average heap usage. With 90% average 
usage you are extremely likely to be close to out of heap in most cases.
   
   Thanks @gyfora for the clarification!
   
   I guess it's not the average heap usage, and I wanted to check with you first. In 
the `ScalingExecutor#isJobUnderMemoryPressure` method, we check whether 
`evaluatedMetrics.get(ScalingMetric.HEAP_USAGE).getAverage()` > 
`conf.get(AutoScalerOptions.HEAP_USAGE_THRESHOLD)`. Intuitively `getAverage` 
looks like an average, but its calculation is divided into two steps:
   - Step 1: `ScalingMetrics#computeGlobalMetrics` collects the `HEAP_USAGE` at 
each collection time as `heapUsed.getMax() / heapMax.getMax()`. 
   - IIUC, `heapUsed` is an `AggregatedMetric`: when a job has 1000 
taskmanagers and the heapUsed of 999 TMs is very low while only one TM is high, 
we still treat `heapUsed` as high at that time.
   - Step 2: `ScalingMetricEvaluator#evaluateGlobalMetrics` computes the 
`HEAP_USAGE` based on `metricHistory`.
   - So `metricHistory` is composed of the TMs with the highest heapUsage at a 
large number of time points.
   
   
   Strictly speaking, both steps have some problems:
   - Step 1: Java GC is executed lazily, not immediately.
- When TM heapUsage is high, it may be that GC has not been 
triggered yet, which does not mean that the memory pressure is high.
- Especially if the heapUsage is high for only one TM or a small number 
of TMs.
   - Step 2: Since the data from the first step is unreliable, the average value 
in the second step is unreliable as well.
   
   > GC metrics will only be available in Flink 1.19.
   
   I'm not sure whether we can sum all GC times as the total GC time. Before 1.19, there 
are detailed GC times for each collector.
   
   > This is a very good point and happens often. I think we could definitely 
build this logic on top of the newly introduced metrics + scaling history as a 
follow up. It would probably be a very good addition. (but definitely out of 
scope for this PR)
   
   That makes sense. As I understand it: it's better to revert the scaling if the 
job is unhealthy after scaling down. Memory pressure is one type of 
unhealthiness; checkpoint failures or CPU pressure may be others.
   
   Would you mind if I create one JIRA and pick it up? Thanks~
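   
   For what it's worth, a tiny self-contained illustration of the skew described in Step 1 / Step 2 above (numbers are made up; this is not the autoscaler code):
   
   ```java
   import java.util.Arrays;
   
   public class MaxBasedHeapUsageSkew {
       public static void main(String[] args) {
           // assume 1000 TMs: 999 report 20% heap usage, one reports 95%
           double[] perTmUsage = new double[1000];
           Arrays.fill(perTmUsage, 0.20);
           perTmUsage[0] = 0.95;
   
           // step 1 (as described above): one observation per collection = max over TMs
           double observation = Arrays.stream(perTmUsage).max().orElse(0.0); // 0.95
   
           // step 2: averaging many such observations over time stays around 0.95,
           // even though the fleet-wide mean is only about 0.20
           double fleetMean = Arrays.stream(perTmUsage).average().orElse(0.0);
           System.out.printf("per-sample max = %.2f, fleet mean = %.4f%n", observation, fleetMean);
       }
   }
   ```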
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2023-12-11 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795571#comment-17795571
 ] 

xiaogang zhou commented on FLINK-33728:
---

Hi [~mapohl] , thanks for the comment above. Sorry for my poor English writing 
:P, but I think your re-clarification is exactly what I am proposing. I'd like 
to introduce a lazy re-initialization of the watch mechanism, which will tolerate a 
disconnection of the watch until a new pod is requested.

I think your concern is how we detect a TM loss without an active watcher. 
I have tested my change in a real K8S environment. With a disconnected watcher, I 
killed a TM pod; after no more than 50s, the task restarted with an exception:
{code:java}
// code placeholder
 java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 
flink-6168d34cf9d3a5d31ad8bb02bce6a370-taskmanager-1-8 timed out.
at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1306)
at 
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitC {code}
Moreover, I think YARN also does not have a watcher mechanism, so Flink scheduled 
on YARN also relies on a heartbeat timeout mechanism?

 

And an eager (immediate) re-watching strategy can put great pressure on the API 
server, especially in the early versions that do not set resource version zero 
in the watch-list request.
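
A hypothetical, self-contained sketch of that lazy re-watch idea (illustrative names only, not the actual KubernetesResourceManagerDriver code):
{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

public class LazyRewatchSketch {

    private final AtomicBoolean watchBroken = new AtomicBoolean(false);

    // called from the watch error callback instead of re-watching immediately
    public void onWatchError(Throwable t) {
        watchBroken.set(true);
    }

    // called whenever a new worker pod is requested
    public void requestNewWorker() {
        if (watchBroken.compareAndSet(true, false)) {
            recreatePodsWatch(); // re-watch lazily, only when it is actually needed
        }
        // ... continue with the normal pod request path
    }

    private void recreatePodsWatch() {
        System.out.println("re-creating the pod watch lazily");
    }

    public static void main(String[] args) {
        LazyRewatchSketch driver = new LazyRewatchSketch();
        driver.onWatchError(new RuntimeException("ResourceVersionTooOld"));
        driver.requestNewWorker(); // triggers exactly one lazy re-watch
    }
}
{code}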

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when Kubernetes etcd responded slowly. 
> After Kubernetes recovered an hour later, thousands of Flink jobs using 
> KubernetesResourceManagerDriver re-watched when receiving 
> ResourceVersionTooOld, which caused great pressure on the API server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to re-watch 
> once a new requestResource is called. We can rely on the Akka heartbeat 
> timeout to discover TM failures, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][test] Migrate JsonRowDeserializationSchemaTest/JsonRowSerializationSchemaTest to Junit5 and Assertj [flink]

2023-12-11 Thread via GitHub


fsk119 merged PR #23882:
URL: https://github.com/apache/flink/pull/23882


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33672) Use MapState.entries() instead of keys() and get() in over window

2023-12-11 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang closed FLINK-33672.
-
Fix Version/s: 1.19.0
   Resolution: Fixed

> Use MapState.entries() instead of keys() and get() in over window
> -
>
> Key: FLINK-33672
> URL: https://issues.apache.org/jira/browse/FLINK-33672
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> In code logic related with over windows, such as 
> org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction
> {code:java}
> private transient MapState<Long, List<RowData>> inputState;
> public void onTimer(
> long timestamp,
> KeyedProcessFunction<K, RowData, RowData>.OnTimerContext ctx,
> Collector<RowData> out)
> throws Exception {
> //...
> Iterator<Long> iter = inputState.keys().iterator();
> //...
> while (iter.hasNext()) {
> Long elementKey = iter.next();
> if (elementKey < limit) {
> // element key outside of window. Retract values
> List<RowData> elementsRemove = inputState.get(elementKey);
> // ...
> }
> }
> //...
> } {code}
> As we can see, there is a combination of key iteration and get the value for 
> iterated key from inputState. However for RocksDB, the key iteration calls 
> entry iteration, which means actually we could replace it by entry iteration 
> without introducing any extra overhead. And as a result, we could save a 
> function call of get() by using getValue() of iterated entry at very low cost.
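
For illustration, a hedged sketch of the entry-based iteration this issue describes (it follows the snippet above and is not necessarily the exact merged change):
{code:java}
Iterator<Map.Entry<Long, List<RowData>>> iter = inputState.entries().iterator();
while (iter.hasNext()) {
    Map.Entry<Long, List<RowData>> entry = iter.next();
    Long elementKey = entry.getKey();
    if (elementKey < limit) {
        // element key outside of window: retract values without a second state lookup
        List<RowData> elementsRemove = entry.getValue();
        // ...
    }
}
{code}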



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33672) Use MapState.entries() instead of keys() and get() in over window

2023-12-11 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795569#comment-17795569
 ] 

Shengkai Fang commented on FLINK-33672:
---

Merged into master: 080119cca53d9890257982b6a74a7d6f913253c2

> Use MapState.entries() instead of keys() and get() in over window
> -
>
> Key: FLINK-33672
> URL: https://issues.apache.org/jira/browse/FLINK-33672
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>
> In code logic related with over windows, such as 
> org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction
> {code:java}
> private transient MapState<Long, List<RowData>> inputState;
> public void onTimer(
> long timestamp,
> KeyedProcessFunction<K, RowData, RowData>.OnTimerContext ctx,
> Collector<RowData> out)
> throws Exception {
> //...
> Iterator<Long> iter = inputState.keys().iterator();
> //...
> while (iter.hasNext()) {
> Long elementKey = iter.next();
> if (elementKey < limit) {
> // element key outside of window. Retract values
> List<RowData> elementsRemove = inputState.get(elementKey);
> // ...
> }
> }
> //...
> } {code}
> As we can see, there is a combination of key iteration and get the value for 
> iterated key from inputState. However for RocksDB, the key iteration calls 
> entry iteration, which means actually we could replace it by entry iteration 
> without introducing any extra overhead. And as a result, we could save a 
> function call of get() by using getValue() of iterated entry at very low cost.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33672] Use MapState.entries() instead of keys() and get() in over window [flink]

2023-12-11 Thread via GitHub


fsk119 merged PR #23855:
URL: https://github.com/apache/flink/pull/23855


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan [flink]

2023-12-11 Thread via GitHub


TanYuxin-tyx commented on PR #23771:
URL: https://github.com/apache/flink/pull/23771#issuecomment-1851182058

   @lsyldliu Thanks for helping review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33612) The table plan of hybrid shuffle may introduce additional blocking edges occasionally

2023-12-11 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795564#comment-17795564
 ] 

dalongliu commented on FLINK-33612:
---

merged via master branch: 9cac80be18c6aff0cebdfe706327c1693822e884

> The table plan of hybrid shuffle may introduce additional blocking edges 
> occasionally
> -
>
> Key: FLINK-33612
> URL: https://issues.apache.org/jira/browse/FLINK-33612
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> To enhance the performance of hybrid shuffle, it is imperative to address the 
> inconsistency between hybrid shuffle mode and blocking shuffle mode in 
> certain query plans of TPC-DS (such as q88.sql, q14a.sql, q14b.sql, etc). 
> In hybrid shuffle mode, these plans introduce additional blocking shuffle 
> edges and result in increased shuffle times, potentially impacting overall 
> efficiency. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan [flink]

2023-12-11 Thread via GitHub


lsyldliu merged PR #23771:
URL: https://github.com/apache/flink/pull/23771


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Bump org.apache.commons:commons-compress from 1.22 to 1.24.0 [flink-connector-pulsar]

2023-12-11 Thread via GitHub


tisonkun merged PR #65:
URL: https://github.com/apache/flink-connector-pulsar/pull/65


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][build] Bump version to 4.2-SNAPSHOT [flink-connector-pulsar]

2023-12-11 Thread via GitHub


tisonkun merged PR #67:
URL: https://github.com/apache/flink-connector-pulsar/pull/67


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][build] Bump version to 4.2-SNAPSHOT [flink-connector-pulsar]

2023-12-11 Thread via GitHub


boring-cyborg[bot] commented on PR #67:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/67#issuecomment-1851152475

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [build] Bump version to 4.2-SNAPSHOT [flink-connector-pulsar]

2023-12-11 Thread via GitHub


tisonkun commented on code in PR #67:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/67#discussion_r1423296836


##
flink-connector-pulsar-e2e-tests/pom.xml:
##
@@ -23,7 +23,7 @@ under the License.

org.apache.flink
flink-connector-pulsar-parent
-   4.1-SNAPSHOT
+   4.2-SNAPSHOT

Review Comment:
   @snuyanzin This may need a mailing list discussion. I personally support using 
`4.2.0-SNAPSHOT`, but the Flink repos have used `X.Y-SNAPSHOT` for a long 
time and I don't know if other scripts depend on this format.
   
   cc @zentol 
   
   For this patch, I tend to keep the current flavor.



##
docs/data/pulsar.yml:
##
@@ -16,7 +16,7 @@
 # limitations under the License.
 

 
-version: 4.1.0
+version: 4.2-SNAPSHOT

Review Comment:
   ```suggestion
   version: 4.1.10
   ```



##
docs/data/pulsar.yml:
##
@@ -16,7 +16,7 @@
 # limitations under the License.
 

 
-version: 4.1.0
+version: 4.2-SNAPSHOT

Review Comment:
   ```suggestion
   version: 4.1.0
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][build] Release branch should use SNAPSHOT version [flink-connector-pulsar]

2023-12-11 Thread via GitHub


tisonkun commented on code in PR #66:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/66#discussion_r1423296026


##
docs/data/pulsar.yml:
##
@@ -16,7 +16,7 @@
 # limitations under the License.
 

 
-version: 4.1.0
+version: 4.1-SNAPSHOT

Review Comment:
   ```suggestion
   version: 4.1.0
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [build][hotfix] Release branch should use SNAPSHOT version [flink-connector-pulsar]

2023-12-11 Thread via GitHub


tisonkun commented on code in PR #66:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/66#discussion_r1423293161


##
docs/data/pulsar.yml:
##
@@ -16,7 +16,7 @@
 # limitations under the License.
 

 
-version: 4.1.0
+version: 4.1-SNAPSHOT

Review Comment:
   Ditto 
https://github.com/apache/flink-connector-pulsar/pull/67#discussion_r1422300397
   
   Please keep it as 4.1.0



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][docs] config key for parquet int64 option [flink]

2023-12-11 Thread via GitHub


flinkbot commented on PR #23909:
URL: https://github.com/apache/flink/pull/23909#issuecomment-1851138706

   
   ## CI report:
   
   * b72dd38e4c835667830025114edd647c40224429 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix][docs] config key for parquet int64 option [flink]

2023-12-11 Thread via GitHub


tweise opened a new pull request, #23909:
URL: https://github.com/apache/flink/pull/23909

   Corrects error from https://github.com/apache/flink/pull/23900


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33361) Add Java 17 compatibility to Flink Kafka connector

2023-12-11 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin closed FLINK-33361.
---

> Add Java 17 compatibility to Flink Kafka connector
> --
>
> Key: FLINK-33361
> URL: https://issues.apache.org/jira/browse/FLINK-33361
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.1, kafka-3.1.0
>Reporter: Martijn Visser
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.1.0
>
>
> When currently trying to {{mvn clean install -Dflink.version=1.18.0 
> -Dscala-2.12 -Prun-end-to-end-tests 
> -DdistDir=/Users/mvisser/Developer/flink-1.18.0 
> -Dflink.convergence.phase=install 
> -Dlog4j.configurationFile=tools/ci/log4j.properties}} this fails with errors 
> like:
> {code:java}
> [INFO] 
> [INFO] Results:
> [INFO] 
> [ERROR] Errors: 
> [ERROR] FlinkKafkaConsumerBaseMigrationTest.testRestore
> [ERROR]   Run 1: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 2: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 3: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 4: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 5: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 6: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 7: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 8: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 9: Exception while creating StreamOperatorStateContext.
> [INFO] 
> [ERROR]   
> FlinkKafkaConsumerBaseTest.testExplicitStateSerializerCompatibility:721 » 
> Runtime
> [ERROR]   FlinkKafkaConsumerBaseTest.testScaleDown:742->testRescaling:817 » 
> Checkpoint C...
> [ERROR]   FlinkKafkaConsumerBaseTest.testScaleUp:737->testRescaling:817 » 
> Checkpoint Cou...
> [ERROR]   UpsertKafkaDynamicTableFactoryTest.testBufferedTableSink:243 » 
> UncheckedIO jav...
> {code}
> Example stacktrace:
> {code:java}
> Test 
> testBufferedTableSink(org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactoryTest)
>  failed with:
> java.io.UncheckedIOException: java.io.IOException: Serializing the source 
> elements failed: java.lang.reflect.InaccessibleObjectException: Unable to 
> make field private final java.lang.Object[] java.util.Arrays$ArrayList.a 
> accessible: module java.base does not "opens java.util" to unnamed module 
> @45b4c3a9
>   at 
> org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:162)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySetOutputType(StreamingFunctionUtils.java:84)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.setOutputType(StreamingFunctionUtils.java:60)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setOutputType(AbstractUdfStreamOperator.java:146)
>   at 
> org.apache.flink.streaming.api.operators.SimpleOperatorFactory.setOutputType(SimpleOperatorFactory.java:118)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:434)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:402)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:356)
>   at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
>   at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
>   at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
>   at 
> org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:860)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:881)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:839)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:328)
>   

[jira] [Commented] (FLINK-33361) Add Java 17 compatibility to Flink Kafka connector

2023-12-11 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795516#comment-17795516
 ] 

Sergey Nuyanzin commented on FLINK-33361:
-

Merged as 
[825052f55754e401176083c121ffaf38362b7a26|https://github.com/apache/flink-connector-kafka/commit/825052f55754e401176083c121ffaf38362b7a26]

> Add Java 17 compatibility to Flink Kafka connector
> --
>
> Key: FLINK-33361
> URL: https://issues.apache.org/jira/browse/FLINK-33361
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.1, kafka-3.1.0
>Reporter: Martijn Visser
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.1.0
>
>
> When currently trying to {{mvn clean install -Dflink.version=1.18.0 
> -Dscala-2.12 -Prun-end-to-end-tests 
> -DdistDir=/Users/mvisser/Developer/flink-1.18.0 
> -Dflink.convergence.phase=install 
> -Dlog4j.configurationFile=tools/ci/log4j.properties}} this fails with errors 
> like:
> {code:java}
> [INFO] 
> [INFO] Results:
> [INFO] 
> [ERROR] Errors: 
> [ERROR] FlinkKafkaConsumerBaseMigrationTest.testRestore
> [ERROR]   Run 1: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 2: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 3: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 4: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 5: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 6: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 7: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 8: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 9: Exception while creating StreamOperatorStateContext.
> [INFO] 
> [ERROR]   
> FlinkKafkaConsumerBaseTest.testExplicitStateSerializerCompatibility:721 » 
> Runtime
> [ERROR]   FlinkKafkaConsumerBaseTest.testScaleDown:742->testRescaling:817 » 
> Checkpoint C...
> [ERROR]   FlinkKafkaConsumerBaseTest.testScaleUp:737->testRescaling:817 » 
> Checkpoint Cou...
> [ERROR]   UpsertKafkaDynamicTableFactoryTest.testBufferedTableSink:243 » 
> UncheckedIO jav...
> {code}
> Example stacktrace:
> {code:java}
> Test 
> testBufferedTableSink(org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactoryTest)
>  failed with:
> java.io.UncheckedIOException: java.io.IOException: Serializing the source 
> elements failed: java.lang.reflect.InaccessibleObjectException: Unable to 
> make field private final java.lang.Object[] java.util.Arrays$ArrayList.a 
> accessible: module java.base does not "opens java.util" to unnamed module 
> @45b4c3a9
>   at 
> org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:162)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySetOutputType(StreamingFunctionUtils.java:84)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.setOutputType(StreamingFunctionUtils.java:60)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setOutputType(AbstractUdfStreamOperator.java:146)
>   at 
> org.apache.flink.streaming.api.operators.SimpleOperatorFactory.setOutputType(SimpleOperatorFactory.java:118)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:434)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:402)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:356)
>   at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
>   at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
>   at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
>   at 
> org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:860)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:881)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:839)
>   at 
> 

[jira] [Assigned] (FLINK-33361) Add Java 17 compatibility to Flink Kafka connector

2023-12-11 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-33361:
---

Assignee: Sergey Nuyanzin

> Add Java 17 compatibility to Flink Kafka connector
> --
>
> Key: FLINK-33361
> URL: https://issues.apache.org/jira/browse/FLINK-33361
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.1, kafka-3.1.0
>Reporter: Martijn Visser
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> When currently trying to {{mvn clean install -Dflink.version=1.18.0 
> -Dscala-2.12 -Prun-end-to-end-tests 
> -DdistDir=/Users/mvisser/Developer/flink-1.18.0 
> -Dflink.convergence.phase=install 
> -Dlog4j.configurationFile=tools/ci/log4j.properties}} this fails with errors 
> like:
> {code:java}
> [INFO] 
> [INFO] Results:
> [INFO] 
> [ERROR] Errors: 
> [ERROR] FlinkKafkaConsumerBaseMigrationTest.testRestore
> [ERROR]   Run 1: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 2: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 3: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 4: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 5: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 6: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 7: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 8: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 9: Exception while creating StreamOperatorStateContext.
> [INFO] 
> [ERROR]   
> FlinkKafkaConsumerBaseTest.testExplicitStateSerializerCompatibility:721 » 
> Runtime
> [ERROR]   FlinkKafkaConsumerBaseTest.testScaleDown:742->testRescaling:817 » 
> Checkpoint C...
> [ERROR]   FlinkKafkaConsumerBaseTest.testScaleUp:737->testRescaling:817 » 
> Checkpoint Cou...
> [ERROR]   UpsertKafkaDynamicTableFactoryTest.testBufferedTableSink:243 » 
> UncheckedIO jav...
> {code}
> Example stacktrace:
> {code:java}
> Test 
> testBufferedTableSink(org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactoryTest)
>  failed with:
> java.io.UncheckedIOException: java.io.IOException: Serializing the source 
> elements failed: java.lang.reflect.InaccessibleObjectException: Unable to 
> make field private final java.lang.Object[] java.util.Arrays$ArrayList.a 
> accessible: module java.base does not "opens java.util" to unnamed module 
> @45b4c3a9
>   at 
> org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:162)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySetOutputType(StreamingFunctionUtils.java:84)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.setOutputType(StreamingFunctionUtils.java:60)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setOutputType(AbstractUdfStreamOperator.java:146)
>   at 
> org.apache.flink.streaming.api.operators.SimpleOperatorFactory.setOutputType(SimpleOperatorFactory.java:118)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:434)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:402)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:356)
>   at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
>   at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
>   at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
>   at 
> org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:860)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:881)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:839)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:328)
> 

[jira] [Resolved] (FLINK-33361) Add Java 17 compatibility to Flink Kafka connector

2023-12-11 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-33361.
-
Fix Version/s: kafka-3.1.0
   Resolution: Fixed

> Add Java 17 compatibility to Flink Kafka connector
> --
>
> Key: FLINK-33361
> URL: https://issues.apache.org/jira/browse/FLINK-33361
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.1, kafka-3.1.0
>Reporter: Martijn Visser
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.1.0
>
>
> When currently trying to {{mvn clean install -Dflink.version=1.18.0 
> -Dscala-2.12 -Prun-end-to-end-tests 
> -DdistDir=/Users/mvisser/Developer/flink-1.18.0 
> -Dflink.convergence.phase=install 
> -Dlog4j.configurationFile=tools/ci/log4j.properties}} this fails with errors 
> like:
> {code:java}
> [INFO] 
> [INFO] Results:
> [INFO] 
> [ERROR] Errors: 
> [ERROR] FlinkKafkaConsumerBaseMigrationTest.testRestore
> [ERROR]   Run 1: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 2: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 3: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 4: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 5: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 6: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 7: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 8: Exception while creating StreamOperatorStateContext.
> [ERROR]   Run 9: Exception while creating StreamOperatorStateContext.
> [INFO] 
> [ERROR]   
> FlinkKafkaConsumerBaseTest.testExplicitStateSerializerCompatibility:721 » 
> Runtime
> [ERROR]   FlinkKafkaConsumerBaseTest.testScaleDown:742->testRescaling:817 » 
> Checkpoint C...
> [ERROR]   FlinkKafkaConsumerBaseTest.testScaleUp:737->testRescaling:817 » 
> Checkpoint Cou...
> [ERROR]   UpsertKafkaDynamicTableFactoryTest.testBufferedTableSink:243 » 
> UncheckedIO jav...
> {code}
> Example stacktrace:
> {code:java}
> Test 
> testBufferedTableSink(org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactoryTest)
>  failed with:
> java.io.UncheckedIOException: java.io.IOException: Serializing the source 
> elements failed: java.lang.reflect.InaccessibleObjectException: Unable to 
> make field private final java.lang.Object[] java.util.Arrays$ArrayList.a 
> accessible: module java.base does not "opens java.util" to unnamed module 
> @45b4c3a9
>   at 
> org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:162)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySetOutputType(StreamingFunctionUtils.java:84)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.setOutputType(StreamingFunctionUtils.java:60)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setOutputType(AbstractUdfStreamOperator.java:146)
>   at 
> org.apache.flink.streaming.api.operators.SimpleOperatorFactory.setOutputType(SimpleOperatorFactory.java:118)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:434)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:402)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:356)
>   at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
>   at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
>   at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
>   at 
> org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:860)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:881)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:839)
>   at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
>   at 
> 

Re: [PR] [FLINK-33361][Connectors/Kafka] Add Java 17 compatibility to Flink Kafka connector [flink-connector-kafka]

2023-12-11 Thread via GitHub


boring-cyborg[bot] commented on PR #68:
URL: 
https://github.com/apache/flink-connector-kafka/pull/68#issuecomment-1850970248

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33361][Connectors/Kafka] Add Java 17 compatibility to Flink Kafka connector [flink-connector-kafka]

2023-12-11 Thread via GitHub


snuyanzin merged PR #68:
URL: https://github.com/apache/flink-connector-kafka/pull/68


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33361][Connectors/Kafka] Add Java 17 compatibility to Flink Kafka connector [flink-connector-kafka]

2023-12-11 Thread via GitHub


snuyanzin commented on code in PR #68:
URL: 
https://github.com/apache/flink-connector-kafka/pull/68#discussion_r1423178960


##
pom.xml:
##
@@ -78,6 +78,13 @@ under the License.
 2.17.1
 
 
flink-connector-kafka-parent
+
+
+
+-XX:+UseG1GC -Xms256m 
-XX:+IgnoreUnrecognizedVMOptions 
${flink.connector.module.config}

Review Comment:
   IIRC the flags are configured in `flink-conf.yaml` under `env.java.opts.all`, and 
all flags needed here are already present there.
   That means that currently there is nothing to do.
   In theory such a necessity might appear in the future, e.g. because of some 
specific dependencies; however, I think it should be handled only once it is 
actually required.
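   
   For context, this is roughly the shape of that entry (an abbreviated, hedged example; the list shipped in the default `flink-conf.yaml` is longer and may differ by version):
   
   ```yaml
   env.java.opts.all: --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
   ```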



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33800) Allow passing parameters to database via jdbc url

2023-12-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33800:
---
Labels: pull-request-available  (was: )

> Allow passing parameters to database via jdbc url
> -
>
> Key: FLINK-33800
> URL: https://issues.apache.org/jira/browse/FLINK-33800
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.1.1
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> Currently it does not allow to pass extra properties e.g.
> an attempt to connect to 
> {{jdbc:postgresql://...?sslmode=require}}
> fails with 
> {noformat}
> Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
> Failed to fetchResults.
>   at 
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:229)
>   at 
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:83)
>   ... 48 more
> Caused by: 
> org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to 
> execute the operation b70b5cf7-7068-4eb6-83a4-78aed36dbd35.
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source)
> {noformat}
> because of of a logic at 
> {{org.apache.flink.connector.jdbc.catalog.JdbcCatalogUtils#validateJdbcUrl}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33800][JDBC/Connector] Allow passing parameters to database via jdbc url [flink-connector-jdbc]

2023-12-11 Thread via GitHub


snuyanzin opened a new pull request, #83:
URL: https://github.com/apache/flink-connector-jdbc/pull/83

   The idea is pretty straightforward: on one side there is a JDBC URL, which 
is what most database providers normally hand out to connect to. 
   On the other side, for probably historical reasons, there is a separate 
property for the default database, which is used to build the default URL.
   
   To not break the former behaviour, it is possible to specify the database in the 
JDBC URL (with extra parameters) only if that database is the same as the one in the 
dedicated property for the default database. 
   In fact I tend to think the property should be marked as deprecated, since all the 
info could be extracted from the JDBC URL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33454] [Operator] Add IngressTlsSpec to support TLS within the managed Ingress, also add Label Passthrough [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


ryanvanhuuksloot commented on code in PR #727:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/727#discussion_r1423102408


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java:
##
@@ -93,24 +96,45 @@ private static HasMetadata getIngress(
 if (ingressInNetworkingV1(client)) {
 return new IngressBuilder()
 .withNewMetadata()
+.withLabels(spec.getIngress().getLabels())
 .withAnnotations(spec.getIngress().getAnnotations())
 .withName(objectMeta.getName())
 .withNamespace(objectMeta.getNamespace())
 .endMetadata()
 .withNewSpec()
 .withIngressClassName(spec.getIngress().getClassName())
+.withTls(spec.getIngress().getTls())
 .withRules(getIngressRule(objectMeta, spec, 
effectiveConfig))
 .endSpec()
 .build();
 } else {
+List ingressTLS =

Review Comment:
   Pulled it outside of the return because otherwise it was even harder 
to read



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33454] [Operator] Add IngressTlsSpec to support TLS within the managed Ingress, also add Label Passthrough [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


ryanvanhuuksloot commented on code in PR #727:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/727#discussion_r1423103105


##
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/IngressTlsSpec.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.spec;
+
+import org.apache.flink.annotation.Experimental;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/** Ingress spec. */
+@Experimental
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IngressTlsSpec {

Review Comment:
   No particular reason - didn't think about it. Swapped to the kubernetes spec.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33454] [Operator] Add IngressTlsSpec to support TLS within the managed Ingress, also add Label Passthrough [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


ryanvanhuuksloot commented on code in PR #727:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/727#discussion_r1423102408


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java:
##
@@ -93,24 +96,45 @@ private static HasMetadata getIngress(
 if (ingressInNetworkingV1(client)) {
 return new IngressBuilder()
 .withNewMetadata()
+.withLabels(spec.getIngress().getLabels())
 .withAnnotations(spec.getIngress().getAnnotations())
 .withName(objectMeta.getName())
 .withNamespace(objectMeta.getNamespace())
 .endMetadata()
 .withNewSpec()
 .withIngressClassName(spec.getIngress().getClassName())
+.withTls(spec.getIngress().getTls())
 .withRules(getIngressRule(objectMeta, spec, 
effectiveConfig))
 .endSpec()
 .build();
 } else {
+List ingressTLS =

Review Comment:
   Pulled it outside of the return because otherwise it is even harder 
to read.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33454] [Operator] Add IngressTlsSpec to support TLS within the managed Ingress, also add Label Passthrough [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


ryanvanhuuksloot commented on code in PR #727:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/727#discussion_r1423102109


##
docs/content/docs/custom-resource/reference.md:
##
@@ -96,6 +96,8 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | template | java.lang.String | Ingress template for the JobManager service. |
 | className | java.lang.String | Ingress className for the Flink deployment. |
 | annotations | java.util.Map | Ingress annotations. |
+| labels | java.util.Map | Ingress labels. |
+| tls | java.util.List | Ingress tls. |

Review Comment:
   Like you said in your review - I think it is fine to only show v1 here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33800) Allow passing parameters to database via jdbc url

2023-12-11 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-33800:
---

 Summary: Allow passing parameters to database via jdbc url
 Key: FLINK-33800
 URL: https://issues.apache.org/jira/browse/FLINK-33800
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Affects Versions: jdbc-3.1.1
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin


Currently it is not possible to pass extra properties, e.g.
an attempt to connect to 
{{jdbc:postgresql://...?sslmode=require}}
fails with 
{noformat}
Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed 
to fetchResults.
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:229)
at 
org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:83)
... 48 more
Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: 
Failed to execute the operation b70b5cf7-7068-4eb6-83a4-78aed36dbd35.
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)


{noformat}
because of the logic at 
{{org.apache.flink.connector.jdbc.catalog.JdbcCatalogUtils#validateJdbcUrl}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423061343


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -297,6 +301,28 @@ private void computeTargetDataRate(
 }
 }
 
+@VisibleForTesting
+protected static Map 
evaluateGlobalMetrics(
+SortedMap metricHistory) {
+var latest = 
metricHistory.get(metricHistory.lastKey()).getGlobalMetrics();
+var out = new HashMap();
+
+var gcPressure = latest.getOrDefault(GC_PRESSURE, Double.NaN);
+var lastHeapUsage = latest.getOrDefault(HEAP_USAGE, Double.NaN);
+
+out.put(GC_PRESSURE, EvaluatedScalingMetric.of(gcPressure));
+out.put(
+HEAP_USAGE,
+new EvaluatedScalingMetric(
+lastHeapUsage, getAverageGlobalMetric(HEAP_USAGE, 
metricHistory)));
+return out;
+}
+
+private static double getAverageGlobalMetric(
+ScalingMetric metric, SortedMap 
metricsHistory) {
+return getAverage(metric, null, metricsHistory);
+}
+
 public static double getAverage(
 ScalingMetric metric,
 JobVertexID jobVertexId,

Review Comment:
   will do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423061010


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Processing rate increase threshold for detecting 
ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% 
of the desired capacity increase with scaling, the action is marked 
ineffective.");
 
+public static final ConfigOption GC_PRESSURE_THRESHOLD =
+autoScalerConfig("memory.gc-pressure.threshold")
+.doubleType()
+.defaultValue(0.3)
+.withDescription("Max allowed GC pressure during scaling 
operations");
+
+public static final ConfigOption HEAP_USAGE_THRESHOLD =
+autoScalerConfig("memory.heap-usage.threshold")
+.doubleType()
+.defaultValue(0.9)

Review Comment:
   1. You are right but I still think the current logic is valuable because GC 
metrics will only be available in Flink 1.19. With the heap usage based logic 
we can also support older Flink versions. Also keep in mind that this is the 
average heap usage. With 90% average usage you are extremely likely to be close 
to out of heap in most cases.
   
   2. This is a very good point and happens often. I think we could definitely 
build this logic on top of the newly introduced metrics + scaling history as a 
follow up. It would probably be a very good addition. (but definitely out of 
scope for this PR)
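
For illustration only, here is a standalone sketch of how the two thresholds above could gate scaling decisions; the class and method names are made up and this is not the operator's actual evaluator code:

```java
/**
 * Standalone sketch: block scaling while GC pressure or the average heap usage
 * exceeds the configured thresholds. Names and structure are illustrative only.
 */
public class MemoryPressureGate {

    static boolean blocksScaling(
            double gcPressure,          // fraction of time spent in GC, 0..1 (NaN if unknown)
            double avgHeapUsage,        // average fraction of max heap used, 0..1 (NaN if unknown)
            double gcPressureThreshold, // e.g. 0.3
            double heapUsageThreshold   // e.g. 0.9
    ) {
        // Comparisons against NaN are false, so missing metrics never block scaling here.
        return gcPressure > gcPressureThreshold || avgHeapUsage > heapUsageThreshold;
    }

    public static void main(String[] args) {
        System.out.println(blocksScaling(0.05, 0.93, 0.3, 0.9));              // true: heap usage too high
        System.out.println(blocksScaling(Double.NaN, Double.NaN, 0.3, 0.9));  // false: metrics missing
    }
}
```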



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423057412


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##
@@ -78,19 +91,87 @@ protected Map 
queryAggregatedVertexMetrics(
 EmptyRequestBody.getInstance())
 .get();
 
-return responseBody.getMetrics().stream()
-.collect(
-Collectors.toMap(
-m -> metrics.get(m.getId()),
-m -> m,
-(m1, m2) ->
-new AggregatedMetric(
-m1.getId() + " merged with 
" + m2.getId(),
-Math.min(m1.getMin(), 
m2.getMin()),
-Math.max(m1.getMax(), 
m2.getMax()),
-// Average can't be 
computed
-Double.NaN,
-m1.getSum() + 
m2.getSum(;
+return aggregateByFlinkMetric(metrics, responseBody);
 }
 }
+
+protected Map queryTmMetrics(Context ctx) 
throws Exception {
+try (var restClient = ctx.getRestClusterClient()) {
+var metricNames = getTmMetricNames(restClient, ctx);
+var metricNameMapping = new HashMap();
+
+REQUIRED_TM_METRICS.forEach(
+fm -> {
+var name =
+fm.findAny(metricNames)
+.orElseThrow(
+() ->
+new RuntimeException(
+"Could not 
find required TM metric "
++ 
fm.name()));
+metricNameMapping.put(name, fm);
+});
+
+TOTAL_GC_TIME_PER_SEC
+.findAny(metricNames)
+.ifPresent(
+m -> {
+LOG.debug("GC metrics found");
+metricNameMapping.put(m, 
TOTAL_GC_TIME_PER_SEC);
+});
+
+var queriedMetrics =
+new HashMap<>(queryAggregatedTmMetrics(restClient, 
metricNameMapping));
+availableTmMetricNames.put(ctx.getJobKey(), metricNames);

Review Comment:
   Actually I just realised that TM metric names are fixed, so we know them 
beforehand and there is no need to cache them; we can simply hardcode them. I will work on 
this tomorrow



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423056535


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##
@@ -65,7 +65,16 @@ public enum ScalingMetric {
 SCALE_DOWN_RATE_THRESHOLD(false),
 
 /** Expected true processing rate after scale up. */
-EXPECTED_PROCESSING_RATE(false);
+EXPECTED_PROCESSING_RATE(false),
+
+/**
+ * Maximum GC pressure across taskmanagers. Percentage of time spent 
garbage collecting between
+ * 0 (no time in GC) and 1 (100% time in GC).
+ */
+GC_PRESSURE(false),
+
+/** Percentage of max heap used (between 0 and 1). */
+HEAP_USAGE(true);

Review Comment:
   good point, I will improve that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33795] Add new config to forbid autoscaling in certain periods of a day [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


mxm commented on code in PR #728:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/728#discussion_r1422946292


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -72,6 +72,15 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Stabilization period in which no new scaling will 
be executed");
 
+public static final ConfigOption> FORBID_PERIOD =
+autoScalerConfig("forbid.periods")
+.stringType()
+.asList()
+.defaultValues()
+
.withDeprecatedKeys(deprecatedOperatorConfigKey("forbid.periods"))
+.withDescription(
+"A (semicolon-separated) list of certain times of 
the day during which autoscaling is forbidden, 
10:00:00-11:00:00;21:30:00-22:30:00 for example");

Review Comment:
   This was also my first thought seeing this. Ideally, we would use some 
available standard for time spans. I think this also has to include the time 
zone.
   
   What about different times at different days of the week / month?
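
As a rough illustration of what parsing such a value involves, here is a standalone sketch using java.time; the semicolon/dash syntax and the explicit time zone are assumptions taken from the discussion, not a settled format (ranges crossing midnight and per-day rules are not handled):

```java
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

/** Standalone sketch: check whether "now" falls into one of the configured periods. */
public class ForbiddenPeriodsSketch {

    static boolean inForbiddenPeriod(String periods, ZoneId zone, ZonedDateTime now) {
        LocalTime t = now.withZoneSameInstant(zone).toLocalTime();
        for (String period : periods.split(";")) {
            String[] range = period.split("-");
            LocalTime start = LocalTime.parse(range[0]);
            LocalTime end = LocalTime.parse(range[1]);
            if (!t.isBefore(start) && !t.isAfter(end)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String conf = "10:00:00-11:00:00;21:30:00-22:30:00";
        ZoneId zone = ZoneId.of("UTC");
        System.out.println(inForbiddenPeriod(conf, zone, ZonedDateTime.parse("2023-12-11T10:30:00Z"))); // true
        System.out.println(inForbiddenPeriod(conf, zone, ZonedDateTime.parse("2023-12-11T12:00:00Z"))); // false
    }
}
```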



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33373) Capture build scans on ge.apache.org to benefit from deep build insights

2023-12-11 Thread Clay Johnson (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795475#comment-17795475
 ] 

Clay Johnson commented on FLINK-33373:
--

Hi [~mapohl], it was nice to connect with you on the ASF Infrastructure 
roundtable the other day. I see that we had some conversation on the ticket 
here, but let me know if there are any other questions you have about this!

> Capture build scans on ge.apache.org to benefit from deep build insights
> 
>
> Key: FLINK-33373
> URL: https://issues.apache.org/jira/browse/FLINK-33373
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI
>Reporter: Clay Johnson
>Assignee: Clay Johnson
>Priority: Minor
>  Labels: pull-request-available
>
> This improvement will enhance the functionality of the Flink build by 
> publishing build scans to [ge.apache.org|https://ge.apache.org/], hosted by 
> the Apache Software Foundation and run in partnership between the ASF and 
> Gradle. This Develocity instance has all features and extensions enabled and 
> is freely available for use by the Apache Flink project and all other Apache 
> projects.
> On this Develocity instance, Apache Flink will have access not only to all of 
> the published build scans but other aggregate data features such as:
>  * Dashboards to view all historical build scans, along with performance 
> trends over time
>  * Build failure analytics for enhanced investigation and diagnosis of build 
> failures
>  * Test failure analytics to better understand trends and causes around slow, 
> failing, and flaky tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


mxm opened a new pull request, #729:
URL: https://github.com/apache/flink-kubernetes-operator/pull/729

   The previous PR in #725 made the legacy autoscaler config keys "fallback" 
keys to prevent logging a deprecation WARN message on every reconciliation loop 
in the operator. Turns out, fallback keys also log a warning.
   
   This change instead migrates all config keys with the legacy 
"kubernetes.operator." prefix before any autoscaler ConfigOptions are used. This 
ensures no warnings are logged while the older keys can still be used. The 
new keys always take precedence over the old keys.
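
A standalone sketch of the migration idea described above; the exact prefix handling and key names are assumptions for illustration and not the operator's actual code:

```java
import java.util.HashMap;
import java.util.Map;

/** Standalone sketch: copy legacy-prefixed keys to the new keys, new keys win. */
public class LegacyKeyMigrationSketch {

    static final String LEGACY_PREFIX = "kubernetes.operator.";

    static Map<String, String> migrate(Map<String, String> conf) {
        Map<String, String> out = new HashMap<>(conf);
        conf.forEach(
                (key, value) -> {
                    if (key.startsWith(LEGACY_PREFIX)) {
                        String newKey = key.substring(LEGACY_PREFIX.length());
                        // New keys always take precedence; only fill in the legacy value if absent.
                        out.putIfAbsent(newKey, value);
                    }
                });
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("kubernetes.operator.job.autoscaler.enabled", "true");
        conf.put("job.autoscaler.target.utilization", "0.7");
        // Prints the migrated map, e.g. containing job.autoscaler.enabled=true
        // alongside the original keys (iteration order may vary).
        System.out.println(migrate(conf));
    }
}
```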


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33793) java.lang.NoSuchMethodError when checkpointing in Google Cloud Storage

2023-12-11 Thread Chris Nauroth (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795464#comment-17795464
 ] 

Chris Nauroth commented on FLINK-33793:
---

I expect this will be fixed by FLINK-33603.

CC: [~jjayadeep]

> java.lang.NoSuchMethodError when checkpointing in Google Cloud Storage
> --
>
> Key: FLINK-33793
> URL: https://issues.apache.org/jira/browse/FLINK-33793
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
> Environment: Flink 1.18
>Reporter: ChangZhuo Chen (陳昌倬)
>Priority: Major
>
> We have the following exception when checkpointing in Flink 1.18 + Google 
> Cloud Storage. The same code works well in Flink 1.17:
>  
> {{2023-12-11 07:45:28,861 ERROR 
> org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: 
> Thread 'jobmanager-io-thread-5' produced an uncaught exception. Stopping the 
> process...}}
> {{java.lang.NoSuchMethodError: 'com.google.common.collect.ImmutableMap 
> com.google.common.collect.ImmutableMap$Builder.buildOrThrow()'}}
> {{        at 
> com.google.cloud.storage.UnifiedOpts$Opts.getRpcOptions(UnifiedOpts.java:2096)
>  ~[?:?]}}
> {{        at 
> com.google.cloud.storage.StorageImpl.writer(StorageImpl.java:624) ~[?:?]}}
> {{        at com.google.cloud.storage.StorageImpl.writer(StorageImpl.java:90) 
> ~[?:?]}}
> {{        at 
> org.apache.flink.fs.gs.storage.GSBlobStorageImpl.writeBlob(GSBlobStorageImpl.java:64)
>  ~[?:?]}}
> {{        at 
> org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.createWriteChannel(GSRecoverableFsDataOutputStream.java:229)
>  ~[?:?]}}
> {{        at 
> org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:152)
>  ~[?:?]}}
> {{        at 
> org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:135)
>  ~[?:?]}}
> {{        at 
> org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:128)
>  ~[?:?]}}
> {{        at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.write(FsCheckpointMetadataOutputStream.java:73)
>  ~[flink-dist-1.18.0.jar:1.18.0]}}
> {{        at java.io.DataOutputStream.writeInt(Unknown Source) ~[?:?]}}
> {{        at 
> org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:98)
>  ~[flink-dist-1.18.0.jar:1.18.0]}}
> {{        at 
> org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:88)
>  ~[flink-dist-1.18.0.jar:1.18.0]}}
> {{        at 
> org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:83)
>  ~[flink-dist-1.18.0.jar:1.18.0]}}
> {{        at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:335)
>  ~[flink-dist-1.18.0.jar:1.18.0]}}
> {{        at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1404)
>  ~[flink-dist-1.18.0.jar:1.18.0]}}
> {{        at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1303)
>  ~[flink-dist-1.18.0.jar:1.18.0]}}
> {{        at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1195)
>  ~[flink-dist-1.18.0.jar:1.18.0]}}
> {{        at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>  ~[flink-dist-1.18.0.jar:1.18.0]}}
> {{        at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>  ~[flink-dist-1.18.0.jar:1.18.0]}}
> {{        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source) ~[?:?]}}
> {{        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source) ~[?:?]}}
> {{        at java.lang.Thread.run(Unknown Source) [?:?]}}
>  
> The issue has been reported in GitHub 
> [https://github.com/apache/flink/pull/22281#issuecomment-1728553794.] 
> However, it is still not fixed yet in 1.18.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-31631][FileSystems] Upgrade GCS connector to 2.2.11. [flink]

2023-12-11 Thread via GitHub


cnauroth commented on PR #22281:
URL: https://github.com/apache/flink/pull/22281#issuecomment-1850568860

   > > There is now a patch available here at #23469.
   > 
   > will it fix this issue?
   
   @yigress and @czchen , my apologies, I think I referenced the wrong pull 
request in my last comment. I think #23489 is the one that's relevant. That one 
is still open.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33799) Add e2e's for tls enabled operator

2023-12-11 Thread Tony Garrard (Jira)
Tony Garrard created FLINK-33799:


 Summary: Add e2e's for tls enabled operator
 Key: FLINK-33799
 URL: https://issues.apache.org/jira/browse/FLINK-33799
 Project: Flink
  Issue Type: Technical Debt
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0
Reporter: Tony Garrard
 Fix For: kubernetes-operator-1.8.0


It would be good to create some E2E tests to ensure a TLS-enabled Flink 
operator works, so that we don't break anything in the future



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33633) Automatic creation of RBAC for instances of Flink Deployments

2023-12-11 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795454#comment-17795454
 ] 

Gyula Fora commented on FLINK-33633:


In my personal opinion, creating the role without the binding doesn't really 
simplify anything; it may just complicate the process, because you need to know 
exactly what role the operator is going to create. 

> Automatic creation of RBAC for instances of Flink Deployments
> -
>
> Key: FLINK-33633
> URL: https://issues.apache.org/jira/browse/FLINK-33633
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Tony Garrard
>Priority: Not a Priority
>
> Currently users have to manually create RBAC, e.g. the flink service account. 
> When the operator is watching all namespaces, creation of a FlinkDeployment in a 
> specific namespace may fail if the kube admin has failed to create the 
> required RBAC. To improve usability the operator could be coded to 
> automatically create these RBAC resources in the instance namespace if not 
> present



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33633) Automatic creation of RBAC for instances of Flink Deployments

2023-12-11 Thread Ryan van Huuksloot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795452#comment-17795452
 ] 

Ryan van Huuksloot commented on FLINK-33633:


Would there be any appetite to have the operator create just a ClusterRole for 
Flink?

We can then leave the RoleBinding to the specific deployment to avoid scope 
creep. That at least removes one extra resource per deployment. I think this 
would be safe.

> Automatic creation of RBAC for instances of Flink Deployments
> -
>
> Key: FLINK-33633
> URL: https://issues.apache.org/jira/browse/FLINK-33633
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Tony Garrard
>Priority: Not a Priority
>
> Currently users have to manually create RBAC, e.g. the flink service account. 
> When the operator is watching all namespaces, creation of a FlinkDeployment in a 
> specific namespace may fail if the kube admin has failed to create the 
> required RBAC. To improve usability the operator could be coded to 
> automatically create these RBAC resources in the instance namespace if not 
> present



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2023-12-11 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795451#comment-17795451
 ] 

Matthias Pohl commented on FLINK-33728:
---

Thanks for creating this Jira issue, [~zhoujira86]. AFAIU, you're proposing the 
lazy initialization of the watcher after a connection error occurred that left 
the resourceVersion in an outdated state (i.e. the resourceVersion which is 
used by the k8s client doesn't match any pod in the k8s cluster). 
Re-initialization of the watcher wouldn't happen when the error is detected but 
when Flink realizes that the TM is gone and initiates a new TM pod.

Correct me if I'm wrong here, but isn't the watcher watching multiple pods (all 
TM pods belonging to the Flink cluster), and can't the 
{{KubernetesTooOldResourceVersionException}} be triggered by an error 
coming from a single pod? If that's the case, not re-initializing the watcher 
right away would leave us hanging for other pods' lifecycle events, wouldn't it? 
We would lose the ability to detect the deletion of other pods. But I guess 
that's what you mean in your comment above with "delete pod can allow us detect 
pod failure more quickly, but we can also discover it by detecting the lost of 
akka heartbeat timeout."?!
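
For readers following along, here is a standalone sketch of the lazy re-watch policy being proposed; it does not use the actual fabric8 or Flink classes and deliberately glosses over the lost-events concern raised above:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Standalone sketch of "do not re-watch immediately": mark the watch stale on
 * failure and only re-create it when pod events are actually needed again.
 */
public class LazyRewatchPolicySketch {

    private final AtomicBoolean watchStale = new AtomicBoolean(false);

    /** Called when the watch fails, e.g. with a too-old resourceVersion. */
    void onWatchError(Throwable cause) {
        // Instead of re-creating the watch right away (which can stampede the API
        // server when thousands of jobs hit the same error), just mark it stale.
        watchStale.set(true);
    }

    /** Called when new task manager pods are requested. */
    void onRequestResource(Runnable createWatch) {
        // Re-create the watch lazily, only when we actually need pod events again.
        if (watchStale.compareAndSet(true, false)) {
            createWatch.run();
        }
    }
}
```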

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I ran into a massive production problem when the Kubernetes etcd was responding 
> slowly. After Kubernetes recovered an hour later, thousands of Flink jobs using 
> KubernetesResourceManagerDriver re-watched when receiving 
> ResourceVersionTooOld, which caused great pressure on the API server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method?
>  
> We can just ignore the disconnection of the watching process and try to re-watch 
> once a new requestResource is called. And we can rely on the Akka heartbeat 
> timeout to discover TM failures, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33796] Add ability to customize java version for python ci in connectors [flink-connector-shared-utils]

2023-12-11 Thread via GitHub


snuyanzin commented on PR #30:
URL: 
https://github.com/apache/flink-connector-shared-utils/pull/30#issuecomment-1850502662

   @pvary yep, sure
   here is the link: 
https://github.com/snuyanzin/flink-connector-kafka/actions/runs/7170817788
   
   sorry I didn't include it earlier


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33697) FLIP-386: Support adding custom metrics in Recovery Spans

2023-12-11 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-33697:
---
Description: 
h1. Motivation

[FLIP-386|https://cwiki.apache.org/confluence/x/VAuZE] is building on top of 
[FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces].
 The intention here is to add a capability for state backends to attach custom 
attributes during recovery to recovery spans. For example 
RocksDBIncrementalRestoreOperation could report both remote download time and 
time to actually clip/ingest the RocksDB instances after rescaling.

  was:
h1. Motivation

FLIP-386 is building on top of 
[FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces].
 The intention here is to add a capability for state backends to attach custom 
attributes during recovery to recovery spans. For example 
RocksDBIncrementalRestoreOperation could report both remote download time and 
time to actually clip/ingest the RocksDB instances after rescaling.


> FLIP-386: Support adding custom metrics in Recovery Spans
> -
>
> Key: FLINK-33697
> URL: https://issues.apache.org/jira/browse/FLINK-33697
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Metrics, Runtime / State Backends
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.19.0
>
>
> h1. Motivation
> [FLIP-386|https://cwiki.apache.org/confluence/x/VAuZE] is building on top of 
> [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces].
>  The intention here is to add a capability for state backends to attach 
> custom attributes during recovery to recovery spans. For example 
> RocksDBIncrementalRestoreOperation could report both remote download time and 
> time to actually clip/ingest the RocksDB instances after rescaling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33796] Add ability to customize java version for python ci in connectors [flink-connector-shared-utils]

2023-12-11 Thread via GitHub


pvary commented on PR #30:
URL: 
https://github.com/apache/flink-connector-shared-utils/pull/30#issuecomment-1850450046

   @snuyanzin: Do we have a PR where this job has been tried out and shown to work? 
For testing my changes I started running them on my own repo to verify 
that everything is fine. https://github.com/pvary/flink-connector-kafka/actions


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33454] [Operator] Add IngressTlsSpec to support TLS within the managed Ingress, also add Label Passthrough [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #727:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/727#discussion_r1422763184


##
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/IngressTlsSpec.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.spec;
+
+import org.apache.flink.annotation.Experimental;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/** Ingress spec. */
+@Experimental
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IngressTlsSpec {

Review Comment:
   Why don't we use ` io.fabric8.kubernetes.api.model.networking.v1.IngressTLS` 
here directly? We can still convert back to the beta 
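
   If that direction is taken, the back-conversion could look roughly like the sketch below, assuming the fabric8 v1 and v1beta1 `IngressTLS` models both expose `hosts` and `secretName`; this is illustrative, not necessarily how the PR should do it:

```java
import io.fabric8.kubernetes.api.model.networking.v1.IngressTLS;
import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLSBuilder;

import java.util.List;
import java.util.stream.Collectors;

/** Sketch: map v1 IngressTLS entries onto the v1beta1 model for older clusters. */
public class IngressTlsConversionSketch {

    static List<io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS> toV1beta1(
            List<IngressTLS> v1Tls) {
        return v1Tls.stream()
                .map(
                        tls ->
                                new IngressTLSBuilder()
                                        .withHosts(tls.getHosts())
                                        .withSecretName(tls.getSecretName())
                                        .build())
                .collect(Collectors.toList());
    }
}
```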



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33795] Add new config to forbid autoscaling in certain periods of a day [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #728:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/728#discussion_r1422702163


##
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java:
##
@@ -56,4 +60,30 @@ public void testVertexExclusion() {
 Set.of(v1.toString(), v2.toString(), v3.toString()),
 new HashSet<>(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS)));
 }
+
+@Test
+public void testForbidPeriods() {

Review Comment:
   Should be `Forbidden`



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -96,6 +97,13 @@ public void scale(Context ctx) throws Exception {
 return;
 }
 
+if (!AutoScalerUtils.verifyForbidPeriods(ctx.getConfiguration())) {

Review Comment:
   Config validation should be in the validator module, not here.



##
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java:
##
@@ -599,6 +602,73 @@ public void testMetricCollectionDuringStabilization() 
throws Exception {
 assertEquals(2, stateStore.getCollectedMetrics(context).size());
 }
 
+@Test
+public void testMetricCollectionDuringForbidden() throws Exception {

Review Comment:
   I think this test should be part of the `JobAutoscalerImplTest` and then it 
can be simplified a lot.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -72,6 +72,15 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Stabilization period in which no new scaling will 
be executed");
 
+public static final ConfigOption> FORBID_PERIOD =
+autoScalerConfig("forbid.periods")
+.stringType()
+.asList()
+.defaultValues()
+
.withDeprecatedKeys(deprecatedOperatorConfigKey("forbid.periods"))

Review Comment:
   deprecated key should not be added



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -72,6 +72,15 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Stabilization period in which no new scaling will 
be executed");
 
+public static final ConfigOption> FORBID_PERIOD =
+autoScalerConfig("forbid.periods")
+.stringType()
+.asList()
+.defaultValues()
+
.withDeprecatedKeys(deprecatedOperatorConfigKey("forbid.periods"))
+.withDescription(
+"A (semicolon-separated) list of certain times of 
the day during which autoscaling is forbidden, 
10:00:00-11:00:00;21:30:00-22:30:00 for example");

Review Comment:
   Where did you get this syntax from? What are the potential alternatives?



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -114,6 +115,7 @@ public CollectedMetricHistory updateMetrics(
 var topology = getJobTopology(ctx, stateStore, jobDetailsInfo);
 var stableTime = 
jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
 final boolean isStabilizing = now.isBefore(stableTime);
+final boolean isForbidding = AutoScalerUtils.inForbidPeriod(conf, now);

Review Comment:
   Everywhere the word `Forbid` should be replaced with `Forbidden` or 
`Blocked` to be correct. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31631][FileSystems] Upgrade GCS connector to 2.2.11. [flink]

2023-12-11 Thread via GitHub


czchen commented on PR #22281:
URL: https://github.com/apache/flink/pull/22281#issuecomment-1850343759

   @cnauroth 
   
   The `java.lang.NoSuchMethodError` issue [remains in 
1.18.0](https://issues.apache.org/jira/browse/FLINK-33793). Any chance it can 
be fixed in the 1.18 series?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-26586) FileSystem uses unbuffered read I/O

2023-12-11 Thread Matthias Schwalbe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795416#comment-17795416
 ] 

Matthias Schwalbe commented on FLINK-26586:
---

Hi [~masteryhx], I've reached an agreement with my employer to contribute to this 
ticket; could you please assign it to me?

It will take me a while due to my other workload. 

I'd also appreciate some support again, especially with the features not yet 
implemented:
 * general input I/O buffering for filesystems, configuration thereof
 * authoring tests
 * benchmarking 

Many thanks

 

Thias

> FileSystem uses unbuffered read I/O
> ---
>
> Key: FLINK-26586
> URL: https://issues.apache.org/jira/browse/FLINK-26586
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor, Connectors / FileSystem, Runtime 
> / Checkpointing
>Affects Versions: 1.13.0, 1.14.0
>Reporter: Matthias Schwalbe
>Priority: Major
> Attachments: BufferedFSDataInputStreamWrapper.java, 
> BufferedLocalFileSystem.java
>
>
> - I found out that, at least when using LocalFileSystem on a Windows system, 
> read I/O to load a savepoint is unbuffered,
>  - See example stack [1]
>  - i.e. in order to load only a long in a serializer, it needs to go into 
> kernel mode 8 times and load the 8 bytes one by one
>  - I coded a BufferedFSDataInputStreamWrapper that allows opting in to buffered 
> reads on any FileSystem implementation
>  - In our setting savepoint load is now 30 times faster
>  - I’ve once seen a Jira ticket as to improve savepoint load time in general 
> (lost the link unfortunately), maybe this approach can help with it
>  - not sure if HDFS has got the same problem
>  - I can contribute my implementation of a BufferedFSDataInputStreamWrapper 
> which can be integrated in any 
> [1] unbuffered reads stack:
> read:207, FileInputStream (java.io)
> read:68, LocalDataInputStream (org.apache.flink.core.fs.local)
> read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs)
> read:42, ForwardingInputStream (org.apache.flink.runtime.util)
> readInt:390, DataInputStream (java.io)
> deserialize:80, BytePrimitiveArraySerializer 
> (org.apache.flink.api.common.typeutils.base.array)
> next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> restoreKVStateData:147, RocksDBFullRestoreOperation 
> (org.apache.flink.contrib.streaming.state.restore)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33793) java.lang.NoSuchMethodError when checkpointing in Google Cloud Storage

2023-12-11 Thread 陳昌倬


 [ 
https://issues.apache.org/jira/browse/FLINK-33793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangZhuo Chen (陳昌倬) updated FLINK-33793:
-
Description: 
We have the following exception when checkpointing in Flink 1.18 + Google Cloud 
Storage. The same code works well in Flink 1.17:

 

{{2023-12-11 07:45:28,861 ERROR org.apache.flink.util.FatalExitExceptionHandler 
             [] - FATAL: Thread 'jobmanager-io-thread-5' produced an uncaught 
exception. Stopping the process...}}
{{java.lang.NoSuchMethodError: 'com.google.common.collect.ImmutableMap 
com.google.common.collect.ImmutableMap$Builder.buildOrThrow()'}}
{{        at 
com.google.cloud.storage.UnifiedOpts$Opts.getRpcOptions(UnifiedOpts.java:2096) 
~[?:?]}}
{{        at com.google.cloud.storage.StorageImpl.writer(StorageImpl.java:624) 
~[?:?]}}
{{        at com.google.cloud.storage.StorageImpl.writer(StorageImpl.java:90) 
~[?:?]}}
{{        at 
org.apache.flink.fs.gs.storage.GSBlobStorageImpl.writeBlob(GSBlobStorageImpl.java:64)
 ~[?:?]}}
{{        at 
org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.createWriteChannel(GSRecoverableFsDataOutputStream.java:229)
 ~[?:?]}}
{{        at 
org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:152)
 ~[?:?]}}
{{        at 
org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:135)
 ~[?:?]}}
{{        at 
org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:128)
 ~[?:?]}}
{{        at 
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.write(FsCheckpointMetadataOutputStream.java:73)
 ~[flink-dist-1.18.0.jar:1.18.0]}}
{{        at java.io.DataOutputStream.writeInt(Unknown Source) ~[?:?]}}
{{        at 
org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:98)
 ~[flink-dist-1.18.0.jar:1.18.0]}}
{{        at 
org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:88)
 ~[flink-dist-1.18.0.jar:1.18.0]}}
{{        at 
org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:83)
 ~[flink-dist-1.18.0.jar:1.18.0]}}
{{        at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:335)
 ~[flink-dist-1.18.0.jar:1.18.0]}}
{{        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1404)
 ~[flink-dist-1.18.0.jar:1.18.0]}}
{{        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1303)
 ~[flink-dist-1.18.0.jar:1.18.0]}}
{{        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1195)
 ~[flink-dist-1.18.0.jar:1.18.0]}}
{{        at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
 ~[flink-dist-1.18.0.jar:1.18.0]}}
{{        at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
 ~[flink-dist-1.18.0.jar:1.18.0]}}
{{        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
~[?:?]}}
{{        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
~[?:?]}}
{{        at java.lang.Thread.run(Unknown Source) [?:?]}}

 

The issue has been reported in GitHub 
[https://github.com/apache/flink/pull/22281#issuecomment-1728553794.] However, 
it is still not fixed yet in 1.18.0.

  was:
We have the following exception when checkpointing in Flink 1.18 + Google Cloud 
Storage. The same code works well in Flink 1.17:

 

{{2023-12-11 07:45:28,861 ERROR org.apache.flink.util.FatalExitExceptionHandler 
             [] - FATAL: Thread 'jobmanager-io-thread-5' produced an uncaught 
exception. Stopping the process...}}
{{java.lang.NoSuchMethodError: 'com.google.common.collect.ImmutableMap 
com.google.common.collect.ImmutableMap$Builder.buildOrThrow()'}}
{{        at 
com.google.cloud.storage.UnifiedOpts$Opts.getRpcOptions(UnifiedOpts.java:2096) 
~[?:?]}}
{{        at com.google.cloud.storage.StorageImpl.writer(StorageImpl.java:624) 
~[?:?]}}
{{        at com.google.cloud.storage.StorageImpl.writer(StorageImpl.java:90) 
~[?:?]}}
{{        at 
org.apache.flink.fs.gs.storage.GSBlobStorageImpl.writeBlob(GSBlobStorageImpl.java:64)
 ~[?:?]}}
{{        at 
org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.createWriteChannel(GSRecoverableFsDataOutputStream.java:229)
 ~[?:?]}}
{{        at 
org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:152)
 ~[?:?]}}
{{        at 
org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:135)
 ~[?:?]}}
{{        at 

Re: [PR] [FLINK-33500][Runtime] Run storing the JobGraph an asynchronous operation [flink]

2023-12-11 Thread via GitHub


XComp commented on code in PR #23880:
URL: https://github.com/apache/flink/pull/23880#discussion_r1422663705


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java:
##
@@ -37,6 +38,18 @@ public interface JobGraphWriter extends 
LocallyCleanableResource, GloballyCleana
  */
 void putJobGraph(JobGraph jobGraph) throws Exception;
 
+/**
+ * Adds the {@link JobGraph} instance and have write operations performed 
asynchronously in
+ * ioExecutor of Dispatcher
+ *
+ * @param jobGraph
+ * @param ioExecutor
+ * @return
+ * @throws Exception
+ */
+CompletableFuture putJobGraphAsync(JobGraph jobGraph, 
Optional ioExecutor)

Review Comment:
   A few things on the interface change:
   
   1. `Optional` is not the usual way we implement async and sync 
behavior with a single method. You can rely on the `DirectExecutor` to achieve 
the same. There is no need to deal with `Optional`.
   2. For cases where you want to have the async and the sync version of a 
method being available, the code is usually easier to read if you put the 
business logic in the sync method and implement the async method in the 
following way:
   ```java
   public void runRandomMethod(Object obj) {
 // do something
   }
   
   public CompletableFuture runRandomMethodAsync(Object obj, Executor 
executor) {
   return FutureUtils.runAsync(() -> runRandomMethod(obj), executor);
   }
   ```
   3. I'm wondering whether we should make all `put*` methods in the 
`JobGraphWriter` interface asynchronous rather than maintaining a synchronous 
`putJobGraph` method alongside the `putJobGraphAsync` method which is then only 
called by `putJobResourceRequirements`. `putJobResourceRequirements` could 
block the `Dispatcher` for the very same reason why `putJobGraph` would block. 
WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-28215) Bump Maven Surefire plugin to 3.2.2

2023-12-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-28215.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

Fixed in apache/flink:master ea4cdc28651ad91defd4fc7b371a1f520ea7a262

> Bump Maven Surefire plugin to 3.2.2
> ---
>
> Key: FLINK-28215
> URL: https://issues.apache.org/jira/browse/FLINK-28215
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-28215][Buildsystem] Update Maven Surefire plugin to 3.2.2 [flink]

2023-12-11 Thread via GitHub


MartijnVisser merged PR #22502:
URL: https://github.com/apache/flink/pull/22502


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28215][Buildsystem] Update Maven Surefire plugin to 3.2.2 [flink]

2023-12-11 Thread via GitHub


MartijnVisser commented on PR #22502:
URL: https://github.com/apache/flink/pull/22502#issuecomment-1850303930

   > Was there a specific reason that made us wanting to upgrade the surefire 
plugin?
   
   Nothing more than that I always prefer to run on a stable version over milestone 
releases :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]

2023-12-11 Thread via GitHub


mxm commented on code in PR #23247:
URL: https://github.com/apache/flink/pull/23247#discussion_r1422626734


##
flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java:
##
@@ -184,7 +184,7 @@ public class RestartStrategyOptions {
 public static final ConfigOption 
RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER =
 
ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier")
 .doubleType()
-.defaultValue(2.0)
+.defaultValue(1.2)

Review Comment:
   Could we set this to `1.5` to make it a bit less aggressive?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   3   >