[GitHub] [flink] hanyuzheng7 commented on pull request #22842: [FLINK-32261][table] Add built-in MAP_UNION function.

2023-07-19 Thread via GitHub


hanyuzheng7 commented on PR #22842:
URL: https://github.com/apache/flink/pull/22842#issuecomment-1643174801

   @flinkbot run azure





[jira] [Updated] (FLINK-30559) May get wrong result for `if` expression if it's string data type

2023-07-19 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-30559:

Fix Version/s: 1.18.0
   1.16.3
   1.17.2

> May get wrong result for `if` expression if it's string data type
> -
>
> Key: FLINK-30559
> URL: https://issues.apache.org/jira/browse/FLINK-30559
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> Can be reproduced by the following code in 
> `org.apache.flink.table.planner.runtime.batch.sql.CalcITCase`:
> {code:java}
> checkResult("SELECT if(b > 10, 'ua', c) from Table3", data3) {code}
> The actual result is [co, He, He, ...].
> It seems only the first two characters of each value are returned.
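A hedged, standalone sketch of the symptom (the table name and connector options are illustrative; the behaviour is consistent with the result type of IF being inferred as CHAR(2) from the literal 'ua' and truncating the other branch):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IfTruncationRepro {
    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        env.executeSql(
                "CREATE TABLE Table3 (b INT, c STRING) WITH ("
                        + "'connector' = 'datagen', 'number-of-rows' = '5')");
        // If the result type is truncated to CHAR(2), any value of c longer
        // than two characters comes back cut off (e.g. 'Hello' -> 'He').
        env.executeSql("SELECT IF(b > 10, 'ua', c) FROM Table3").print();
    }
}
{code}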





[jira] [Resolved] (FLINK-30559) May get wrong result for `if` expression if it's string data type

2023-07-19 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu resolved FLINK-30559.
-
Resolution: Duplicate

> May get wrong result for `if` expression if it's string data type
> -
>
> Key: FLINK-30559
> URL: https://issues.apache.org/jira/browse/FLINK-30559
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
>
> Can be reproduced by the following code in 
> `org.apache.flink.table.planner.runtime.batch.sql.CalcITCase`:
> {code:java}
> checkResult("SELECT if(b > 10, 'ua', c) from Table3", data3) {code}
> The actual result is [co, He, He, ...].
> It seems only the first two characters of each value are returned.





[jira] [Reopened] (FLINK-30559) May get wrong result for `if` expression if it's string data type

2023-07-19 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reopened FLINK-30559:
-

> May get wrong result for `if` expression if it's string data type
> -
>
> Key: FLINK-30559
> URL: https://issues.apache.org/jira/browse/FLINK-30559
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
>
> Can be reproduced by the following code in 
> `org.apache.flink.table.planner.runtime.batch.sql.CalcITCase`:
> {code:java}
> checkResult("SELECT if(b > 10, 'ua', c) from Table3", data3) {code}
> The actual result is [co, He, He, ...].
> It seems only the first two characters of each value are returned.





[jira] [Assigned] (FLINK-32559) Deprecate Queryable State

2023-07-19 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song reassigned FLINK-32559:


Assignee: Xintong Song

> Deprecate Queryable State
> -
>
> Key: FLINK-32559
> URL: https://issues.apache.org/jira/browse/FLINK-32559
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Queryable State
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Blocker
> Fix For: 1.18.0
>
>
> Queryable State is described as approaching end-of-life in the roadmap [1], 
> but it is neither deprecated in the code nor in the user documentation [2]. 
> There are also more negative opinions than positive ones in the discussion 
> about rescuing it [3].
> [1] https://flink.apache.org/roadmap/
> [2] 
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/queryable_state/
> [3] https://lists.apache.org/thread/9hmwcjb3q5c24pk3qshjvybfqk62v17m
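For context, a minimal sketch of the client-side API that the deprecation would affect (real classes from flink-queryable-state-client-java; host, port, job ID, and state name are illustrative):

{code:java}
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class QueryableStateLookup {
    public static void main(String[] args) throws Exception {
        // The client below (plus the server-side support) is what this ticket
        // proposes to mark as deprecated.
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);
        JobID jobId = JobID.fromHexString("00000000000000000000000000000000");
        CompletableFuture<ValueState<Long>> future =
                client.getKvState(
                        jobId,
                        "my-queryable-state", // name registered via asQueryableState(...)
                        1L,                   // key to look up
                        BasicTypeInfo.LONG_TYPE_INFO,
                        new ValueStateDescriptor<>("state", Types.LONG));
        System.out.println(future.get().value());
        client.shutdownAndWait();
    }
}
{code}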





[jira] [Comment Edited] (FLINK-5336) Make Path immutable

2023-07-19 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-5336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744452#comment-17744452
 ] 

Wencong Liu edited comment on FLINK-5336 at 7/20/23 4:13 AM:
-

Hi all, I have checked all the classes that utilize the *Path* class. I found 
that there are still some classes that de/serialize the *Path* through the 
*IOReadableWritable* interface.
 # {*}FileSourceSplitSerializer{*}: It de/serializes the *Path* during the 
process of de/serializing FileSourceSplit.
 # {*}TestManagedSinkCommittableSerializer{*}: It de/serializes the Path during 
the process of de/serializing TestManagedCommittable.
 # {*}TestManagedFileSourceSplitSerializer{*}: It de/serializes the Path during 
the process of de/serializing TestManagedIterableSourceSplit.

For 1, the Path needs to be serialized to save checkpoint data in the source.

For 2/3, the IT case in flink-table-common depends on the Path 
de/serialization. [~qingyue] 


was (Author: JIRAUSER281639):
Hi all, I have checked all the classes that utilize the *Path* class. I found 
that there are still some classes that de/serialize the *Path* through the 
*IOReadableWritable* interface.
 # {*}FileSourceSplitSerializer{*}: It de/serializes the *Path* during the 
process of de/serializing FileSourceSplit.
 # {*}TestManagedSinkCommittableSerializer{*}: It de/serializes the Path during 
the process of de/serializing TestManagedCommittable.
 # {*}TestManagedFileSourceSplitSerializer{*}: It de/serializes the Path during 
the process of de/serializing TestManagedIterableSourceSplit.

For 1, the Path needs to be serialized to save checkpoint data in the source.

For 2/3, the IT case in flink-table-common depends on the Path 
de/serialization. [~qingyue] 

In summary, I think the Path class still needs to implement the 
*IOReadableWritable* interface to support de/serialization. WDYT? 

> Make Path immutable
> ---
>
> Key: FLINK-5336
> URL: https://issues.apache.org/jira/browse/FLINK-5336
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet
>Reporter: Stephan Ewen
>Priority: Major
> Fix For: 2.0.0
>
>
> The {{Path}} class is currently mutable to support the {{IOReadableWritable}} 
> serialization. Since that serialization is no longer used, I suggest dropping 
> that interface from Path and making the Path's URI final.
> Being immutable, we can store configured paths properly without the chance of 
> them being mutated as a side effect.
> Many parts of the code assume that the Path is immutable, and are thus 
> susceptible to subtle errors.
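A minimal sketch (simplified, not the actual Flink source) of why the interface forces mutability: read() must overwrite the URI of an already-constructed instance, so the field cannot be final:

{code:java}
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;
import java.net.URI;

public class MutablePath implements IOReadableWritable {

    private URI uri; // cannot be final: read() reassigns it after construction

    public MutablePath() {} // de/serialization needs a no-arg constructor

    @Override
    public void write(DataOutputView out) throws IOException {
        out.writeUTF(uri.toString());
    }

    @Override
    public void read(DataInputView in) throws IOException {
        this.uri = URI.create(in.readUTF()); // the mutation that immutability would forbid
    }
}
{code}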





[GitHub] [flink] masteryhx commented on a diff in pull request #22744: [FLINK-29802][state] Changelog supports native savepoint

2023-07-19 Thread via GitHub


masteryhx commented on code in PR #22744:
URL: https://github.com/apache/flink/pull/22744#discussion_r1268880413


##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##
@@ -375,6 +378,51 @@ public RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot(
 @Nonnull CheckpointStreamFactory streamFactory,
 @Nonnull CheckpointOptions checkpointOptions)
 throws Exception {
+
+if (checkpointOptions.getCheckpointType().isSavepoint()) {
+SnapshotType.SharingFilesStrategy sharingFilesStrategy =
+
checkpointOptions.getCheckpointType().getSharingFilesStrategy();
+if (sharingFilesStrategy == 
SnapshotType.SharingFilesStrategy.NO_SHARING) {
+long materializationID = materializedId++;
+// For NO_SHARING native savepoint, trigger delegated one
+RunnableFuture<SnapshotResult<KeyedStateHandle>> 
delegatedSnapshotResult =
+keyedStateBackend.snapshot(
+materializationID, timestamp, streamFactory, 
checkpointOptions);

Review Comment:
   I think the notification about checkpoint completion could be handled like a 
normal checkpoint by adding a mapping to the materializationIdByCheckpointId.
   The notification about checkpoint abortion has not been implemented for 
normal checkpoints yet (FLINK-25850), so I think it could be considered 
together with FLINK-25850.
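   A hedged sketch of the suggested bookkeeping (the fields `keyedStateBackend` and `materializedId` and the snapshot arguments are assumed from the PR context; names mirror the ones mentioned above):

   ```java
   // Sketch only: record which materialization a NO_SHARING native savepoint
   // used, so the completion notification can be forwarded like for a normal
   // checkpoint. Abortion handling is deferred to FLINK-25850.
   private final Map<Long, Long> materializationIdByCheckpointId = new HashMap<>();

   private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotNativeSavepoint(
           long checkpointId, long timestamp, CheckpointStreamFactory streamFactory,
           CheckpointOptions options) throws Exception {
       long materializationId = materializedId++;
       materializationIdByCheckpointId.put(checkpointId, materializationId);
       return keyedStateBackend.snapshot(materializationId, timestamp, streamFactory, options);
   }

   @Override
   public void notifyCheckpointComplete(long checkpointId) throws Exception {
       if (materializationIdByCheckpointId.remove(checkpointId) != null) {
           keyedStateBackend.notifyCheckpointComplete(checkpointId);
       }
   }
   ```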



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] LadyForest commented on a diff in pull request #22593: [FLINK-32053][table-planner] Introduce StateMetadata to ExecNode to support configure operator-level state TTL via CompiledPlan

2023-07-19 Thread via GitHub


LadyForest commented on code in PR #22593:
URL: https://github.com/apache/flink/pull/22593#discussion_r1268866818


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java:
##
@@ -71,20 +75,36 @@
 producedTransformations = 
StreamExecChangelogNormalize.CHANGELOG_NORMALIZE_TRANSFORMATION,
 minPlanVersion = FlinkVersion.v1_15,
 minStateVersion = FlinkVersion.v1_15)
+@ExecNodeMetadata(
+name = "stream-exec-changelog-normalize",
+version = 2,

Review Comment:
   Hi @twalthr, during the implementation of FLIP-292, I did notice that the 
current testing for compiled plans is not comprehensive enough, such as 
FLINK-31884, FLINK-31917, and FLINK-32219. FLIP-292 added 
`ExecNodeVersionUpgradeSerdeTest` and `TransformationsTest#testUidFlink1_15` to 
test the deserialization of old plans using the new version, but it may not be 
sufficient. After reconsideration, I think we can keep the original exec node 
version until we have a more comprehensive testing framework. I will open a PR 
and hope you can help review the code. Thank you.
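   For illustration, a hedged sketch of what keeping the original exec node version means at the annotation level (simplified; the attribute values are taken from the diff above, the class declaration is abbreviated):

   ```java
   // Sketch: the node keeps its single version-1 metadata entry instead of
   // adding a version-2 entry before a more comprehensive testing framework exists.
   @ExecNodeMetadata(
           name = "stream-exec-changelog-normalize",
           version = 1,
           producedTransformations = CHANGELOG_NORMALIZE_TRANSFORMATION,
           minPlanVersion = FlinkVersion.v1_15,
           minStateVersion = FlinkVersion.v1_15)
   public class StreamExecChangelogNormalize extends ExecNodeBase<RowData> {
       // ... implementation unchanged ...
   }
   ```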






[jira] [Comment Edited] (FLINK-5336) Make Path immutable

2023-07-19 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-5336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744452#comment-17744452
 ] 

Wencong Liu edited comment on FLINK-5336 at 7/20/23 2:32 AM:
-

Hi all, I have checked all the classes that utilize the *Path* class. I found 
that there are still some classes that de/serialize the *Path* through the 
*IOReadableWritable* interface.
 # {*}FileSourceSplitSerializer{*}: It de/serializes the *Path* during the 
process of de/serializing FileSourceSplit.
 # {*}TestManagedSinkCommittableSerializer{*}: It de/serializes the Path during 
the process of de/serializing TestManagedCommittable.
 # {*}TestManagedFileSourceSplitSerializer{*}: It de/serializes the Path during 
the process of de/serializing TestManagedIterableSourceSplit.

For 1, the Path needs to be serialized to save checkpoint data in the source.

For 2/3, the IT case in flink-table-common depends on the Path 
de/serialization. [~qingyue] 

In summary, I think the Path class still needs to implement the 
*IOReadableWritable* interface to support de/serialization. WDYT? 


was (Author: JIRAUSER281639):
Hi all, I have checked all the classes that utilize the *Path* class. I found 
that there are still some classes that de/serialize the *Path* through the 
*IOReadableWritable* interface.
 # {*}FileSourceSplitSerializer{*}: It de/serializes the *Path* during the 
process of de/serializing FileSourceSplit.
 # {*}TestManagedSinkCommittableSerializer{*}: It de/serializes the Path during 
the process of de/serializing TestManagedCommittable.
 # {*}TestManagedFileSourceSplitSerializer{*}: It de/serializes the Path during 
the process of de/serializing TestManagedIterableSourceSplit.

For 1, the Path needs to be serialized to save checkpoint data in the source. 
[~sewen] 

For 2/3, the IT case in flink-table-common depends on the Path 
de/serialization. [~qingyue] 

In summary, I think the Path class still needs to implement the 
*IOReadableWritable* interface to support de/serialization. WDYT? 

> Make Path immutable
> ---
>
> Key: FLINK-5336
> URL: https://issues.apache.org/jira/browse/FLINK-5336
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet
>Reporter: Stephan Ewen
>Priority: Major
> Fix For: 2.0.0
>
>
> The {{Path}} class is currently mutable to support the {{IOReadableWritable}} 
> serialization. Since that serialization is no longer used, I suggest dropping 
> that interface from Path and making the Path's URI final.
> Being immutable, we can store configured paths properly without the chance of 
> them being mutated as a side effect.
> Many parts of the code assume that the Path is immutable, and are thus 
> susceptible to subtle errors.





[jira] [Closed] (FLINK-32578) Cascaded group by window time columns on a proctime window aggregate may result in hanging forever

2023-07-19 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-32578.
---

> Cascaded group by window time columns on a proctime window aggregate may 
> result in hanging forever
> -
>
> Key: FLINK-32578
> URL: https://issues.apache.org/jira/browse/FLINK-32578
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.1
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.17.2
>
>
> Currently, grouping by the window time columns of a proctime window aggregate 
> produces a wrong plan that may hang forever at runtime.
> For such a query:
> {code}
> insert into s1
> SELECT
>   window_start,
>   window_end,
>   sum(cnt),
>   count(*)
> FROM (
>  SELECT
> a,
> b,
> window_start,
> window_end,
> count(*) as cnt,
> sum(d) as sum_d,
> max(d) as max_d
>  FROM TABLE(TUMBLE(TABLE src1, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
>  GROUP BY a, window_start, window_end, b
> )
> GROUP BY a, window_start, window_end
> {code}
> the inner proctime window works fine, but the outer one doesn't work due to a 
> wrong plan that translates to an unexpected event-time window operator:
> {code}
> Sink(table=[default_catalog.default_database.s1], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS 
> TIMESTAMP(6)) AS we, CAST(EXPR$2 AS BIGINT) AS b, CAST(EXPR$3 AS BIGINT) AS 
> c])
>+- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], 
> win_end=[window_end], size=[5 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) 
> AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
>   +- Exchange(distribution=[hash[a]])
>  +- Calc(select=[a, window_start, window_end, cnt])
> +- WindowAggregate(groupBy=[a, b], 
> window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS 
> cnt, start('w$) AS window_start, end('w$) AS window_end])
>+- Exchange(distribution=[hash[a, b]])
>   +- Calc(select=[a, b, d, PROCTIME() AS proctime])
>  +- TableSourceScan(table=[[default_catalog, 
> default_database, src1, project=[a, b, d], metadata=[]]], fields=[a, b, d])
> {code}





[jira] [Commented] (FLINK-32578) Cascaded group by window time columns on a proctime window aggregate may result in hanging forever

2023-07-19 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744847#comment-17744847
 ] 

lincoln lee commented on FLINK-32578:
-

Fixed in 1.17: dc8b70c2fcbb429a27a9cc1e263d9a38c2d7da34

> Cascaded group by window time columns on a proctime window aggregate may 
> result in hanging forever
> -
>
> Key: FLINK-32578
> URL: https://issues.apache.org/jira/browse/FLINK-32578
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.1
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.17.2
>
>
> Currently, grouping by the window time columns of a proctime window aggregate 
> produces a wrong plan that may hang forever at runtime.
> For such a query:
> {code}
> insert into s1
> SELECT
>   window_start,
>   window_end,
>   sum(cnt),
>   count(*)
> FROM (
>  SELECT
> a,
> b,
> window_start,
> window_end,
> count(*) as cnt,
> sum(d) as sum_d,
> max(d) as max_d
>  FROM TABLE(TUMBLE(TABLE src1, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
>  GROUP BY a, window_start, window_end, b
> )
> GROUP BY a, window_start, window_end
> {code}
> the inner proctime window works fine, but the outer one doesn't work due to a 
> wrong plan that translates to an unexpected event-time window operator:
> {code}
> Sink(table=[default_catalog.default_database.s1], fields=[ws, we, b, c])
> +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS 
> TIMESTAMP(6)) AS we, CAST(EXPR$2 AS BIGINT) AS b, CAST(EXPR$3 AS BIGINT) AS 
> c])
>+- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], 
> win_end=[window_end], size=[5 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) 
> AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
>   +- Exchange(distribution=[hash[a]])
>  +- Calc(select=[a, window_start, window_end, cnt])
> +- WindowAggregate(groupBy=[a, b], 
> window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS 
> cnt, start('w$) AS window_start, end('w$) AS window_end])
>+- Exchange(distribution=[hash[a, b]])
>   +- Calc(select=[a, b, d, PROCTIME() AS proctime])
>  +- TableSourceScan(table=[[default_catalog, 
> default_database, src1, project=[a, b, d], metadata=[]]], fields=[a, b, d])
> {code}





[GitHub] [flink] flinkbot commented on pull request #23030: fix(sec): upgrade com.google.guava:guava to 32.0.0-jre

2023-07-19 Thread via GitHub


flinkbot commented on PR #23030:
URL: https://github.com/apache/flink/pull/23030#issuecomment-1643008141

   
   ## CI report:
   
   * 4c9b2396007652e405eb3fcd537afac0fd0ab1e5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] lincoln-lil merged pull request #23022: [FLINK-32578][table-planner] Fix wrong plan where group by window time columns on a proctime window operator may result in hanging forever

2023-07-19 Thread via GitHub


lincoln-lil merged PR #23022:
URL: https://github.com/apache/flink/pull/23022





[GitHub] [flink-connector-pulsar] syhily commented on pull request #55: [FLINK-24302] Extend offheap memory for JDK11 test coverage

2023-07-19 Thread via GitHub


syhily commented on PR #55:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/55#issuecomment-1642986077

   Cool, I'm happy to see that we finally have JDK 11 test support. The 
extensive off-heap memory usage in the Pulsar client could be the main cause of 
the OOM on JDK 11. Should we mention it in our documentation for users who want 
to use this connector on JDK 11 or above? @reswqa @tisonkun
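   If we do document it, a hedged sketch of the kind of snippet the docs could carry (mirrors the option this PR raises; 512m is the value the tests use, not a tuned recommendation):

   ```java
   // For JDK 11+, reserve extra task off-heap memory for the Pulsar client's
   // direct-memory usage.
   Configuration configuration = new Configuration();
   configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(512));
   ```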





[GitHub] [flink] hackergin commented on pull request #22937: [FLINK-32428][table] Introduce base interfaces for CatalogStore

2023-07-19 Thread via GitHub


hackergin commented on PR #22937:
URL: https://github.com/apache/flink/pull/22937#issuecomment-1642970805

   @flinkbot run azure





[jira] [Created] (FLINK-32633) Kubernetes e2e test is not stable

2023-07-19 Thread Fang Yong (Jira)
Fang Yong created FLINK-32633:
-

 Summary: Kubernetes e2e test is not stable
 Key: FLINK-32633
 URL: https://issues.apache.org/jira/browse/FLINK-32633
 Project: Flink
  Issue Type: Technical Debt
  Components: Deployment / Kubernetes, Kubernetes Operator
Affects Versions: 1.18.0
Reporter: Fang Yong


The output file is: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51444&view=logs&j=bea52777-eaf8-5663-8482-18fbc3630e81&t=43ba8ce7-ebbf-57cd-9163-444305d74117

Jul 19 17:06:02 Stopping minikube ...
Jul 19 17:06:02 * Stopping node "minikube"  ...
Jul 19 17:06:13 * 1 node stopped.
Jul 19 17:06:13 [FAIL] Test script contains errors.
Jul 19 17:06:13 Checking for errors...
Jul 19 17:06:13 No errors in log files.
Jul 19 17:06:13 Checking for exceptions...
Jul 19 17:06:13 No exceptions in log files.
Jul 19 17:06:13 Checking for non-empty .out files...
grep: /home/vsts/work/_temp/debug_files/flink-logs/*.out: No such file or 
directory
Jul 19 17:06:13 No non-empty .out files.
Jul 19 17:06:13 
Jul 19 17:06:13 [FAIL] 'Run Kubernetes test' failed after 4 minutes and 28 
seconds! Test exited with exit code 1
Jul 19 17:06:13 
17:06:13 ##[group]Environment Information
Jul 19 17:06:13 Jps






[jira] [Created] (FLINK-32632) Run Kubernetes test is unstable on AZP

2023-07-19 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-32632:
---

 Summary: Run Kubernetes test is unstable on AZP
 Key: FLINK-32632
 URL: https://issues.apache.org/jira/browse/FLINK-32632
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.18.0
Reporter: Sergey Nuyanzin


This test 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51447&view=logs&j=bea52777-eaf8-5663-8482-18fbc3630e81&t=43ba8ce7-ebbf-57cd-9163-444305d74117&l=6213

fails with

{noformat}
2023-07-19T17:14:49.8144730Z Jul 19 17:14:49 deployment.apps/flink-task-manager 
created
2023-07-19T17:15:03.7983703Z Jul 19 17:15:03 job.batch/flink-job-cluster 
condition met
2023-07-19T17:15:04.0937620Z error: Internal error occurred: error executing 
command in container: http: invalid Host header
2023-07-19T17:15:04.0988752Z sort: cannot read: 
'/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-11919909188/out/kubernetes_wc_out*':
 No such file or directory
2023-07-19T17:15:04.1017388Z Jul 19 17:15:04 FAIL WordCount: Output hash 
mismatch.  Got d41d8cd98f00b204e9800998ecf8427e, expected 
e682ec6622b5e83f2eb614617d5ab2cf.
{noformat}





[jira] [Commented] (FLINK-31958) Table to DataStream allow partial fields

2023-07-19 Thread padavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744768#comment-17744768
 ] 

padavan commented on FLINK-31958:
-

Perhaps it should be moved to the bugs category? Or is it by design?

> Table to DataStream allow partial fields
> 
>
> Key: FLINK-31958
> URL: https://issues.apache.org/jira/browse/FLINK-31958
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Table SQL / API
>Reporter: padavan
>Priority: Major
>
> Hello, I have a model with many fields, for example:
> {code:java}
> public class UserModel {
>     public int userId;
>     public int count;
>     public int zip;
>     public LocalDateTime dt;
>     public LocalDateTime wStart;
>     public LocalDateTime wEnd;
> }{code}
> I work with the Table API: I select fields and convert the Table to a 
> DataStream of my model class. The problem is that *I have to select all 
> fields even if I don't need them*, or I get an exception:
> {quote}Column types of query result and sink for do not match. Cause: 
> Different number of columns.
> {quote}
> And I just have to substitute fake placeholder data for the unused fields...
> I want to simply use only the fields I have selected, like:
> {code:java}
> .select($("userId"), $("count").sum().as("count"));
> DataStream<UserModel> dataStream = te.toDataStream(win, 
> UserModel.class);{code}
> *Expected:* 
> Remove the "Different number of columns." validation rule. If a column is not 
> selected, it is initialized with the type's default value / null.
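A hedged workaround sketch until/unless this is supported: convert to a 
DataStream<Row> first, then map the selected columns into the POJO yourself, 
leaving the unselected fields at their defaults (te, win, and UserModel as in 
the description above):

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

DataStream<UserModel> dataStream =
        te.toDataStream(win) // DataStream<Row> with only the selected columns
                .map(row -> {
                    UserModel m = new UserModel();
                    m.userId = row.<Integer>getFieldAs("userId");
                    m.count = row.<Integer>getFieldAs("count");
                    return m; // zip/dt/wStart/wEnd keep their default values
                });
{code}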





[GitHub] [flink-connector-pulsar] reswqa commented on a diff in pull request #55: [FLINK-24302] Extend offheap memory for JDK11 test coverage

2023-07-19 Thread via GitHub


reswqa commented on code in PR #55:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/55#discussion_r1268492288


##
flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java:
##
@@ -35,9 +35,11 @@ public class FlinkContainerUtils {
 public static Configuration flinkConfiguration() {
 Configuration configuration = new Configuration();
 
-// Increase the jvm metaspace memory to avoid 
java.lang.OutOfMemoryError: Metaspace
-configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(2048));
+configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(2560));
+configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, 
MemorySize.ofMebiBytes(512));
 configuration.set(TaskManagerOptions.JVM_METASPACE, 
MemorySize.ofMebiBytes(512));
+
+// Increase the jvm metaspace memory to avoid 
java.lang.OutOfMemoryError: Metaspace
 configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(2560));
 configuration.set(JobManagerOptions.JVM_METASPACE, 
MemorySize.ofMebiBytes(1024));

Review Comment:
   Perhaps this is more appropriate? But it also doesn't matter if we leave it 
unchanged.









[GitHub] [flink-connector-pulsar] reswqa commented on a diff in pull request #55: [FLINK-24302] Extend offheap memory for JDK11 test coverage

2023-07-19 Thread via GitHub


reswqa commented on code in PR #55:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/55#discussion_r1268484338


##
flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java:
##
@@ -35,9 +35,11 @@ public class FlinkContainerUtils {
 public static Configuration flinkConfiguration() {
 Configuration configuration = new Configuration();
 
-// Increase the jvm metaspace memory to avoid 
java.lang.OutOfMemoryError: Metaspace
-configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(2048));
+configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(2560));
+configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, 
MemorySize.ofMebiBytes(512));
 configuration.set(TaskManagerOptions.JVM_METASPACE, 
MemorySize.ofMebiBytes(512));
+
+// Increase the jvm metaspace memory to avoid 
java.lang.OutOfMemoryError: Metaspace
 configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(2560));
 configuration.set(JobManagerOptions.JVM_METASPACE, 
MemorySize.ofMebiBytes(1024));

Review Comment:
   ```suggestion
   configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(2560));
   configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, 
MemorySize.ofMebiBytes(512));
   // Increase the jvm metaspace memory to avoid 
java.lang.OutOfMemoryError: Metaspace
   configuration.set(TaskManagerOptions.JVM_METASPACE, 
MemorySize.ofMebiBytes(512));
   
 
   configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(2560));
   // Increase the jvm metaspace memory to avoid 
java.lang.OutOfMemoryError: Metaspace
   configuration.set(JobManagerOptions.JVM_METASPACE, 
MemorySize.ofMebiBytes(1024));
   ```
   






[GitHub] [flink-web] lindong28 commented on pull request #664: Allow content to expire

2023-07-19 Thread via GitHub


lindong28 commented on PR #664:
URL: https://github.com/apache/flink-web/pull/664#issuecomment-1642427268

   @MartijnVisser Thanks for the comments.
   
   The flink-ml docs are built by this script: 
https://github.com/apache/infrastructure-bb2/blob/master/flink-ml.py. This 
script is executed every day by a build bot whose status can be found by 
searching "flink ml" at https://ci2.apache.org/#/builders.
   
   If you are also not sure where to find/update `.htaccess` for 
https://nightlies.apache.org/flink, do you know who might know the answer? If 
none of us know, maybe I should create a JIRA for the Apache infra team.





[jira] [Updated] (FLINK-32631) FlinkSessionJob stuck in Created/Reconciling state because of No Job found error in JobManager

2023-07-19 Thread Bhupendra Yadav (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhupendra Yadav updated FLINK-32631:

Description: 
{*}Background{*}: We are using FlinkSessionJob to submit jobs to a session 
cluster, with Flink Kubernetes operator 1.5.0.

{*}Bug{*}: We frequently encounter a problem where the job gets stuck in 
CREATED/RECONCILING state. On checking flink operator logs we see the error 
{_}Job could not be found{_}. Full trace [here|https://ideone.com/NuAyEK].
 # When a Flink session job is submitted, the Flink operator submits the job to 
the Flink Cluster.
 # If the Flink job manager (JM) restarts for some reason, the job may no 
longer exist in the JM.
 # Upon reconciliation, the Flink operator queries the JM's REST API for the 
job using its jobID, but it receives a 404 error, indicating that the job is 
not found.
 # The operator then encounters an error and logs it, leading to the job 
getting stuck in an indefinite state.
 # Attempting to restart or suspend the job using the operator's provided 
mechanisms also fails because the operator keeps calling the REST API and 
receiving the same 404 error.

{*}Expected Behavior{*}: Ideally, when the Flink operator reconciles a job and 
finds that it no longer exists in the Flink Cluster, it should handle the 
situation gracefully. Instead of getting stuck and logging errors indefinitely, 
the operator should mark the job as failed or deleted, or set an appropriate 
status for it.

  was:
{*}Background{*}: We are using FlinkSessionJob for submitting jobs to a session 
cluster.

{*}Bug{*}: We frequently encounter a problem where the job gets stuck in 
CREATED/RECONCILING state. On checking flink operator logs we see the error 
{_}Job could not be found{_}. Full trace [here|https://ideone.com/NuAyEK].
 # When a Flink session job is submitted, the Flink operator submits the job to 
the Flink Cluster.
 # If the Flink job manager (JM) restarts for some reason, the job may no 
longer exist in the JM.
 # Upon reconciliation, the Flink operator queries the JM's REST API for the 
job using its jobID, but it receives a 404 error, indicating that the job is 
not found.
 # The operator then encounters an error and logs it, leading to the job 
getting stuck in an indefinite state.
 # Attempting to restart or suspend the job using the operator's provided 
mechanisms also fails because the operator keeps calling the REST API and 
receiving the same 404 error.

{*}Expected Behavior{*}: Ideally, when the Flink operator reconciles a job and 
finds that it no longer exists in the Flink Cluster, it should handle the 
situation gracefully. Instead of getting stuck and logging errors indefinitely, 
the operator should mark the job as failed or deleted, or set an appropriate 
status for it.
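A hedged sketch of the expected handling (class and method names are 
illustrative, not the operator's actual API; the point is treating the 404 as a 
terminal observation rather than an error to retry forever):

{code:java}
// Illustrative reconciliation step, not actual flink-kubernetes-operator code.
try {
    JobStatus observed = flinkService.requestJobStatus(jobId);
    sessionJob.getStatus().getJobStatus().setState(observed.name());
} catch (RestClientException e) {
    if (e.getHttpResponseStatus().code() == 404) {
        // The job vanished (e.g. after a JM restart): surface a terminal
        // state instead of retrying the lookup indefinitely.
        sessionJob.getStatus().getJobStatus().setState("FAILED");
    } else {
        throw e;
    }
}
{code}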


> FlinkSessionJob stuck in Created/Reconciling state because of No Job found 
> error in JobManager
> --
>
> Key: FLINK-32631
> URL: https://issues.apache.org/jira/browse/FLINK-32631
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: 1.16.0
> Environment: Local
>Reporter: Bhupendra Yadav
>Priority: Major
>
> {*}Background{*}: We are using FlinkSessionJob to submit jobs to a 
> session cluster, with Flink Kubernetes operator 1.5.0.
> {*}Bug{*}: We frequently encounter a problem where the job gets stuck in 
> CREATED/RECONCILING state. On checking flink operator logs we see the error 
> {_}Job could not be found{_}. Full trace [here|https://ideone.com/NuAyEK].
>  # When a Flink session job is submitted, the Flink operator submits the job 
> to the Flink Cluster.
>  # If the Flink job manager (JM) restarts for some reason, the job may no 
> longer exist in the JM.
>  # Upon reconciliation, the Flink operator queries the JM's REST API for the 
> job using its jobID, but it receives a 404 error, indicating that the job is 
> not found.
>  # The operator then encounters an error and logs it, leading to the job 
> getting stuck in an indefinite state.
>  # Attempting to restart or suspend the job using the operator's provided 
> mechanisms also fails because the operator keeps calling the REST API and 
> receiving the same 404 error.
> {*}Expected Behavior{*}: Ideally, when the Flink operator reconciles a job 
> and finds that it no longer exists in the Flink Cluster, it should handle the 
> situation gracefully. Instead of getting stuck and logging errors 
> indefinitely, the operator should mark the job as failed or deleted, or set 
> an appropriate status for it.





[jira] [Created] (FLINK-32631) FlinkSessionJob stuck in Created/Reconciling state because of No Job found error in JobManager

2023-07-19 Thread Bhupendra Yadav (Jira)
Bhupendra Yadav created FLINK-32631:
---

 Summary: FlinkSessionJob stuck in Created/Reconciling state 
because of No Job found error in JobManager
 Key: FLINK-32631
 URL: https://issues.apache.org/jira/browse/FLINK-32631
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: 1.16.0
 Environment: Local
Reporter: Bhupendra Yadav


{*}Background{*}: We are using FlinkSessionJob for submitting jobs to a session 
cluster.

{*}Bug{*}: We frequently encounter a problem where the job gets stuck in 
CREATED/RECONCILING state. On checking flink operator logs we see the error 
{_}Job could not be found{_}. Full trace [here|https://ideone.com/NuAyEK].
 # When a Flink session job is submitted, the Flink operator submits the job to 
the Flink Cluster.
 # If the Flink job manager (JM) restarts for some reason, the job may no 
longer exist in the JM.
 # Upon reconciliation, the Flink operator queries the JM's REST API for the 
job using its jobID, but it receives a 404 error, indicating that the job is 
not found.
 # The operator then encounters an error and logs it, leading to the job 
getting stuck in an indefinite state.
 # Attempting to restart or suspend the job using the operator's provided 
mechanisms also fails because the operator keeps calling the REST API and 
receiving the same 404 error.

{*}Expected Behavior{*}: Ideally, when the Flink operator reconciles a job and 
finds that it no longer exists in the Flink Cluster, it should handle the 
situation gracefully. Instead of getting stuck and logging errors indefinitely, 
the operator should mark the job as failed or deleted, or set an appropriate 
status for it.





[GitHub] [flink] flinkbot commented on pull request #23029: [FLINK-32132][table-planner] Cast function CODEGEN does not work as e…

2023-07-19 Thread via GitHub


flinkbot commented on PR #23029:
URL: https://github.com/apache/flink/pull/23029#issuecomment-1642417243

   
   ## CI report:
   
   * b1967c4704f16360d638e3778509a2b2840c1b2e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] flinkbot commented on pull request #23028: [FLINK-32132][table-planner] Cast function CODEGEN does not work as e…

2023-07-19 Thread via GitHub


flinkbot commented on PR #23028:
URL: https://github.com/apache/flink/pull/23028#issuecomment-1642407541

   
   ## CI report:
   
   * 939c4fb6b8bb3f8e47d62fda3d8f01f8f69c65b6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink-connector-pulsar] tisonkun commented on a diff in pull request #55: [FLINK-24302] Extend offheap memory for JDK11 test coverage

2023-07-19 Thread via GitHub


tisonkun commented on code in PR #55:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/55#discussion_r1268324359


##
flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java:
##
@@ -35,9 +35,11 @@ public class FlinkContainerUtils {
 public static Configuration flinkConfiguration() {
 Configuration configuration = new Configuration();
 
-// Increase the jvm metaspace memory to avoid 
java.lang.OutOfMemoryError: Metaspace
-configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(2048));
+configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(2560));
+configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, 
MemorySize.ofMebiBytes(512));
 configuration.set(TaskManagerOptions.JVM_METASPACE, 
MemorySize.ofMebiBytes(512));
+
+// Increase the jvm metaspace memory to avoid 
java.lang.OutOfMemoryError: Metaspace

Review Comment:
   Please comment with a concrete suggestion or patch directly; I'm not sure 
where you have in mind to move it.






[GitHub] [flink-connector-pulsar] tisonkun commented on a diff in pull request #55: [FLINK-24302] Extend offheap memory for JDK11 test coverage

2023-07-19 Thread via GitHub


tisonkun commented on code in PR #55:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/55#discussion_r1268323229


##
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.ClusterControllable;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import 
org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import org.apache.flink.core.execution.JobClient;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.METRIC_FETCHER_UPDATE_INTERVAL_MS;
+import static 
org.apache.flink.runtime.jobgraph.SavepointConfigOptions.SAVEPOINT_PATH;
+
+/** Test environment for running jobs on Flink mini-cluster. */
+@Experimental
+public class MiniClusterTestEnvironment implements TestEnvironment, 
ClusterControllable {

Review Comment:
   Yes, and when we drop the support we should at least keep test coverage for 1.17.






[jira] [Resolved] (FLINK-32469) Improve checkpoint REST APIs for programmatic access

2023-07-19 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-32469.
---
Resolution: Done

> Improve checkpoint REST APIs for programmatic access
> 
>
> Key: FLINK-32469
> URL: https://issues.apache.org/jira/browse/FLINK-32469
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.16.2, 1.17.1
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> *Why*
> We want to enable programmatic use of the checkpoints REST API, independent 
> of the Flink dashboard.
> Currently, REST APIs that retrieve information relating to a given Flink job 
> pass through the {{ExecutionGraphCache}}. This means that all these 
> APIs will return stale data depending on the {{web.refresh-interval}}, 
> which defaults to 3s. For programmatic use of the REST API, we should be able 
> to retrieve either the latest or the cached version depending on the client 
> (the Flink dashboard gets the cached version, other clients get the updated 
> version).
> For example, a user might want to use the REST API to retrieve the latest 
> completed checkpoint for a given Flink job. This might be useful when using 
> existing checkpoints as the state store while migrating a Flink job from 
> one cluster to another. See the example use case below.
> *What*
> This change separates the cache used by the checkpoints REST 
> APIs into its own cache. This way, a user can set the timeout for the 
> checkpoints cache to 0s (disabling the cache) without much effect on the 
> user experience of the Flink dashboard.
> In addition, the checkpoint handlers currently first retrieve the 
> {{ExecutionGraph}} and then retrieve the {{CheckpointStatsSnapshot}} from 
> the graph. This is not needed, since the checkpoint handlers only need the 
> {{CheckpointStatsSnapshot}}. With this change, these handlers retrieve only 
> the minimal required information ({{CheckpointStatsSnapshot}}) to 
> construct a reply.
>  
> *Example use case*
> When performing security patching / maintenance of the infrastructure 
> supporting the Flink cluster, we might want to transfer a given Flink job to 
> another cluster whilst maintaining state. We can do this via the steps below:
>  # Old cluster - Select a completed checkpoint of the existing Flink job
>  # Old cluster - Stop the existing Flink job
>  # New cluster - Start a new Flink job from the selected checkpoint
> Step 1 requires us to query the checkpoints REST API for the latest completed 
> checkpoint. With the status quo, we may need to wait up to 3s (or whatever 
> the ExecutionGraphCache expiry is). This is undesirable because it means the 
> Flink job will have to reprocess data equivalent to that interval.
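For step 1, a hedged sketch of fetching the latest completed checkpoint 
programmatically (the /jobs/:jobid/checkpoints endpoint is part of the REST 
API; host, port, and jobID are illustrative, and the JSON is printed rather 
than parsed):

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LatestCheckpointFetcher {
    public static void main(String[] args) throws Exception {
        String jobId = "00000000000000000000000000000000"; // illustrative
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://jobmanager:8081/jobs/" + jobId + "/checkpoints"))
                .build();
        String body = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
        // The restore path of the most recent completed checkpoint is under
        // latest.completed.external_path in the returned JSON.
        System.out.println(body);
    }
}
{code}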





[jira] [Commented] (FLINK-32469) Improve checkpoint REST APIs for programmatic access

2023-07-19 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744702#comment-17744702
 ] 

Danny Cranmer commented on FLINK-32469:
---

Merged commit 
[{{7b9b4e5}}|https://github.com/apache/flink/commit/7b9b4e53a59ab8f4f2a99a6e162a794d264f7daf]
 into master 

> Improve checkpoint REST APIs for programmatic access
> 
>
> Key: FLINK-32469
> URL: https://issues.apache.org/jira/browse/FLINK-32469
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.16.2, 1.17.1
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> *Why*
> We want to enable programmatic use of the checkpoints REST API, independent 
> of the Flink dashboard.
> Currently, REST APIs that retrieve information relating to a given Flink job 
> pass through the {{ExecutionGraphCache}}. This means that all these 
> APIs will return stale data depending on the {{web.refresh-interval}}, 
> which defaults to 3s. For programmatic use of the REST API, we should be able 
> to retrieve either the latest or the cached version depending on the client 
> (the Flink dashboard gets the cached version, other clients get the updated 
> version).
> For example, a user might want to use the REST API to retrieve the latest 
> completed checkpoint for a given Flink job. This might be useful when using 
> existing checkpoints as the state store while migrating a Flink job from 
> one cluster to another. See the example use case below.
> *What*
> This change separates the cache used by the checkpoints REST 
> APIs into its own cache. This way, a user can set the timeout for the 
> checkpoints cache to 0s (disabling the cache) without much effect on the 
> user experience of the Flink dashboard.
> In addition, the checkpoint handlers currently first retrieve the 
> {{ExecutionGraph}} and then retrieve the {{CheckpointStatsSnapshot}} from 
> the graph. This is not needed, since the checkpoint handlers only need the 
> {{CheckpointStatsSnapshot}}. With this change, these handlers retrieve only 
> the minimal required information ({{CheckpointStatsSnapshot}}) to 
> construct a reply.
>  
> *Example use case*
> When performing security patching / maintenance of the infrastructure 
> supporting the Flink cluster, we might want to transfer a given Flink job to 
> another cluster whilst maintaining state. We can do this via the steps below:
>  # Old cluster - Select a completed checkpoint of the existing Flink job
>  # Old cluster - Stop the existing Flink job
>  # New cluster - Start a new Flink job from the selected checkpoint
> Step 1 requires us to query the checkpoints REST API for the latest completed 
> checkpoint. With the status quo, we may need to wait up to 3s (or whatever 
> the ExecutionGraphCache expiry is). This is undesirable because it means the 
> Flink job will have to reprocess data equivalent to that interval.





[GitHub] [flink] flinkbot commented on pull request #23027: [FLINK-32616][jdbc-driver] Close result for non-query in executeQuery

2023-07-19 Thread via GitHub


flinkbot commented on PR #23027:
URL: https://github.com/apache/flink/pull/23027#issuecomment-1642372445

   
   ## CI report:
   
   * 55e9c628f6d3e2de3c0a00342ab6f86c78eca10b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] lsyldliu commented on pull request #23012: [FLINK-32610][json] JSON format supports nested type projection pushdown

2023-07-19 Thread via GitHub


lsyldliu commented on PR #23012:
URL: https://github.com/apache/flink/pull/23012#issuecomment-1642369509

   @wuchong Thanks for the review, I've updated the PR.





[jira] [Updated] (FLINK-32616) FlinkStatement#executeQuery resource leaks when the input sql is not query

2023-07-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32616:
---
Labels: pull-request-available  (was: )

> FlinkStatement#executeQuery resource leaks when the input sql is not query
> --
>
> Key: FLINK-32616
> URL: https://issues.apache.org/jira/browse/FLINK-32616
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / JDBC
>Affects Versions: 1.18.0
>Reporter: Shengkai Fang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The current implementation just throws an exception if the input SQL is not a 
> query; no one is responsible for closing the StatementResult.
>  
> BTW, the current implementation still submits the SQL to the gateway, which 
> means the SQL is executed. I wonder whether we need to expose this behavior 
> to the users?
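A hedged sketch of the fix described above (illustrative of the jdbc-driver 
code path rather than the actual patch):

{code:java}
// Inside FlinkStatement#executeQuery (sketch):
StatementResult result = executor.executeStatement(sql);
if (!result.isQueryResult()) {
    result.close(); // close before throwing so the open result is not leaked
    throw new SQLException(String.format("Statement '%s' is not a query.", sql));
}
// ... wrap the result in a ResultSet as before ...
{code}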





[GitHub] [flink] FangYongs opened a new pull request, #23027: [FLINK-32616][jdbc-driver] Close result for non-query in executeQuery

2023-07-19 Thread via GitHub


FangYongs opened a new pull request, #23027:
URL: https://github.com/apache/flink/pull/23027

   ## What is the purpose of the change
   
   This PR closes the statement result in FlinkStatement.executeQuery when 
the statement is not a query.
   
   ## Brief change log
 - Close the statement result when the SQL is not a query in 
FlinkStatement.executeQuery
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Added FlinkStatementTest.testCloseNonQuery to validate the statement 
result is closed.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) no
 - The serializers: (yes / no / don't know) no
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[GitHub] [flink] lsyldliu commented on a diff in pull request #23012: [FLINK-32610][json] JSON format supports nested type projection pushdown

2023-07-19 Thread via GitHub


lsyldliu commented on code in PR #23012:
URL: https://github.com/apache/flink/pull/23012#discussion_r1268292637


##
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java:
##
@@ -724,7 +786,7 @@ private void testParseErrors(TestSpec spec) {
 TestSpec.json("{\"id\":\"abc\"}")
 .rowType(ROW(FIELD("id", INT(
 .expectErrorMessage("Failed to deserialize JSON 
'{\"id\":\"abc\"}'."),
-TestSpec.json("{\"id\":112.013}")
+TestSpec.json("{\"id\":112}")

Review Comment:
   Yes, the `JsonParser` can't parse a double value as bigint.
   `JsonParser` logic:
   
![image](https://github.com/apache/flink/assets/17698589/112771b9-380d-46d6-bc77-3278db6526ed)
   
   `JsonNode` logic:
   
![image](https://github.com/apache/flink/assets/17698589/19b1a839-6863-440b-95de-74b2ccd3baf0)
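   
   A hedged illustration of the difference (not the actual Flink code): a 
token-based decoder rejects a float where BIGINT is expected, while a 
`JsonNode`-based decoder can coerce via `asLong()`:
   
   ```java
   // Illustration only (Jackson streaming API): 112.013 would be rejected for BIGINT.
   if (parser.currentToken() == JsonToken.VALUE_NUMBER_INT) {
       return parser.getLongValue();
   }
   throw new IOException("Expected an integral number but found " + parser.currentToken());
   ```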
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #627: [FLINK-32551] Add option to take a savepoint on flinkdeployment/flinksessionjob deletion

2023-07-19 Thread via GitHub


gyfora commented on PR #627:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/627#issuecomment-1642349101

   I am closing this based on feedback from @JTaky (Oleksandr Nitavskyi)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] JTaky commented on pull request #634: [FLINK-32551] Add option to take a savepoint when deleting a flinkdeployment/flinksessionjob

2023-07-19 Thread via GitHub


JTaky commented on PR #634:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/634#issuecomment-1642264925

   > Are you working with @ashangit ? Just curious because he also has an open 
PR and the JIRA is on him
   
   Yes, he is currently OOO; I am continuing his development. I would value your 
feedback.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] MartijnVisser commented on pull request #664: Allow content to expire

2023-07-19 Thread via GitHub


MartijnVisser commented on PR #664:
URL: https://github.com/apache/flink-web/pull/664#issuecomment-1642260699

   > Do you have time to review this PR?
   
   Sure. I do think that there's a different issue. This `.htaccess` file is 
only used on the https://flink.apache.org project website, but not for the 
documentation that's built on https://nightlies.apache.org/flink.
   
   I don't immediately see a workflow for building the flink-ml docs: where is 
that done? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hackergin commented on a diff in pull request #22939: [FLINK-32474][table] Support time travel in table planner

2023-07-19 Thread via GitHub


hackergin commented on code in PR #22939:
URL: https://github.com/apache/flink/pull/22939#discussion_r1268198538


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java:
##
@@ -125,4 +162,118 @@ public void validateColumnListParams(
 // this makes it possible to ignore them in the validator and fall 
back to regular row types
 // see also SqlFunction#deriveType
 }
+
+@Override
+protected void registerNamespace(
+@Nullable SqlValidatorScope usingScope,
+@Nullable String alias,
+SqlValidatorNamespace ns,
+boolean forceNullable) {
+
+// Generate a new validator namespace for time travel scenario.
+// Time travel only supports constant expressions, so we need to 
investigate scenarios
+// where the period of Snapshot is a SqlIdentifier.
+Optional timeTravelNode = getTimeTravelNode(ns);
+if (usingScope != null
+&& timeTravelNode.isPresent()
+&& !(timeTravelNode.get().getPeriod() instanceof 
SqlIdentifier)) {
+SqlSnapshot sqlSnapshot = timeTravelNode.get();
+SqlNode periodNode = sqlSnapshot.getPeriod();
+SqlToRelConverter sqlToRelConverter = 
this.createSqlToRelConverter();
+RexNode rexNode = sqlToRelConverter.convertExpression(periodNode);
+RexNode simplifiedRexNode =
+FlinkRexUtil.simplify(
+sqlToRelConverter.getRexBuilder(),
+rexNode,
+relOptCluster.getPlanner().getExecutor());
+List reducedNodes = new ArrayList<>();
+relOptCluster
+.getPlanner()
+.getExecutor()
+.reduce(
+relOptCluster.getRexBuilder(),
+Collections.singletonList(simplifiedRexNode),
+reducedNodes);
+// check whether period is the unsupported expression
+if (!(reducedNodes.get(0) instanceof RexLiteral)) {
+throw new UnsupportedOperationException(
+String.format(
+"Unsupported time travel expression: %s for 
the expression can not be reduced to a constant by Flink.",
+periodNode));
+}
+
+RexLiteral rexLiteral = (RexLiteral) (reducedNodes).get(0);
+TimestampString timestampString = 
rexLiteral.getValueAs(TimestampString.class);
+checkNotNull(
+timestampString,
+"The time travel expression %s can not reduce to a valid 
timestamp string. This is a bug. Please file an issue.",
+periodNode);
+
+TableConfig tableConfig = 
ShortcutUtils.unwrapContext(relOptCluster).getTableConfig();
+ZoneId zoneId = tableConfig.getLocalTimeZone();
+long timeTravelTimestamp =
+
TimestampData.fromEpochMillis(timestampString.getMillisSinceEpoch())
+.toLocalDateTime()
+.atZone(zoneId)
+.toInstant()
+.toEpochMilli();
+
+SchemaVersion schemaVersion = 
TimestampSchemaVersion.of(timeTravelTimestamp);
+IdentifierNamespace identifierNamespace = (IdentifierNamespace) ns;
+IdentifierNamespace snapshotNameSpace =
+new IdentifierSnapshotNamespace(
+identifierNamespace,
+schemaVersion,
+((DelegatingScope) usingScope).getParent());
+ns = snapshotNameSpace;
+
+sqlSnapshot.setOperand(

Review Comment:
   Since we have reduced the period expression here, we can just build a simple 
SqlLiteral, and we don't need to reduce it again when converting to RelNode.
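   
   Roughly like this (an illustrative sketch, untested against this branch; the 
precision argument is an assumption):
   
   ```java
   // Replace the reduced period with a plain timestamp literal so the
   // SqlToRelConverter does not have to reduce it a second time.
   // The precision (3) is an assumption for illustration.
   SqlNode literalPeriod = SqlLiteral.createTimestamp(timestampString, 3, SqlParserPos.ZERO);
   sqlSnapshot.setOperand(1, literalPeriod);
   ```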



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hackergin commented on a diff in pull request #22939: [FLINK-32474][table] Support time travel in table planner

2023-07-19 Thread via GitHub


hackergin commented on code in PR #22939:
URL: https://github.com/apache/flink/pull/22939#discussion_r1268195677


##
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java:
##
@@ -2927,7 +2948,37 @@ private void convertTemporalTable(Blackboard bb, SqlCall 
call) {
 
 // convert inner query, could be a table name or a derived table
 SqlNode expr = snapshot.getTableRef();
-convertFrom(bb, expr);
+SqlNode tableRef = snapshot.getTableRef();
+// Since we have simplified the SqlSnapshot in the validate phase, we 
only need to check

Review Comment:
   I mean reducing the period expression.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32615) Cache AutoscalerInfo for each resource

2023-07-19 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-32615.
--
Fix Version/s: kubernetes-operator-1.6.0
   Resolution: Fixed

merged to main 0c341ebe13645f4e9802cfd780c5b50f59e29363

> Cache AutoscalerInfo for each resource
> --
>
> Key: FLINK-32615
> URL: https://issues.apache.org/jira/browse/FLINK-32615
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.6.0
>
>
> Currently we always fetch the autoscaler info ConfigMap through the Kubernetes 
> REST API. This is a heavy and unnecessary operation, as its lifecycle is 
> directly tied to the resource. 
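
A minimal sketch of the caching idea, assuming hypothetical names 
({{fetchConfigMap}} stands in for the REST lookup):
{code:java}
private final Map<String, AutoScalerInfo> cache = new ConcurrentHashMap<>();

AutoScalerInfo getInfo(AbstractFlinkResource<?, ?> cr, KubernetesClient client) {
    // The entry lives exactly as long as the resource, so fetch at most once.
    // fetchConfigMap is a hypothetical stand-in for the REST lookup.
    return cache.computeIfAbsent(cr.getMetadata().getUid(), uid -> fetchConfigMap(cr, client));
}

void onDelete(AbstractFlinkResource<?, ?> cr) {
    cache.remove(cr.getMetadata().getUid());
}
{code}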



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #633: [FLINK-32589] Carry over parallelism overrides between spec changes

2023-07-19 Thread via GitHub


gyfora merged PR #633:
URL: https://github.com/apache/flink-kubernetes-operator/pull/633


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32589) Carry over parallelism overrides to prevent users from clearing them on updates

2023-07-19 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-32589.
--
Fix Version/s: kubernetes-operator-1.6.0
   Resolution: Fixed

merged to main 9fe68251eb1a37333a6e862bf7b048061396498c

> Carry over parallelism overrides to prevent users from clearing them on 
> updates
> ---
>
> Key: FLINK-32589
> URL: https://issues.apache.org/jira/browse/FLINK-32589
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.6.0
>
>
> The autoscaler currently sets the parallelism overrides via the Flink config 
> {{pipeline.jobvertex-parallelism-overrides}}. Whenever the user posts spec 
> updates, special care needs to be taken to carry over existing 
> overrides. Otherwise the job will reset to the default parallelism 
> configuration. Users shouldn't have to deal with this. Instead, whenever a 
> new spec is posted which does not contain the overrides, the operator should 
> automatically apply the last-used overrides (if autoscaling is enabled).
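
A sketch of the carry-over behaviour described above (names are illustrative, 
not the actual operator code):
{code:java}
Map<String, String> newConf = newSpec.getFlinkConfiguration();
String key = "pipeline.jobvertex-parallelism-overrides";
// If the freshly posted spec does not set the overrides, keep the last-used ones.
if (!newConf.containsKey(key) && lastReconciledConf.containsKey(key)) {
    newConf.put(key, lastReconciledConf.get(key));
}
{code}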



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] RanJinh commented on a diff in pull request #23000: [FLINK-32594][runtime] Use blocking ResultPartitionType if operator only outputs records on EOF

2023-07-19 Thread via GitHub


RanJinh commented on code in PR #23000:
URL: https://github.com/apache/flink/pull/23000#discussion_r1268184648


##
flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java:
##
@@ -27,4 +28,16 @@
  * method) interfaces that can be implemented via Java 8 lambdas.
  */
 @Public
-public interface Function extends java.io.Serializable {}
+public interface Function extends java.io.Serializable {

Review Comment:
   Related change is in this reply: 
https://github.com/apache/flink/pull/23000#discussion_r1267862410



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.

2023-07-19 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744645#comment-17744645
 ] 

Piotr Nowojski commented on FLINK-19249:


Hi [~Jiangang], no, sorry, there has been no progress on that issue.

> Detect broken connections in case TCP Timeout takes too long.
> -
>
> Key: FLINK-19249
> URL: https://issues.apache.org/jira/browse/FLINK-19249
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Congxian Qiu
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> {quote}encountered this error on 1.7, after going through the master code, I 
> think the problem is still there
> {quote}
> When the network environment is not so good, the connection between the 
> server and the client may be dropped unexpectedly. After the 
> disconnection, the server will receive an IOException such as the one below
> {code:java}
> java.io.IOException: Connection timed out
>  at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>  at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>  at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>  at sun.nio.ch.IOUtil.write(IOUtil.java:51)
>  at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>  at java.lang.Thread.run(Thread.java:748)
> {code}
> and then release the view reader.
> But the job will not fail until the downstream detects the disconnection 
> via {{channelInactive}} later (~10 min). During that time, the job can 
> still process data, but the broken channel can't transfer any data or events, 
> so snapshots will fail during this period. This causes the job to replay 
> a lot of data after failover.
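
One common mitigation (illustrative only, not a change committed here) is an 
application-level idle check so a dead channel is detected well before the 
OS-level TCP timeout, e.g. with netty's IdleStateHandler:
{code:java}
// Fire an idle event if nothing is read for 60s (or written for 30s), so the
// handler can probe or close the channel instead of waiting ~10+ minutes
// for the TCP stack to give up.
pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 30, 0, TimeUnit.SECONDS));
{code}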



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] RanJinh commented on a diff in pull request #23000: [FLINK-32594][runtime] Use blocking ResultPartitionType if operator only outputs records on EOF

2023-07-19 Thread via GitHub


RanJinh commented on code in PR #23000:
URL: https://github.com/apache/flink/pull/23000#discussion_r1268169556


##
flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java:
##
@@ -27,4 +28,16 @@
  * method) interfaces that can be implemented via Java 8 lambdas.
  */
 @Public
-public interface Function extends java.io.Serializable {}
+public interface Function extends java.io.Serializable {
+/** Returns true iff the operator can only emit records after inputs have 
reached EOF. */
+@Internal
+default boolean isOutputEOF() {

Review Comment:
   It's more reasonable to overwrite `OperatorAttribute` in an operator; you can 
now see the new usage in 
`StreamingJobGraphGeneratorTest#testOverwriteOperatorAttributesPartitionTypes`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #633: [FLINK-32589] Carry over parallelism overrides between spec changes

2023-07-19 Thread via GitHub


mxm commented on code in PR #633:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/633#discussion_r1268149481


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java:
##
@@ -17,15 +17,19 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
-import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 
+import java.util.Map;
+
 /** Per-job Autoscaler instance. */
 public interface JobAutoScaler {
 
 /** Called as part of the reconciliation loop. Returns true if this call 
led to scaling. */
-boolean scale(FlinkResourceContext> 
ctx);
+boolean scale(FlinkResourceContext ctx);
 
 /** Called when the custom resource is deleted. */
-void cleanup(AbstractFlinkResource cr);
+void cleanup(FlinkResourceContext ctx);
+
+/** Get the current parallelism overrides for the job. */

Review Comment:
   I think I prefer this being part of the autoscaling code but we can refactor 
at a later point in time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] huwh commented on pull request #22861: [FLINK-32387][runtime] InputGateDeploymentDescriptor uses cache to avoid deserializing shuffle descriptors multiple times

2023-07-19 Thread via GitHub


huwh commented on PR #22861:
URL: https://github.com/apache/flink/pull/22861#issuecomment-1642164847

   @wanglijie95 I squashed the commits and rebased onto the master branch. PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] twalthr commented on a diff in pull request #22593: [FLINK-32053][table-planner] Introduce StateMetadata to ExecNode to support configure operator-level state TTL via CompiledPlan

2023-07-19 Thread via GitHub


twalthr commented on code in PR #22593:
URL: https://github.com/apache/flink/pull/22593#discussion_r1268129439


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java:
##
@@ -71,20 +75,36 @@
 producedTransformations = 
StreamExecChangelogNormalize.CHANGELOG_NORMALIZE_TRANSFORMATION,
 minPlanVersion = FlinkVersion.v1_15,
 minStateVersion = FlinkVersion.v1_15)
+@ExecNodeMetadata(
+name = "stream-exec-changelog-normalize",
+version = 2,

Review Comment:
   @godfreyhe @luoyuxia @LadyForest If the testing infrastructure were 
already in place, I would agree with increasing the version. But we are not 
testing different versions of ExecNode yet, so increasing the annotation has no 
effect.
   
   I just noticed that you bumped the ExecNode version of a couple of ExecNodes 
for the new state metadata properties in CompiledPlan. Your change actually 
allows having null for the stateMetadataList, so I'm wondering whether we should 
not increase the version for 1.18. We can stay at the old ExecNode version, as 1.18 
is able to consume 1.17 plans and state.
   The reason I noticed this is FLINK-32613, where I added an option to 
the wrong annotation during a rebase and no test failed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] reswqa commented on a diff in pull request #55: [FLINK-24302] Extend offheap memory for JDK11 test coverage

2023-07-19 Thread via GitHub


reswqa commented on code in PR #55:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/55#discussion_r1268105952


##
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.ClusterControllable;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import 
org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import org.apache.flink.core.execution.JobClient;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.METRIC_FETCHER_UPDATE_INTERVAL_MS;
+import static 
org.apache.flink.runtime.jobgraph.SavepointConfigOptions.SAVEPOINT_PATH;
+
+/** Test environment for running jobs on Flink mini-cluster. */
+@Experimental
+public class MiniClusterTestEnvironment implements TestEnvironment, 
ClusterControllable {

Review Comment:
   IIUC, we will remove this after Flink 1.18 is released?



##
flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java:
##
@@ -35,9 +35,11 @@ public class FlinkContainerUtils {
 public static Configuration flinkConfiguration() {
 Configuration configuration = new Configuration();
 
-// Increase the jvm metaspace memory to avoid 
java.lang.OutOfMemoryError: Metaspace
-configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(2048));
+configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(2560));
+configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, 
MemorySize.ofMebiBytes(512));
 configuration.set(TaskManagerOptions.JVM_METASPACE, 
MemorySize.ofMebiBytes(512));
+
+// Increase the jvm metaspace memory to avoid 
java.lang.OutOfMemoryError: Metaspace

Review Comment:
   Should this comment line be moved up a bit?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric

2023-07-19 Thread via GitHub


fredia commented on PR #22772:
URL: https://github.com/apache/flink/pull/22772#issuecomment-1642089197

   @rkhachatryan Thanks for the review, I have addressed your comments. It 
would be great if you could take another look :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on a diff in pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric

2023-07-19 Thread via GitHub


fredia commented on code in PR #22772:
URL: https://github.com/apache/flink/pull/22772#discussion_r1268077347


##
docs/content/docs/ops/metrics.md:
##
@@ -1343,6 +1343,11 @@ Note that for failed checkpoints, metrics are updated on 
a best efforts basis an
   The time in nanoseconds that elapsed between the creation of the 
last checkpoint and the time when the checkpointing process has started by this 
Task. This delay shows how long it takes for the first checkpoint barrier to 
reach the task. A high value indicates back-pressure. If only a specific task 
has a long start delay, the most likely reason is data skew.
   Gauge
 
+
+  checkpointRestoreTime
+  The time in milliseconds that one task spends on 
restoring/initialization, return 0 when the task is not in 
initialization/running status.
+  Counter

Review Comment:
   I renamed `checkpointRestoreTime` to `initializationTime` at the subtask level 
and moved it to the [`IO` 
section](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io).
   For the `Availability` section, I think that is a metric describing job-level 
status; we can reuse `initializingTime/initializingTotalTime`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on a diff in pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric

2023-07-19 Thread via GitHub


fredia commented on code in PR #22772:
URL: https://github.com/apache/flink/pull/22772#discussion_r1268060700


##
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java:
##
@@ -128,6 +134,29 @@ public TaskIOMetricGroup(TaskMetricGroup parent) {
 this.mailboxLatency =
 histogram(MetricNames.MAILBOX_LATENCY, new 
DescriptiveStatisticsHistogram(60));
 this.mailboxSize = gauge(MetricNames.MAILBOX_SIZE, new SizeGauge());
+this.initializationDuration =
+counter(
+CHECKPOINT_RESTORE_TIME,
+new Counter() {
+@Override
+public void inc() {}
+
+@Override
+public void inc(long n) {}
+
+@Override
+public void dec() {}
+
+@Override
+public void dec(long n) {}
+
+@Override
+public long getCount() {
+return getTaskInitializationDuration();
+}
+});
+this.taskStartTime = INVALID_TIMESTAMP;

Review Comment:
   Agreed, I added a check on `taskStartTime` in `getAccumulatedBusyTime`: if 
`taskStartTime < 0`, it returns `Double.NaN`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on a diff in pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric

2023-07-19 Thread via GitHub


fredia commented on code in PR #22772:
URL: https://github.com/apache/flink/pull/22772#discussion_r1268058951


##
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java:
##
@@ -95,6 +95,15 @@ void testTaskIOMetricGroup() throws InterruptedException {
 .isGreaterThanOrEqualTo(softSleepTime);
 assertThat(taskIO.getHardBackPressuredTimePerSecond().getCount())
 .isGreaterThanOrEqualTo(hardSleepTime);
+
+// test initializing time
+assertThat(taskIO.getTaskInitializationDuration()).isEqualTo(0L);
+taskIO.markTaskInitializationStarted();
+Thread.sleep(1000);
+assertThat(taskIO.getTaskInitializationDuration()).isGreaterThan(0L);

Review Comment:
   Good point, I introduced `org.apache.flink.util.clock.Clock` into 
`TaskIOMetricGroup` to avoid `sleep()`.
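   
   The test then becomes deterministic, roughly (a sketch assuming Flink's 
`ManualClock` test utility; the exact constructor wiring may differ):
   
   ```java
   ManualClock clock = new ManualClock();
   TaskIOMetricGroup taskIO = new TaskIOMetricGroup(parent, clock);
   taskIO.markTaskInitializationStarted();
   clock.advanceTime(Duration.ofMillis(100)); // no real sleeping
   assertThat(taskIO.getTaskInitializationDuration()).isEqualTo(100L);
   ```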



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] JTaky opened a new pull request, #634: [FLINK-32551] Add option to take a savepoint when deleting a flinkdeployment/flinksessionjob

2023-07-19 Thread via GitHub


JTaky opened a new pull request, #634:
URL: https://github.com/apache/flink-kubernetes-operator/pull/634

   
   
   ## What is the purpose of the change
   
   Add an operator option to take a savepoint on FlinkDeployment/FlinkSessionJob 
deletion. The default behaviour is unchanged: no savepoint is taken.
   
   ## Brief change log
   
   - Adds a new operator configuration (see the sketch below)
   - Injects the operator config into all reconcilers upon creation
   - Refactors the cleanUp job interface to unify upgrade-mode determination upon 
cleanup
   - Adds unit tests for the new cleanup behaviour
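   
   A hedged sketch of what the new option looks like (the key name and description 
are illustrative; the generated docs carry the real ones):
   
   ```java
   // Key name is illustrative only, not the actual option key.
   public static final ConfigOption<Boolean> SAVEPOINT_ON_DELETION =
           ConfigOptions.key("kubernetes.operator.savepoint.on-deletion")
                   .booleanType()
                   .defaultValue(false) // default behaviour unchanged: no savepoint
                   .withDescription(
                           "Whether to take a savepoint before a FlinkDeployment/FlinkSessionJob is deleted.");
   ```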
   
   ## Verifying this change
   
   
   - Additional unit tests
   - Manual tests on K8s
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: 
yes, adds a new operator configuration
 - Core observer or reconciler logic that is regularly executed: yes
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs are generated from the config
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32619) ConfigOptions to support fallback configuration

2023-07-19 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744606#comment-17744606
 ] 

Hong Liang Teoh commented on FLINK-32619:
-

That's a great callout, [~wangm92]. I will use that instead and close this JIRA.
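
Presumably the existing fallback-keys support, e.g. (a sketch using the current 
ConfigOptions builder):
{code:java}
public static final ConfigOption<Duration> CACHE_CHECKPOINT_STATISTICS_TIMEOUT =
        key("rest.cache.checkpoint-statistics.timeout")
                .durationType()
                .noDefaultValue()
                .withFallbackKeys("web.refresh-interval")
                .withDescription("...");
{code}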

> ConfigOptions to support fallback configuration
> ---
>
> Key: FLINK-32619
> URL: https://issues.apache.org/jira/browse/FLINK-32619
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Configuration
>Affects Versions: 1.16.2, 1.17.1
>Reporter: Hong Liang Teoh
>Priority: Minor
>
> ConfigOptions has no option to specify a "fallback configuration" as the 
> default.
>  
> For example, if we want {{rest.cache.checkpoint-statistics.timeout}} to 
> fall back to {{web.refresh-interval}} instead of a static default value, we have 
> to specify
>  
> {code:java}
> @Documentation.OverrideDefault("web.refresh-interval")
> @Documentation.Section(Documentation.Sections.EXPERT_REST)
> public static final ConfigOption 
> CACHE_CHECKPOINT_STATISTICS_TIMEOUT =
> key("rest.cache.checkpoint-statistics.timeout")
> .durationType()
> .noDefaultValue()
> .withDescription(
> "");
>  {code}
>  
>  
> The {{.noDefaultValue()}} is misleading as it actually has a default.
>  
> We should introduce a {{.fallbackConfiguration()}} that is handled gracefully 
> by doc generators.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32619) ConfigOptions to support fallback configuration

2023-07-19 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh resolved FLINK-32619.
-
Resolution: Not A Problem

> ConfigOptions to support fallback configuration
> ---
>
> Key: FLINK-32619
> URL: https://issues.apache.org/jira/browse/FLINK-32619
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Configuration
>Affects Versions: 1.16.2, 1.17.1
>Reporter: Hong Liang Teoh
>Priority: Minor
>
> ConfigOptions has no option to specify a "fallback configuration" as the 
> default.
>  
> For example, if we want {{rest.cache.checkpoint-statistics.timeout}} to 
> fall back to {{web.refresh-interval}} instead of a static default value, we have 
> to specify
>  
> {code:java}
> @Documentation.OverrideDefault("web.refresh-interval")
> @Documentation.Section(Documentation.Sections.EXPERT_REST)
> public static final ConfigOption 
> CACHE_CHECKPOINT_STATISTICS_TIMEOUT =
> key("rest.cache.checkpoint-statistics.timeout")
> .durationType()
> .noDefaultValue()
> .withDescription(
> "");
>  {code}
>  
>  
> The {{.noDefaultValue()}} is misleading as it actually has a default.
>  
> We should introduce a {{.fallbackConfiguration()}} that is handled gracefully 
> by doc generators.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32630) The log level of job failed info should change from INFO to WARN/ERROR if job failed

2023-07-19 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen reassigned FLINK-32630:
-

Assignee: Matt Wang

> The log level of job failed info should change from INFO to WARN/ERROR if job 
> failed
> 
>
> Key: FLINK-32630
> URL: https://issues.apache.org/jira/browse/FLINK-32630
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Runtime / Coordination
>Affects Versions: 1.17.1
>Reporter: Matt Wang
>Assignee: Matt Wang
>Priority: Minor
>
> When a job fails to submit or run, the following log level should be changed 
> to WARN or ERROR; INFO will confuse users
> {code:java}
> 2023-07-14 20:05:26,863 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
> flink_test_job (08eefd50) switched from state FAILING 
> to FAILED.
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)
>  
> 2023-07-14 20:05:26,889 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
> 08eefd50 reached terminal state FAILED.
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)
> 2023-07-14 20:05:26,956 INFO  
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
> [] - Application FAILED: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.{code}
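
An illustrative shape of the change (not the actual patch):
{code:java}
// Log terminal transitions that indicate failure at WARN instead of INFO.
if (newState == JobStatus.FAILED) {
    LOG.warn("Job {} ({}) switched from state {} to {}.", jobName, jobId, oldState, newState, error);
} else {
    LOG.info("Job {} ({}) switched from state {} to {}.", jobName, jobId, oldState, newState);
}
{code}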



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32630) The log level of job failed info should change from INFO to WARN/ERROR if job failed

2023-07-19 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744605#comment-17744605
 ] 

Zili Chen commented on FLINK-32630:
---

You can ping me when a patch is ready.

> The log level of job failed info should change from INFO to WARN/ERROR if job 
> failed
> 
>
> Key: FLINK-32630
> URL: https://issues.apache.org/jira/browse/FLINK-32630
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Runtime / Coordination
>Affects Versions: 1.17.1
>Reporter: Matt Wang
>Assignee: Matt Wang
>Priority: Minor
>
> When a job fails to submit or run, the following log level should be changed 
> to WARN or ERROR; INFO will confuse users
> {code:java}
> 2023-07-14 20:05:26,863 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
> flink_test_job (08eefd50) switched from state FAILING 
> to FAILED.
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)
>  
> 2023-07-14 20:05:26,889 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
> 08eefd50 reached terminal state FAILED.
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)
> 2023-07-14 20:05:26,956 INFO  
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
> [] - Application FAILED: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-24302) Direct buffer memory leak on Pulsar connector with Java 11

2023-07-19 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744604#comment-17744604
 ] 

Zili Chen commented on FLINK-24302:
---

Pending close. This should be an issue on the Pulsar side; there is little we 
can do from the connector side alone.

> Direct buffer memory leak on Pulsar connector with Java 11
> --
>
> Key: FLINK-24302
> URL: https://issues.apache.org/jira/browse/FLINK-24302
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.0
>Reporter: Yufan Sheng
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: pulsar-4.0.1
>
>
> Running the Pulsar connector with multiple split readers on Java 11 could 
> throw {{a java.lang.OutOfMemoryError exception}}.
> {code:java}
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException:
>  Could not complete the operation. Number of retries has been exhausted. 
> Failed reason: java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>   at 
> java.base/java.util.concurrent.CompletableFuture$OrApply.tryFire(CompletableFuture.java:1503)
>   ... 42 more
> Caused by: 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException:
>  Could not complete the operation. Number of retries has been exhausted. 
> Failed reason: java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$4(AsyncHttpConnector.java:249)
>   ... 39 more
> Caused by: org.apache.pulsar.shade.io.netty.handler.codec.EncoderException: 
> java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> org.apache.pulsar.shade.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104)
>   at 
> org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
>   at 
> org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:303)
>   at 
> org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:132)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>   at 
> org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1020)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:311)
>   at 
> org.apache.pulsar.shade.org.asynchttpclient.netty.request.NettyRequestSender.writeRequest(NettyRequestSender.java:420)
>   ... 23 more
> {code}
> The reason is that under Java 11, Netty allocates memory from the Java 
> direct memory pool and is affected by the MaxDirectMemory limit. Under 
> Java 8, it allocates native memory and is not affected by that setting.
> We have to reduce the direct memory usage by using a newer Pulsar client 
> which has a memory-limits configuration.
> This issue is addressed on Pulsar, and 
> [PIP-74|https://github.com/apache/pulsar/wiki/PIP-74%3A-Pulsar-client-memory-limits]
>  has been created for resolving this issue.
> We should keep this issue open with no resolved versions until Pulsar 
> provides a new client with memory limits.
> h2. Update: 2022/08/04
> The memory limit on the consumer API has been released in 
> https://github.com/apache/pulsar/pull/15216; we need to add the 
> autoScaledReceiverQueueSizeEnabled option to enable this feature. 
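
For reference, a hedged example of the client-side memory limiting mentioned 
above (Pulsar client API; the option name is taken from the issue text):
{code:java}
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .memoryLimit(64, SizeUnit.MEGA_BYTES) // cap the client's direct-memory usage
        .build();

Consumer<byte[]> consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-sub")
        .autoScaledReceiverQueueSizeEnabled(true) // shrink receiver queues under the cap
        .subscribe();
{code}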

[jira] [Commented] (FLINK-24302) Direct buffer memory leak on Pulsar connector with Java 11

2023-07-19 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744603#comment-17744603
 ] 

Zili Chen commented on FLINK-24302:
---

Workaround to turn on tests for JDK11 in 
https://github.com/apache/flink-connector-pulsar/pull/55.

> Direct buffer memory leak on Pulsar connector with Java 11
> --
>
> Key: FLINK-24302
> URL: https://issues.apache.org/jira/browse/FLINK-24302
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.0
>Reporter: Yufan Sheng
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: pulsar-4.0.1
>
>
> Running the Pulsar connector with multiple split readers on Java 11 could 
> throw {{a java.lang.OutOfMemoryError exception}}.
> {code:java}
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException:
>  Could not complete the operation. Number of retries has been exhausted. 
> Failed reason: java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>   at 
> java.base/java.util.concurrent.CompletableFuture$OrApply.tryFire(CompletableFuture.java:1503)
>   ... 42 more
> Caused by: 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException:
>  Could not complete the operation. Number of retries has been exhausted. 
> Failed reason: java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$4(AsyncHttpConnector.java:249)
>   ... 39 more
> Caused by: org.apache.pulsar.shade.io.netty.handler.codec.EncoderException: 
> java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> org.apache.pulsar.shade.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104)
>   at 
> org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
>   at 
> org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:303)
>   at 
> org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:132)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>   at 
> org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1020)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:311)
>   at 
> org.apache.pulsar.shade.org.asynchttpclient.netty.request.NettyRequestSender.writeRequest(NettyRequestSender.java:420)
>   ... 23 more
> {code}
> The reason is that under Java 11, Netty allocates memory from the Java 
> direct memory pool and is affected by the MaxDirectMemory limit. Under 
> Java 8, it allocates native memory and is not affected by that setting.
> We have to reduce the direct memory usage by using a newer Pulsar client 
> which has a memory-limits configuration.
> This issue is addressed on Pulsar, and 
> [PIP-74|https://github.com/apache/pulsar/wiki/PIP-74%3A-Pulsar-client-memory-limits]
>  has been created for resolving this issue.
> We should keep this issue open with no resolved versions until Pulsar 
> provides a new client with memory limits.
> h2. Update: 2022/08/04
> The memory limit on the consumer API has been released in 
> https://github.com/apache/pulsar/pull/15216; we need to add the 
> autoScaledReceiverQueueSizeEnabled option to enable this feature. This memory 
> 

[jira] [Updated] (FLINK-32630) The log level of job failed info should change from INFO to WARN/ERROR if job failed

2023-07-19 Thread Matt Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang updated FLINK-32630:
--
Summary: The log level of job failed info should change from INFO to 
WARN/ERROR if job failed  (was: The log level of job failed info should change 
from info to warn/error if job failed)

> The log level of job failed info should change from INFO to WARN/ERROR if job 
> failed
> 
>
> Key: FLINK-32630
> URL: https://issues.apache.org/jira/browse/FLINK-32630
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Runtime / Coordination
>Affects Versions: 1.17.1
>Reporter: Matt Wang
>Priority: Minor
>
> When a job fails to submit or run, the following log level should be changed 
> to WARN or ERROR; INFO will confuse users
> {code:java}
> 2023-07-14 20:05:26,863 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
> flink_test_job (08eefd50) switched from state FAILING 
> to FAILED.
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)
>  
> 2023-07-14 20:05:26,889 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
> 08eefd50 reached terminal state FAILED.
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)
> 2023-07-14 20:05:26,956 INFO  
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
> [] - Application FAILED: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32630) The log level of job failed info should change from info to warn/error if job failed

2023-07-19 Thread Matt Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang updated FLINK-32630:
--
Summary: The log level of job failed info should change from info to 
warn/error if job failed  (was: The log level should change from info to 
warn/error if job failed)

> The log level of job failed info should change from info to warn/error if job 
> failed
> 
>
> Key: FLINK-32630
> URL: https://issues.apache.org/jira/browse/FLINK-32630
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Runtime / Coordination
>Affects Versions: 1.17.1
>Reporter: Matt Wang
>Priority: Minor
>
> When a job fails to submit or run, the following log level should be changed 
> to WARN or ERROR; INFO will confuse users
> {code:java}
> 2023-07-14 20:05:26,863 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
> flink_test_job (08eefd50) switched from state FAILING 
> to FAILED.
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)
>  
> 2023-07-14 20:05:26,889 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
> 08eefd50 reached terminal state FAILED.
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)
> 2023-07-14 20:05:26,956 INFO  
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
> [] - Application FAILED: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32630) The log level should change from info to warn/error if job failed

2023-07-19 Thread Matt Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744584#comment-17744584
 ] 

Matt Wang commented on FLINK-32630:
---

Hi [~tison] [~Weijie Guo], I think this is a point that can be optimized. Could 
you assign this Jira to me?

> The log level should change from info to warn/error if job failed
> -
>
> Key: FLINK-32630
> URL: https://issues.apache.org/jira/browse/FLINK-32630
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Runtime / Coordination
>Affects Versions: 1.17.1
>Reporter: Matt Wang
>Priority: Minor
>
> When a job fails to submit or run, the following log level should be changed 
> to WARN or ERROR; INFO will confuse users.
> {code:java}
> 2023-07-14 20:05:26,863 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
> flink_test_job (08eefd50) switched from state FAILING 
> to FAILED.
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)
>  
> 2023-07-14 20:05:26,889 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
> 08eefd50 reached terminal state FAILED.
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)
> 2023-07-14 20:05:26,956 INFO  
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
> [] - Application FAILED: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32586) Enable input locality in SimpleExecutionSlotAllocator

2023-07-19 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-32586.
---
Resolution: Done

Done via e732edb41a423f19d5eefc397ddbfacadaf0179e

> Enable input locality in SimpleExecutionSlotAllocator
> -
>
> Key: FLINK-32586
> URL: https://issues.apache.org/jira/browse/FLINK-32586
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> At present, the AdaptiveBatchScheduler uses the 
> `SimpleExecutionSlotAllocator` to assign slots to executions, but it currently 
> lacks support for input locality, which may incur 
> unnecessary data transmission overhead. In this issue, we aim to enable the 
> `SimpleExecutionSlotAllocator` to support input locality.
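>
> For illustration, a rough sketch of what input locality means for slot
> assignment (hypothetical types and names, not the actual
> `SimpleExecutionSlotAllocator` code): prefer a slot on a TaskManager that
> already hosts an upstream result partition, and fall back to any free slot.
> {code:java}
> import java.util.Collection;
> import java.util.Optional;
>
> // Hypothetical sketch: locality-aware slot selection.
> final class LocalityAwareAllocation {
>     interface SlotPool {
>         Optional<String> allocateOn(String taskManagerId); // returns a slot id
>         String allocateAny();
>     }
>
>     static String allocate(SlotPool pool, Collection<String> upstreamTaskManagers) {
>         for (String tm : upstreamTaskManagers) {
>             Optional<String> slot = pool.allocateOn(tm);
>             if (slot.isPresent()) {
>                 return slot.get(); // local input, no network transfer needed
>             }
>         }
>         return pool.allocateAny(); // no locality possible
>     }
> }
> {code}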



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32630) The log level should change from info to warn/error if job failed

2023-07-19 Thread Matt Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang updated FLINK-32630:
--
Description: 
When a job fails to submit or run, the following log level should be changed to 
WARN or ERROR; INFO will confuse users.
{code:java}
2023-07-14 20:05:26,863 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
flink_test_job (08eefd50) switched from state FAILING 
to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by 
FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)
 
2023-07-14 20:05:26,889 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
08eefd50 reached terminal state FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by 
FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)

2023-07-14 20:05:26,956 INFO  
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
[] - Application FAILED: 
java.util.concurrent.CompletionException: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.{code}

  was:
When a job fails to submit or run, the following log level should be changed to 
WARN or ERROR; INFO will confuse users.
{code:java}
2023-07-14 20:05:26,863 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
flink_test_job (08eefd50) switched from state FAILING 
to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by 
FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100){code}


> The log level should change from info to warn/error if job failed
> -
>
> Key: FLINK-32630
> URL: https://issues.apache.org/jira/browse/FLINK-32630
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Runtime / Coordination
>Affects Versions: 1.17.1
>Reporter: Matt Wang
>Priority: Minor
>
> When a job fails to submit or run, the following log level should be changed 
> to WARN or ERROR; INFO will confuse users.
> {code:java}
> 2023-07-14 20:05:26,863 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
> flink_test_job (08eefd50) switched from state FAILING 
> to FAILED.
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)
>  
> 2023-07-14 20:05:26,889 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
> 08eefd50 reached terminal state FAILED.
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100)
> 2023-07-14 20:05:26,956 INFO  
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
> [] - Application FAILED: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zhuzhurk closed pull request #23009: [FLINK-32586][coordination] Enable input locality in SimpleExecutionSlotAllocator.

2023-07-19 Thread via GitHub


zhuzhurk closed pull request #23009: [FLINK-32586][coordination] Enable input 
locality in SimpleExecutionSlotAllocator.
URL: https://github.com/apache/flink/pull/23009


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32630) The log level should change from info to warn/error if job failed

2023-07-19 Thread Matt Wang (Jira)
Matt Wang created FLINK-32630:
-

 Summary: The log level should change from info to warn/error if 
job failed
 Key: FLINK-32630
 URL: https://issues.apache.org/jira/browse/FLINK-32630
 Project: Flink
  Issue Type: Improvement
  Components: Client / Job Submission, Runtime / Coordination
Affects Versions: 1.17.1
Reporter: Matt Wang


When a job fails to submit or run, the following log level should be changed to 
WARN or ERROR; INFO will confuse users.
{code:java}
2023-07-14 20:05:26,863 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
flink_test_job (08eefd50) switched from state FAILING 
to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by 
FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32552) Mixed up Flink session job deployments

2023-07-19 Thread Fabio Wanner (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabio Wanner resolved FLINK-32552.
--
Release Note: Not a bug of the flink k8s operator.
  Resolution: Not A Bug

> Mixed up Flink session job deployments
> --
>
> Key: FLINK-32552
> URL: https://issues.apache.org/jira/browse/FLINK-32552
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Fabio Wanner
>Priority: Major
>
> *Context*
> In the scope of end-to-end tests we deploy all the Flink session jobs we have 
> regularly in a staging environment. Some of the jobs are bundled together in 
> one helm chart and therefore deployed at the same time. There are around 40 
> individual Flink jobs (running on the same Flink session cluster). The 
> session cluster is separate for each e2e test run. The problems described 
> below happen rarely (maybe 1 in ~50 runs).
> *Problem*
> Rarely, the operator seems to "mix up" the deployments. This can be seen in 
> the Flink cluster logs as multiple {{Received JobGraph submission '<job 
> name>' (<job id>)}} lines created from jobs with the same job_id. This 
> results in errors such as 
> {{DuplicateJobSubmissionException}} or {{ClassNotFoundException}}.
> It's also visible in the FlinkSessionJob resource: status.jobStatus.jobName 
> does not match the expected job name of the job being deployed (the job name 
> is passed to the application via argument).
> So far we were unable to reliably reproduce the error.
> *Details*
> The following lines show the status of 3 jobs from the viewpoint of the 
> Flink cluster dashboard and the FlinkSessionJob resource:
>  
> *aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
>  * State: Restarting
>  * ID: a7d36f3881f943a2
>  * Exceptions: Cannot load user class: 
> aelps.pipelines.aletsch.smc.SMCUrlMapper
> FlinkSessionJob Resource:
>  * State: RUNNING
>  * jobId: a1221c743367497b0002
>  * uid: a1221c74-3367-497b-ad2f-8793ab23919d
>  
> *aletsch_mat_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
>  * State: -
>  * ID: -
> FlinkSessionJob Resource:
>  * State: UPGRADING
>  * jobId: -
>  * uid: a7d36f38-81f9-43a0-898f-19b950430e9d
> Flink K8s Operator:
>  * Exceptions: DuplicateJobSubmissionException: Job has already been 
> submitted.
>  
> *aletsch_wp_wafer_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
>  * State: Running
>  * ID: e692c2dfaa18441c0002
>  * Exceptions: -
> FlinkSessionJob Resource:
>  * State: RUNNING
>  * jobId: e692c2dfaa18441c0002
>  * uid: e692c2df-aa18-441c-a352-88aefa9a3017
> As we can see, the *aletsch_smc* job is presumably running according to the 
> FlinkSessionJob resource, but crash-looping in the cluster, and it has the 
> jobID matching the uid of the resource of {*}aletsch_mat{*}, while 
> *aletsch_mat* is not even running. The following logs also show some 
> suspicious entries: there are several {{Received JobGraph submission}} lines 
> from different jobs with the same jobID.
>  
> *Logs*
> The logs are filtered by the 3 jobIds from above.
>  
> JobID: a7d36f3881f943a2
> {code:bash}
> Flink Cluster
> ...
> 2023-07-06 10:23:50,552 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
> aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 
> (a7d36f3881f943a2) switched from state RUNNING to RESTARTING.
> 2023-07-06 10:23:50   file: 
> '/tmp/tm_10.0.11.159:6122-e9fadc/blobStorage/job_a7d36f3881f943a2/blob_p-40c7a30adef8868254191d2cf2dbc4cb7ab46f0d-8a02a0583d91c5e8e6c94f378aa444c2'
>  (valid JAR)
> 2023-07-06 10:23:50,522 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Received resource requirements from job 
> a7d36f3881f943a2: 
> [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
> numberOfRequiredSlots=4}]
> 2023-07-06 10:23:50,522 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Received resource requirements from job 
> a7d36f3881f943a2: 
> [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
> numberOfRequiredSlots=3}]
> 2023-07-06 10:23:50,522 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Received resource requirements from job 
> a7d36f3881f943a2: 
> [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
> numberOfRequiredSlots=2}]
> 2023-07-06 10:23:50,522 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Received resource requirements from job 
> a7d36f3881f943a2: 
> 

[GitHub] [flink] pvary commented on a diff in pull request #22694: [FLINK-32223][runtime][security] Add Hive delegation token support

2023-07-19 Thread via GitHub


pvary commented on code in PR #22694:
URL: https://github.com/apache/flink/pull/22694#discussion_r1267947674


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProvider.java:
##
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.time.Clock;
+import java.util.Optional;
+
+import static org.apache.flink.runtime.hadoop.HadoopUserUtils.getIssueDate;
+
+/** Delegation token provider for HiveServer2. */
+@Internal
+public class HiveServer2DelegationTokenProvider implements 
DelegationTokenProvider {
+
+private static final Logger LOG =
+LoggerFactory.getLogger(HiveServer2DelegationTokenProvider.class);
+
+org.apache.hadoop.conf.Configuration hiveConf;
+
+private KerberosLoginProvider kerberosLoginProvider;
+
+private Long tokenRenewalInterval;
+
+@Override
+public String serviceName() {
+return "HiveServer2";
+}
+
+@Override
+public void init(Configuration configuration) throws Exception {
+hiveConf = getHiveConfiguration(configuration);
+kerberosLoginProvider = new KerberosLoginProvider(configuration);
+}
+
+private org.apache.hadoop.conf.Configuration 
getHiveConfiguration(Configuration conf) {
+try {
+org.apache.hadoop.conf.Configuration hadoopConf =
+HadoopUtils.getHadoopConfiguration(conf);
+hiveConf = new HiveConf(hadoopConf, HiveConf.class);
+} catch (Exception | NoClassDefFoundError e) {
+LOG.warn("Fail to create HiveServer2 Configuration", e);
+}
+return hiveConf;
+}
+
+@Override
+public boolean delegationTokensRequired() throws Exception {
+/**
+ * The general rule how a provider/receiver must behave is the 
following: The provider and
+ * the receiver must be added to the classpath together with all the 
additionally required
+ * dependencies.
+ *
+ * This null check is required because the HiveServer2 provider is 
always on classpath
+ * but Hive jars are optional. Such case configuration is not able to 
be loaded. This
+ * construct is intended to be removed when HiveServer2 
provider/receiver pair can be
+ * externalized (namely if a provider/receiver throws an exception 
then workload must be
+ * stopped).
+ */
+if (hiveConf == null) {
+LOG.debug(
+"HiveServer2 is not available (not packaged with this 
application), hence no "
++ "hiveServer2 tokens will be acquired.");
+return false;
+}
+try {
+if 
(!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) 
{
+LOG.debug(
+"Hadoop Kerberos is not enabled,hence no hiveServer2 
tokens will be acquired.");
+return false;
+}
+} catch (IOException e) {
+LOG.debug(
+"Hadoop Kerberos is 

[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #633: [FLINK-32589] Carry over parallelism overrides between spec changes

2023-07-19 Thread via GitHub


gyfora commented on PR #633:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/633#issuecomment-1641872870

   @mxm let me know if you have further comments


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32623) Rest api doesn't return minimum resource requirements correctly

2023-07-19 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-32623.

Resolution: Fixed

master: 6b3d291f6a573fb34a528313e5683d3a48a66771

> Rest api doesn't return minimum resource requirements correctly
> ---
>
> Key: FLINK-32623
> URL: https://issues.apache.org/jira/browse/FLINK-32623
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.18.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The resource requirements returned by the REST API always contain a hardcoded 
> lower bound of 1 for each vertex.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol merged pull request #23014: [FLINK-32623] Return correct vertex resource lower bound

2023-07-19 Thread via GitHub


zentol merged PR #23014:
URL: https://github.com/apache/flink/pull/23014


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RanJinh commented on a diff in pull request #23000: [FLINK-32594][runtime] Use blocking ResultPartitionType if operator only outputs records on EOF

2023-07-19 Thread via GitHub


RanJinh commented on code in PR #23000:
URL: https://github.com/apache/flink/pull/23000#discussion_r1267888031


##
flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java:
##
@@ -637,4 +637,16 @@ public int hashCode() {
 result = 31 * result + (int) (bufferTimeout ^ (bufferTimeout >>> 32));
 return result;
 }
+

Review Comment:
   Yeah, you are right. I will modify this, and we should also update the FLIP.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32592) (Stream)ExEnv#initializeContextEnvironment isn't thread-safe

2023-07-19 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744529#comment-17744529
 ] 

Chesnay Schepler commented on FLINK-32592:
--

master: 13d35365f677813d5f0090f121e14e8bdec646d1
1.17: TBD
1.16: TBD

> (Stream)ExEnv#initializeContextEnvironment isn't thread-safe
> 
>
> Key: FLINK-32592
> URL: https://issues.apache.org/jira/browse/FLINK-32592
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.15.4, 1.18.0, 1.17.1
>Reporter: Fabio Wanner
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> *Context*
> We are using the flink-k8s-operator to deploy multiple jobs (up to 32) to a 
> single session cluster. The job submissions done by the operator happen 
> concurrently, basically at the same time.
> Operator version: 1.5.0
> Flink version:  1.15.4, 1.17.1, 1.18 (master@f37d41cf)
> *Problem*
> Rarely (~once every 50 deployments) one of the jobs will not be executed. In 
> the following incident 4 jobs are deployed at the same time:
>  * gorner-task-staging-e5730831
>  * gorner-facility-staging-e5730831
>  * gorner-aepp-staging-e5730831
>  * gorner-session-staging-e5730831
>  
> The operator submits the job, they all get a reasonable jobID:
> {code:java}
> 2023-07-14 10:25:35,295 o.a.f.k.o.s.AbstractFlinkService [INFO 
> ][aelps-staging/gorner-task-staging-e5730831] Submitting job: 
> 4968b186061e44390002 to session cluster.
> 2023-07-14 10:25:35,297 o.a.f.k.o.s.AbstractFlinkService [INFO 
> ][aelps-staging/gorner-facility-staging-e5730831] Submitting job: 
> 91a5260d916c4dff0002 to session cluster.
> 2023-07-14 10:25:35,301 o.a.f.k.o.s.AbstractFlinkService [INFO 
> ][aelps-staging/gorner-aepp-staging-e5730831] Submitting job: 
> 103c0446e14749a10002 to session cluster.
> 2023-07-14 10:25:35,302 o.a.f.k.o.s.AbstractFlinkService [INFO 
> ][aelps-staging/gorner-session-staging-e5730831] Submitting job: 
> de59304d370b4b8e0002 to session cluster.
> {code}
> In the cluster the JarRunHandler's handleRequest() method will get the 
> request, all 4 jobIDs are present (also all args, etc are correct):
> {code:java}
> 2023-07-14 10:25:35,320 WARN  
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - 
> handleRequest - requestBody.jobId: 4968b186061e44390002
> 2023-07-14 10:25:35,321 WARN  
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - 
> handleRequest - requestBody.jobId: de59304d370b4b8e0002
> 2023-07-14 10:25:35,321 WARN  
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - 
> handleRequest - requestBody.jobId: 91a5260d916c4dff0002
> 2023-07-14 10:25:35,321 WARN  
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - 
> handleRequest - requestBody.jobId: 103c0446e14749a10002
> {code}
> But once the EmbeddedExecutor's submitAndGetJobClientFuture() method is 
> called instead of getting 1 call per jobID we have 4 calls but one of the 
> jobIDs twice:
> {code:java}
> 2023-07-14 10:25:35,616 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - optJobId: Optional[4968b186061e44390002]
> 2023-07-14 10:25:35,616 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - optJobId: Optional[103c0446e14749a10002]
> 2023-07-14 10:25:35,616 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - optJobId: Optional[de59304d370b4b8e0002]
> 2023-07-14 10:25:35,721 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - optJobId: Optional[de59304d370b4b8e0002]
> {code}
> If this is important: the jobGraph obtained does not match the jobID. We get 
> de59304d370b4b8e0002 twice, but the jobGraph for this jobID is 
> never returned by getJobGraph() in 
> EmbeddedExecutor.submitAndGetJobClientFuture().
> This will then lead to the job already existing:
> {code:java}
> 2023-07-14 10:25:35,616 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - submittedJobIds: []
> 2023-07-14 10:25:35,616 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - submittedJobIds: []
> 2023-07-14 10:25:35,616 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - submittedJobIds: []
> 2023-07-14 10:25:35,721 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - submittedJobIds: [de59304d370b4b8e0002]
> {code}
> 

[GitHub] [flink] zentol merged pull request #22997: [FLINK-32592] Fix (Stream)ExEnv#initializeContextEnvironment thread-safety

2023-07-19 Thread via GitHub


zentol merged PR #22997:
URL: https://github.com/apache/flink/pull/22997


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] yunfengzhou-hub commented on a diff in pull request #22931: [FLINK-32514] Support configuring checkpointing interval during process backlog

2023-07-19 Thread via GitHub


yunfengzhou-hub commented on code in PR #22931:
URL: https://github.com/apache/flink/pull/22931#discussion_r1267876016


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##
@@ -2079,11 +2149,19 @@ private final class ScheduledTrigger implements 
Runnable {
 
 @Override
 public void run() {
+long currentTime = clock.relativeTimeMillis();
+if (lastCheckpointTriggeringRelativeTime != NO_CHECKPOINT
+&& currentTime - lastCheckpointTriggeringRelativeTime < 
baseInterval) {
+return;
+}
+lastCheckpointTriggeringRelativeTime = currentTime;
+
 try {
 triggerCheckpoint(checkpointProperties, null, true);
 } catch (Exception e) {
 LOG.error("Exception while triggering checkpoint for job {}.", 
job, e);
 }
+timer.schedule(this, baseInterval, TimeUnit.MILLISECONDS);

Review Comment:
   I'm not sure I understand the expected behavior clearly. The current 
implementation ensures that the next checkpoint will be <= T2 + X.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on pull request #22997: [FLINK-32592] Fix (Stream)ExEnv#initializeContextEnvironment thread-safety

2023-07-19 Thread via GitHub


zentol commented on PR #22997:
URL: https://github.com/apache/flink/pull/22997#issuecomment-1641827665

   > I'm curious about the fallback behavior in case you don't have thread 
local context. 
   
   An atomic reference or volatile field would be better than what exists right 
now. I don't know why the field wasn't removed when the thread local was 
introduced; maybe for backwards compatibility, where one thread sets the context 
but another runs the job? :shrug: 
   
   My initial feeling was that this field should just be removed.
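   
   A minimal sketch of the set-once variant discussed above (not the actual 
Flink code): `compareAndSet` both publishes the value safely across threads and 
rejects a second initialization.
   
   ```java
   import java.util.concurrent.atomic.AtomicReference;
   
   // Sketch: a thread-safe, set-once holder for a context factory.
   final class ContextHolder<T> {
       private final AtomicReference<T> ref = new AtomicReference<>();
   
       void initialize(T context) {
           if (!ref.compareAndSet(null, context)) {
               throw new IllegalStateException("Context already initialized");
           }
       }
   
       T get() {
           return ref.get(); // may be null if never initialized
       }
   }
   ```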


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] yunfengzhou-hub commented on a diff in pull request #22931: [FLINK-32514] Support configuring checkpointing interval during process backlog

2023-07-19 Thread via GitHub


yunfengzhou-hub commented on code in PR #22931:
URL: https://github.com/apache/flink/pull/22931#discussion_r1267876016


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##
@@ -2079,11 +2149,19 @@ private final class ScheduledTrigger implements 
Runnable {
 
 @Override
 public void run() {
+long currentTime = clock.relativeTimeMillis();
+if (lastCheckpointTriggeringRelativeTime != NO_CHECKPOINT
+&& currentTime - lastCheckpointTriggeringRelativeTime < 
baseInterval) {
+return;
+}
+lastCheckpointTriggeringRelativeTime = currentTime;
+
 try {
 triggerCheckpoint(checkpointProperties, null, true);
 } catch (Exception e) {
 LOG.error("Exception while triggering checkpoint for job {}.", 
job, e);
 }
+timer.schedule(this, baseInterval, TimeUnit.MILLISECONDS);

Review Comment:
   I'm not sure I understand the expected behavior clearly. The current 
implementation ensures that the next checkpoint will be <= T2 + X, depending on 
the random init delay.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #22997: [FLINK-32592] Fix (Stream)ExEnv#initializeContextEnvironment thread-safety

2023-07-19 Thread via GitHub


zentol commented on code in PR #22997:
URL: https://github.com/apache/flink/pull/22997#discussion_r1267872976


##
flink-java/src/test/java/org/apache/flink/api/java/ExecutionEnvironmentTest.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ExecutionEnvironmentTest {
+
+@Test
+void testConcurrentSetContext() throws Exception {

Review Comment:
   yes!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RanJinh commented on a diff in pull request #23000: [FLINK-32594][runtime] Use blocking ResultPartitionType if operator only outputs records on EOF

2023-07-19 Thread via GitHub


RanJinh commented on code in PR #23000:
URL: https://github.com/apache/flink/pull/23000#discussion_r1267868354


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##
@@ -184,6 +184,9 @@ public static JobGraph createJobGraph(
 
 private final Map 
chainedInputOutputFormats;
 
+// The node id would be in this Set if it is an upstream node of an 
outputEOF node.

Review Comment:
   Thanks for your detailed comments! I will check all of the code comments 
according to your suggestion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on pull request #22997: [FLINK-32592] Fix (Stream)ExEnv#initializeContextEnvironment thread-safety

2023-07-19 Thread via GitHub


dmvk commented on PR #22997:
URL: https://github.com/apache/flink/pull/22997#issuecomment-1641815189

   I'm curious about the fallback behavior in case you don't have a thread-local 
context. :thinking: Especially since the `contextEnvironmentFactory` is not 
marked as volatile. I guess the correct behavior would be simply using an 
AtomicReference there and only allowing it to be set once.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RanJinh commented on a diff in pull request #23000: [FLINK-32594][runtime] Use blocking ResultPartitionType if operator only outputs records on EOF

2023-07-19 Thread via GitHub


RanJinh commented on code in PR #23000:
URL: https://github.com/apache/flink/pull/23000#discussion_r1267862410


##
flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java:
##
@@ -27,4 +28,16 @@
  * method) interfaces that can be implemented via Java 8 lambdas.
  */
 @Public
-public interface Function extends java.io.Serializable {}
+public interface Function extends java.io.Serializable {
+/** Returns true iff the operator can only emit records after inputs have 
reached EOF. */
+@Internal
+default boolean isOutputEOF() {

Review Comment:
   Thanks for your comments! I simply want to provide a method for a user-defined 
function to determine the OperatorAttribute directly. Indeed, it does not make 
sense to connect `Function` and `OperatorAttribute` together. I will modify 
this later. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on a diff in pull request #22997: [FLINK-32592] Fix (Stream)ExEnv#initializeContextEnvironment thread-safety

2023-07-19 Thread via GitHub


dmvk commented on code in PR #22997:
URL: https://github.com/apache/flink/pull/22997#discussion_r1267858802


##
flink-java/src/test/java/org/apache/flink/api/java/ExecutionEnvironmentTest.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ExecutionEnvironmentTest {
+
+@Test
+void testConcurrentSetContext() throws Exception {

Review Comment:
   Is it correct that without the fix, the test wasn't failing reliably but was 
just flaky?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] yunfengzhou-hub commented on a diff in pull request #22931: [FLINK-32514] Support configuring checkpointing interval during process backlog

2023-07-19 Thread via GitHub


yunfengzhou-hub commented on code in PR #22931:
URL: https://github.com/apache/flink/pull/22931#discussion_r1267857378


##
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##
@@ -164,6 +167,15 @@ public void lazyInitialize(
 context.lazyInitialize(
 globalFailureHandler, mainThreadExecutor, 
operatorCoordinatorMetricGroup);
 
+OperatorCoordinator rootCoordinator = coordinator;
+if (coordinator instanceof RecreateOnResetOperatorCoordinator) {
+rootCoordinator =
+((RecreateOnResetOperatorCoordinator) 
rootCoordinator).getInternalCoordinator();
+}
+if (rootCoordinator instanceof SourceCoordinator) {
+((SourceCoordinator) 
rootCoordinator).lazyInitialize(checkpointCoordinator);

Review Comment:
   Below is part of the invocation stack of SourceCoordinatorContext's 
constructor.
   ```
   <init>:142, SourceCoordinatorContext 
(org.apache.flink.runtime.source.coordinator)
   <init>:119, SourceCoordinatorContext 
(org.apache.flink.runtime.source.coordinator)
   getCoordinator:89, SourceCoordinatorProvider 
(org.apache.flink.runtime.source.coordinator)
   createNewInternalCoordinator:337, 
RecreateOnResetOperatorCoordinator$DeferrableCoordinator 
(org.apache.flink.runtime.operators.coordination)
   <init>:60, RecreateOnResetOperatorCoordinator 
(org.apache.flink.runtime.operators.coordination)
   <init>:43, RecreateOnResetOperatorCoordinator 
(org.apache.flink.runtime.operators.coordination)
   create:200, RecreateOnResetOperatorCoordinator$Provider 
(org.apache.flink.runtime.operators.coordination)
   create:194, RecreateOnResetOperatorCoordinator$Provider 
(org.apache.flink.runtime.operators.coordination)
   create:546, OperatorCoordinatorHolder 
(org.apache.flink.runtime.operators.coordination)
   create:509, OperatorCoordinatorHolder 
(org.apache.flink.runtime.operators.coordination)
   createOperatorCoordinatorHolder:286, ExecutionJobVertex 
(org.apache.flink.runtime.executiongraph)
   initialize:223, ExecutionJobVertex (org.apache.flink.runtime.executiongraph)
   initializeJobVertex:888, DefaultExecutionGraph 
(org.apache.flink.runtime.executiongraph)
   initializeJobVertex:218, ExecutionGraph 
(org.apache.flink.runtime.executiongraph)
   initializeJobVertices:870, DefaultExecutionGraph 
(org.apache.flink.runtime.executiongraph)
   attachJobGraph:826, DefaultExecutionGraph 
(org.apache.flink.runtime.executiongraph)
   buildGraph:219, DefaultExecutionGraphBuilder 
(org.apache.flink.runtime.executiongraph)
   createAndRestoreExecutionGraph:163, DefaultExecutionGraphFactory 
(org.apache.flink.runtime.scheduler)
   createAndRestoreExecutionGraph:368, SchedulerBase 
(org.apache.flink.runtime.scheduler)
   <init>:210, SchedulerBase (org.apache.flink.runtime.scheduler)
   <init>:140, DefaultScheduler (org.apache.flink.runtime.scheduler)
   ```
   
   In order to pass `checkpointCoordinator` through this stack to 
SourceCoordinatorContext's constructor, we may need to add a 
`getCheckpointCoordinator` method to the `OperatorCoordinator.Context` class. This 
is a public API change that has not been agreed upon in the FLIP discussion, and I 
also personally think this API would give OperatorCoordinator developers too much 
freedom, so it should not be introduced. Thus I did not 
choose this path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-pulsar] tisonkun commented on pull request #55: [FLINK-24302] Test coverage for JDK 11

2023-07-19 Thread via GitHub


tisonkun commented on PR #55:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/55#issuecomment-1641787804

   Pending merge...
   
   @syhily do we already have a ticket for supporting the table connector? Or 
should I open a new one? It would be my next step to track.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API

2023-07-19 Thread via GitHub


flinkbot commented on PR #23026:
URL: https://github.com/apache/flink/pull/23026#issuecomment-1641769442

   
   ## CI report:
   
   * fcbf13064bc547149c674d4b1c0ceaa265176221 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu closed pull request #23005: Test for path

2023-07-19 Thread via GitHub


WencongLiu closed pull request #23005: Test for path
URL: https://github.com/apache/flink/pull/23005


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu closed pull request #23002: Mark dataset related classes deprecated

2023-07-19 Thread via GitHub


WencongLiu closed pull request #23002: Mark dataset related classes deprecated
URL: https://github.com/apache/flink/pull/23002


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32558) Properly deprecate DataSet API

2023-07-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32558:
---
Labels: pull-request-available  (was: )

> Properly deprecate DataSet API
> --
>
> Key: FLINK-32558
> URL: https://issues.apache.org/jira/browse/FLINK-32558
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet
>Reporter: Xintong Song
>Assignee: Wencong Liu
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The DataSet API is described as "legacy" and "soft deprecated" in the user 
> documentation [1]. The required tasks for formally deprecating / removing it, 
> according to FLIP-131 [2], are all completed.
> This task includes marking all related API classes as `@Deprecated` and 
> updating the user documentation.
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/dataset/overview/
> [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] WencongLiu opened a new pull request, #23026: [FLINK-32558][flink-java] Deprecate all DataSet API

2023-07-19 Thread via GitHub


WencongLiu opened a new pull request, #23026:
URL: https://github.com/apache/flink/pull/23026

   ## What is the purpose of the change
   
   The entry points for using the DataSet API were deprecated. This PR ensures 
that all @Public, @PublicEvolving and @Experimental APIs are also marked as 
@Deprecated.
   This PR doesn't have any code changes outside of deprecating these classes. 
Classes that are not scheduled for removal might still need to be migrated.
   
   
   ## Brief change log
   
 - All classes that contain one of those methods are also marked as 
deprecated with a link to 
[FLIP-131](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741)
 .
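   
   An illustrative sketch of the deprecation pattern applied to each class 
(hypothetical example class, not the exact Flink source):
   
   ```java
   // Illustrative only: every DataSet-related public class gets @Deprecated
   // plus a Javadoc note pointing at FLIP-131.
   /**
    * @deprecated All Flink DataSet APIs are deprecated in favor of the DataStream
    *     and Table APIs; see FLIP-131 for details.
    */
   @Deprecated
   public class ExampleDataSetApi {
       /** @deprecated See FLIP-131. */
       @Deprecated
       public void legacyOperation() {}
   }
   ```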
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32627) Add support for dynamic time window function

2023-07-19 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-32627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744499#comment-17744499
 ] 

张一帆 commented on FLINK-32627:
-

[~martijnvisser] Thank you

> Add support for dynamic time window function
> 
>
> Key: FLINK-32627
> URL: https://issues.apache.org/jira/browse/FLINK-32627
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.18.0
>Reporter: 张一帆
>Assignee: 张一帆
>Priority: Major
>  Labels: pull-request-available
>
> When using windows for calculations, every time the logic is modified or 
> adjusted the entire program has to be stopped, the code changed, and the 
> program repackaged and resubmitted to the cluster. Dynamic modification of 
> the logic and external dynamic injection are impossible. The window 
> information can instead be obtained from the data itself to trigger 
> redistribution of windows, achieving the effect of dynamic windows.
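>
> A generic sketch of the idea (plain Java, not the Flink WindowAssigner API):
> derive each record's window from a window size carried in the record itself.
> {code:java}
> // Hypothetical record layout: the window size travels with the data.
> record Event(long timestampMs, long windowSizeMs) {}
>
> final class DynamicWindowDemo {
>     // Align the timestamp to the start of its dynamically sized window.
>     static long windowStart(Event e) {
>         return e.timestampMs() - (e.timestampMs() % e.windowSizeMs());
>     }
>
>     public static void main(String[] args) {
>         Event e = new Event(12_345L, 5_000L);
>         // Prints 10000: the event falls into the window [10000, 15000).
>         System.out.println(windowStart(e));
>     }
> }
> {code}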



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32627) Add support for dynamic time window function

2023-07-19 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744497#comment-17744497
 ] 

Martijn Visser commented on FLINK-32627:


[~zhangyf] I've just granted you the permissions

> Add support for dynamic time window function
> 
>
> Key: FLINK-32627
> URL: https://issues.apache.org/jira/browse/FLINK-32627
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.18.0
>Reporter: 张一帆
>Assignee: 张一帆
>Priority: Major
>  Labels: pull-request-available
>
> When using windows for calculations, every time the logic is modified or 
> adjusted the entire program has to be stopped, the code changed, and the 
> program repackaged and resubmitted to the cluster. Dynamic modification of 
> the logic and external dynamic injection are impossible. The window 
> information can instead be obtained from the data itself to trigger 
> redistribution of windows, achieving the effect of dynamic windows.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32627) Add support for dynamic time window function

2023-07-19 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-32627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744494#comment-17744494
 ] 

张一帆 commented on FLINK-32627:
-

[~martijnvisser] How do I create a FLIP? I don't seem to have permission.

> Add support for dynamic time window function
> 
>
> Key: FLINK-32627
> URL: https://issues.apache.org/jira/browse/FLINK-32627
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.18.0
>Reporter: 张一帆
>Assignee: 张一帆
>Priority: Major
>  Labels: pull-request-available
>
> When using windows for calculations, every time the logic is modified or 
> adjusted the entire program has to be stopped, the code changed, and the 
> program repackaged and resubmitted to the cluster. Dynamic modification of 
> the logic and external dynamic injection are impossible. The window 
> information can instead be obtained from the data itself to trigger 
> redistribution of windows, achieving the effect of dynamic windows.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29039) RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only shallow copies produced data, thus result will always be the last row v

2023-07-19 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-29039:
--

Assignee: Hang Ruan

> RowData produced by LineBytesInputFormat is reused, but 
> DeserializationSchemaAdapter#Reader only shallow copies produced data, thus 
> result will always be the last row value
> 
>
> Key: FLINK-29039
> URL: https://issues.apache.org/jira/browse/FLINK-29039
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.1
> Environment: This issue was discovered on MacOS Big Sur.
>Reporter: Marco A. Villalobos
>Assignee: Hang Ruan
>Priority: Major
>
> RowData produced by LineBytesInputFormat is reused, but 
> DeserializationSchemaAdapter#Reader only shallow copies produced data, thus 
> result will always be the last row value.
>  
> Given this program:
> {code:java}
> package mvillalobos.bug;
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> import static org.apache.flink.table.api.Expressions.$;
>
> public class IsThisABatchSQLBug {
>     public static void main(String[] args) {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
>                 "        `file.path`              STRING NOT NULL METADATA,\n" +
>                 "        `file.name`              STRING NOT NULL METADATA,\n" +
>                 "        `file.size`              BIGINT NOT NULL METADATA,\n" +
>                 "        `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" +
>                 "        line                    STRING\n" +
>                 "      ) WITH (\n" +
>                 "        'connector' = 'filesystem', \n" +
>                 "        'format' = 'raw'\n" +
>                 "      );");
>         tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
>                 "      WITH (\n" +
>                 "        'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" +
>                 "      ) LIKE historical_raw_source_template;");
>         final TableResult output =
>                 tableEnv.from("historical_raw_source").select($("line")).execute();
>         output.print();
>     }
> }
> {code}
> and this sample.csv file in the 
> '/Users/minmay/dev/mvillalobos/historical/data' directory:
> {code:java}
> one
> two
> three
> four
> five
> six
> seven
> eight
> nine
> ten {code}
> The print results are:
> {code:java}
> +----+--------------------------------+
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> +----+--------------------------------+
> 10 rows in set {code}
>  
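>
> The underlying pattern, as a generic sketch (plain Java, not Flink's actual
> classes): a producer reuses one mutable record object, so a reader that
> buffers references without deep-copying sees only the last value.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> final class ObjectReusePitfall {
>     static final class MutableRow { String line; }
>
>     public static void main(String[] args) {
>         MutableRow reused = new MutableRow();
>         List<MutableRow> shallow = new ArrayList<>();
>         List<String> copied = new ArrayList<>();
>         for (String value : new String[] {"one", "two", "ten"}) {
>             reused.line = value;      // the producer mutates one shared instance
>             shallow.add(reused);      // bug: buffers a reference, not a copy
>             copied.add(reused.line);  // fix: copy the data out per record
>         }
>         shallow.forEach(r -> System.out.println(r.line)); // ten, ten, ten
>         copied.forEach(System.out::println);              // one, two, ten
>     }
> }
> {code}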



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] yunfengzhou-hub commented on a diff in pull request #22931: [FLINK-32514] Support configuring checkpointing interval during process backlog

2023-07-19 Thread via GitHub


yunfengzhou-hub commented on code in PR #22931:
URL: https://github.com/apache/flink/pull/22931#discussion_r1267760838


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##
@@ -2079,11 +2149,19 @@ private final class ScheduledTrigger implements 
Runnable {
 
 @Override
 public void run() {
+long currentTime = clock.relativeTimeMillis();
+if (lastCheckpointTriggeringRelativeTime != NO_CHECKPOINT
+&& currentTime - lastCheckpointTriggeringRelativeTime < 
baseInterval) {

Review Comment:
   With the current implementation in which 
`lastCheckpointTriggeringRelativeTime` could be `Long.MIN_VALUE`, `currentTime 
- lastCheckpointTriggeringRelativeTime` could cause a numeric overflow. Thus I 
treat `Long.MIN_VALUE` specially here. This would be resolved after I replace 
the default value with 0 or -1 according to the previous comment.
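   
   For reference, a tiny demonstration of the overflow in question (standalone 
Java, not the Flink code):
   
   ```java
   // Subtracting Long.MIN_VALUE wraps around, so the elapsed-time guard
   // would see a negative value and misbehave without the sentinel check.
   public class LongOverflowDemo {
       public static void main(String[] args) {
           long lastTrigger = Long.MIN_VALUE; // sentinel for "no checkpoint yet"
           long now = 1_000L;
           long elapsed = now - lastTrigger;  // overflows to a negative number
           System.out.println(elapsed);            // -9223372036854774808
           System.out.println(elapsed < 60_000L);  // true: interval check misfires
       }
   }
   ```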



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] yunfengzhou-hub commented on a diff in pull request #22931: [FLINK-32514] Support configuring checkpointing interval during process backlog

2023-07-19 Thread via GitHub


yunfengzhou-hub commented on code in PR #22931:
URL: https://github.com/apache/flink/pull/22931#discussion_r1267760838


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##
@@ -2079,11 +2149,19 @@ private final class ScheduledTrigger implements 
Runnable {
 
 @Override
 public void run() {
+long currentTime = clock.relativeTimeMillis();
+if (lastCheckpointTriggeringRelativeTime != NO_CHECKPOINT
+&& currentTime - lastCheckpointTriggeringRelativeTime < 
baseInterval) {

Review Comment:
   With the current implementation in which 
`lastCheckpointTriggeringRelativeTime` could be `Long.MIN_VALUE`, `currentTime 
- lastCheckpointTriggeringRelativeTime` could cause stack overflow. Thus I 
treat `Long.MIN_VALUE` specially here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] yunfengzhou-hub commented on a diff in pull request #22931: [FLINK-32514] Support configuring checkpointing interval during process backlog

2023-07-19 Thread via GitHub


yunfengzhou-hub commented on code in PR #22931:
URL: https://github.com/apache/flink/pull/22931#discussion_r1267760838


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##
@@ -2079,11 +2149,19 @@ private final class ScheduledTrigger implements 
Runnable {
 
 @Override
 public void run() {
+long currentTime = clock.relativeTimeMillis();
+if (lastCheckpointTriggeringRelativeTime != NO_CHECKPOINT
+&& currentTime - lastCheckpointTriggeringRelativeTime < 
baseInterval) {

Review Comment:
   With the current implementation in which 
`lastCheckpointTriggeringRelativeTime` could be `Long.MIN_VALUE`, `currentTime 
- lastCheckpointTriggeringRelativeTime` could cause a numeric overflow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #23025: [FLINK-7129]Support dynamically changing CEP patterns

2023-07-19 Thread via GitHub


flinkbot commented on PR #23025:
URL: https://github.com/apache/flink/pull/23025#issuecomment-1641670271

   
   ## CI report:
   
   * 1e5b0a3d55605fe1e6de27331a6c204b9824dc62 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


