[GitHub] [flink] hanyuzheng7 commented on pull request #22842: [FLINK-32261][table] Add built-in MAP_UNION function.
hanyuzheng7 commented on PR #22842: URL: https://github.com/apache/flink/pull/22842#issuecomment-1643174801 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30559) May get wrong result for `if` expression if it's string data type
[ https://issues.apache.org/jira/browse/FLINK-30559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-30559: Fix Version/s: 1.18.0 1.16.3 1.17.2 > May get wrong result for `if` expression if it's string data type > - > > Key: FLINK-30559 > URL: https://issues.apache.org/jira/browse/FLINK-30559 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0, 1.16.3, 1.17.2 > > > Can be reproduced by the following code in > `org.apache.flink.table.planner.runtime.batch.sql.CalcITCase` > > {code:java} > checkResult("SELECT if(b > 10, 'ua', c) from Table3", data3) {code} > The actual result is [co, He, He, ...]. > It seems only the first two characters are returned. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
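The truncation to exactly two characters is consistent with the expression's result type being derived from the two-character literal 'ua' (i.e. CHAR(2)), so the other branch would be cast, and truncated, to that length. A minimal plain-Java sketch of this hypothesized cast behavior (the names are illustrative, not Flink internals):

```java
// Hypothetical sketch of CHAR(n) cast semantics that would explain the report:
// if the type of IF(b > 10, 'ua', c) is inferred as CHAR(2) from the literal
// 'ua', the string column c is truncated to two characters on cast.
public class CharCastSketch {
    // Truncate to the target CHAR length; padding of shorter values is omitted.
    static String castToChar(String value, int length) {
        return value.length() > length ? value.substring(0, length) : value;
    }

    public static void main(String[] args) {
        // Matches the observed output pattern [co, He, He, ...]
        System.out.println(castToChar("Hello", 2)); // He
        System.out.println(castToChar("comment", 2)); // co
    }
}
```

This would also explain why the issue only appears when the branches have different string lengths.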
[jira] [Resolved] (FLINK-30559) May get wrong result for `if` expression if it's string data type
[ https://issues.apache.org/jira/browse/FLINK-30559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu resolved FLINK-30559. - Resolution: Duplicate > May get wrong result for `if` expression if it's string data type > - > > Key: FLINK-30559 > URL: https://issues.apache.org/jira/browse/FLINK-30559 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > > Can be reproduced by the following code in > `org.apache.flink.table.planner.runtime.batch.sql.CalcITCase` > > {code:java} > checkResult("SELECT if(b > 10, 'ua', c) from Table3", data3) {code} > The actual result is [co, He, He, ...]. > It seems only the first two characters are returned.
[jira] [Reopened] (FLINK-30559) May get wrong result for `if` expression if it's string data type
[ https://issues.apache.org/jira/browse/FLINK-30559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reopened FLINK-30559: - > May get wrong result for `if` expression if it's string data type > - > > Key: FLINK-30559 > URL: https://issues.apache.org/jira/browse/FLINK-30559 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > > Can be reproduced by the following code in > `org.apache.flink.table.planner.runtime.batch.sql.CalcITCase` > > {code:java} > checkResult("SELECT if(b > 10, 'ua', c) from Table3", data3) {code} > The actual result is [co, He, He, ...]. > It seems only the first two characters are returned.
[jira] [Assigned] (FLINK-32559) Deprecate Queryable State
[ https://issues.apache.org/jira/browse/FLINK-32559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song reassigned FLINK-32559: Assignee: Xintong Song > Deprecate Queryable State > - > > Key: FLINK-32559 > URL: https://issues.apache.org/jira/browse/FLINK-32559 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Queryable State >Reporter: Xintong Song >Assignee: Xintong Song >Priority: Blocker > Fix For: 1.18.0 > > > Queryable State is described as approaching end-of-life in the roadmap [1], > but is neither deprecated in codes nor in user documentation [2]. There're > also more negative opinions than positive ones in the discussion about > rescuing it [3]. > [1] https://flink.apache.org/roadmap/ > [2] > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/queryable_state/ > [3] https://lists.apache.org/thread/9hmwcjb3q5c24pk3qshjvybfqk62v17m
[jira] [Comment Edited] (FLINK-5336) Make Path immutable
[ https://issues.apache.org/jira/browse/FLINK-5336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744452#comment-17744452 ] Wencong Liu edited comment on FLINK-5336 at 7/20/23 4:13 AM: - Hi all, I have checked all the classes that utilize the *Path* class. I found that there are still some classes that de/serialize the *Path* through the *IOReadableWritable* interface. # {*}FileSourceSplitSerializer{*}: It de/serializes the *Path* during the process of de/serializing FileSourceSplit. # {*}TestManagedSinkCommittableSerializer{*}: It de/serializes the Path during the process of de/serializing TestManagedCommittable. # {*}TestManagedFileSourceSplitSerializer{*}: It de/serializes the Path during the process of de/serializing TestManagedIterableSourceSplit. For 1, the Path needs to be serialized to save checkpoint data in the source. For 2/3, the IT case in flink-table-common depends on the Path de/serialization. [~qingyue] was (Author: JIRAUSER281639): Hi all, I have checked all the classes that utilize the *Path* class. I found that there are still some classes that de/serialize the *Path* through the *IOReadableWritable* interface. # {*}FileSourceSplitSerializer{*}: It de/serializes the *Path* during the process of de/serializing FileSourceSplit. # {*}TestManagedSinkCommittableSerializer{*}: It de/serializes the Path during the process of de/serializing TestManagedCommittable. # {*}TestManagedFileSourceSplitSerializer{*}: It de/serializes the Path during the process of de/serializing TestManagedIterableSourceSplit. For 1, the Path needs to be serialized to save checkpoint data in the source. For 2/3, the IT case in flink-table-common depends on the Path de/serialization. [~qingyue] In summary, I think the Path class still needs to implement the *IOReadableWritable* interface to support de/serialization. WDYT? 
> Make Path immutable > --- > > Key: FLINK-5336 > URL: https://issues.apache.org/jira/browse/FLINK-5336 > Project: Flink > Issue Type: Sub-task > Components: API / DataSet >Reporter: Stephan Ewen >Priority: Major > Fix For: 2.0.0 > > > The {{Path}} class is currently mutable to support the {{IOReadableWritable}} > serialization. Since that serialization is not used any more, I suggest to > drop that interface from Path and make the Path's URI final. > Being immutable, we can store configured paths properly without the chance of > them being mutated as side effects. > Many parts of the code make the assumption that the Path is immutable, > making them susceptible to subtle errors.
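The proposal above — drop `IOReadableWritable` and make the URI final — can be sketched with a toy class (this is an illustrative sketch, not Flink's actual `Path` implementation): without a `read(DataInputView)`-style method there is no code path that can mutate an instance after construction, so derived paths must be new objects.

```java
import java.net.URI;

// Hedged sketch of an immutable Path as proposed in FLINK-5336:
// the URI is final, and there is no deserialization hook that mutates it.
public final class ImmutablePath {
    private final URI uri;

    public ImmutablePath(String path) {
        this.uri = URI.create(path);
    }

    public URI toUri() {
        return uri;
    }

    // Derived paths return new instances instead of mutating this one.
    public ImmutablePath child(String name) {
        String base = uri.toString();
        return new ImmutablePath(base.endsWith("/") ? base + name : base + "/" + name);
    }

    public static void main(String[] args) {
        ImmutablePath p = new ImmutablePath("hdfs:///tmp");
        System.out.println(p.child("data").toUri()); // hdfs:///tmp/data
    }
}
```

Classes that still need to ship a path over the wire (the serializers listed in the comment) could then de/serialize the URI string externally rather than through the Path object itself.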
[GitHub] [flink] masteryhx commented on a diff in pull request #22744: [FLINK-29802][state] Changelog supports native savepoint
masteryhx commented on code in PR #22744: URL: https://github.com/apache/flink/pull/22744#discussion_r1268880413 ## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java: ## @@ -375,6 +378,51 @@ public RunnableFuture> snapshot( @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception { + +if (checkpointOptions.getCheckpointType().isSavepoint()) { +SnapshotType.SharingFilesStrategy sharingFilesStrategy = + checkpointOptions.getCheckpointType().getSharingFilesStrategy(); +if (sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING) { +long materializationID = materializedId++; +// For NO_SHARING native savepoint, trigger delegated one +RunnableFuture> delegatedSnapshotResult = +keyedStateBackend.snapshot( +materializationID, timestamp, streamFactory, checkpointOptions); Review Comment: I think the notification about checkpoint completion could be handled like a normal checkpoint by adding a mapping to materializationIdByCheckpointId. The notification about checkpoint abortion has not been implemented for normal checkpoints either (FLINK-25850), so I think it could be considered together with FLINK-25850.
[GitHub] [flink] LadyForest commented on a diff in pull request #22593: [FLINK-32053][table-planner] Introduce StateMetadata to ExecNode to support configure operator-level state TTL via CompiledPlan
LadyForest commented on code in PR #22593: URL: https://github.com/apache/flink/pull/22593#discussion_r1268866818 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java: ## @@ -71,20 +75,36 @@ producedTransformations = StreamExecChangelogNormalize.CHANGELOG_NORMALIZE_TRANSFORMATION, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15) +@ExecNodeMetadata( +name = "stream-exec-changelog-normalize", +version = 2, Review Comment: Hi @twalthr, during the implementation of FLIP-292, I did notice that the current testing for compiled plans is not comprehensive enough, such as FLINK-31884, FLINK-31917, and FLINK-32219. FLIP-292 added `ExecNodeVersionUpgradeSerdeTest` and `TransformationsTest#testUidFlink1_15` to test the deserialization of old plans using the new version, but it may not be sufficient. After reconsideration, I think we can keep the original exec node version until we have a more comprehensive testing framework. I will open a PR and hope you can help review the code. Thank you.
[jira] [Comment Edited] (FLINK-5336) Make Path immutable
[ https://issues.apache.org/jira/browse/FLINK-5336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744452#comment-17744452 ] Wencong Liu edited comment on FLINK-5336 at 7/20/23 2:32 AM: - Hi all, I have checked all the classes that utilize the *Path* class. I found that there are still some classes that de/serialize the *Path* through the *IOReadableWritable* interface. # {*}FileSourceSplitSerializer{*}: It de/serializes the *Path* during the process of de/serializing FileSourceSplit. # {*}TestManagedSinkCommittableSerializer{*}: It de/serializes the Path during the process of de/serializing TestManagedCommittable. # {*}TestManagedFileSourceSplitSerializer{*}: It de/serializes the Path during the process of de/serializing TestManagedIterableSourceSplit. For 1, the Path needs to be serialized to save checkpoint data in the source. For 2/3, the IT case in flink-table-common depends on the Path de/serialization. [~qingyue] In summary, I think the Path class still needs to implement the *IOReadableWritable* interface to support de/serialization. WDYT? was (Author: JIRAUSER281639): Hi all, I have checked all the classes that utilize the *Path* class. I found that there are still some classes that de/serialize the *Path* through the *IOReadableWritable* interface. # {*}FileSourceSplitSerializer{*}: It de/serializes the *Path* during the process of de/serializing FileSourceSplit. # {*}TestManagedSinkCommittableSerializer{*}: It de/serializes the Path during the process of de/serializing TestManagedCommittable. # {*}TestManagedFileSourceSplitSerializer{*}: It de/serializes the Path during the process of de/serializing TestManagedIterableSourceSplit. For 1, the Path needs to be serialized to save checkpoint data in the source. [~sewen] For 2/3, the IT case in flink-table-common depends on the Path de/serialization. [~qingyue] In summary, I think the Path class still needs to implement the *IOReadableWritable* interface to support de/serialization. WDYT? 
> Make Path immutable > --- > > Key: FLINK-5336 > URL: https://issues.apache.org/jira/browse/FLINK-5336 > Project: Flink > Issue Type: Sub-task > Components: API / DataSet >Reporter: Stephan Ewen >Priority: Major > Fix For: 2.0.0 > > > The {{Path}} class is currently mutable to support the {{IOReadableWritable}} > serialization. Since that serialization is not used any more, I suggest to > drop that interface from Path and make the Path's URI final. > Being immutable, we can store configured paths properly without the chance of > them being mutated as side effects. > Many parts of the code make the assumption that the Path is immutable, > making them susceptible to subtle errors.
[jira] [Closed] (FLINK-32578) Cascaded group by window time columns on a proctime window aggregate may result hang for ever
[ https://issues.apache.org/jira/browse/FLINK-32578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee closed FLINK-32578. --- > Cascaded group by window time columns on a proctime window aggregate may > result hang for ever > - > > Key: FLINK-32578 > URL: https://issues.apache.org/jira/browse/FLINK-32578 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.17.1 >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0, 1.17.2 > > > Currently, grouping by window time columns on a proctime window aggregate > produces a wrong plan which may hang forever at runtime. > For such a query: > {code} > insert into s1 > SELECT > window_start, > window_end, > sum(cnt), > count(*) > FROM ( > SELECT > a, > b, > window_start, > window_end, > count(*) as cnt, > sum(d) as sum_d, > max(d) as max_d > FROM TABLE(TUMBLE(TABLE src1, DESCRIPTOR(proctime), INTERVAL '5' MINUTE)) > GROUP BY a, window_start, window_end, b > ) > GROUP BY a, window_start, window_end > {code} > the inner proctime window works fine, but the outer one doesn't work due to a > wrong plan which translates to an unexpected event-mode window operator: > {code} > Sink(table=[default_catalog.default_database.s1], fields=[ws, we, b, c]) > +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS > TIMESTAMP(6)) AS we, CAST(EXPR$2 AS BIGINT) AS b, CAST(EXPR$3 AS BIGINT) AS > c]) >+- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], > win_end=[window_end], size=[5 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) > AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end]) > +- Exchange(distribution=[hash[a]]) > +- Calc(select=[a, window_start, window_end, cnt]) > +- WindowAggregate(groupBy=[a, b], > window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS > cnt, start('w$) AS window_start, end('w$) AS window_end]) >+- Exchange(distribution=[hash[a, b]]) > +- Calc(select=[a, b, d, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, > default_database, src1, project=[a, b, d], metadata=[]]], fields=[a, b, d]) > {code}
[jira] [Commented] (FLINK-32578) Cascaded group by window time columns on a proctime window aggregate may result hang for ever
[ https://issues.apache.org/jira/browse/FLINK-32578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744847#comment-17744847 ] lincoln lee commented on FLINK-32578: - Fixed in 1.17: dc8b70c2fcbb429a27a9cc1e263d9a38c2d7da34 > Cascaded group by window time columns on a proctime window aggregate may > result hang for ever > - > > Key: FLINK-32578 > URL: https://issues.apache.org/jira/browse/FLINK-32578 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.17.1 >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0, 1.17.2 > > > Currently, grouping by window time columns on a proctime window aggregate > produces a wrong plan which may hang forever at runtime. > For such a query: > {code} > insert into s1 > SELECT > window_start, > window_end, > sum(cnt), > count(*) > FROM ( > SELECT > a, > b, > window_start, > window_end, > count(*) as cnt, > sum(d) as sum_d, > max(d) as max_d > FROM TABLE(TUMBLE(TABLE src1, DESCRIPTOR(proctime), INTERVAL '5' MINUTE)) > GROUP BY a, window_start, window_end, b > ) > GROUP BY a, window_start, window_end > {code} > the inner proctime window works fine, but the outer one doesn't work due to a > wrong plan which translates to an unexpected event-mode window operator: > {code} > Sink(table=[default_catalog.default_database.s1], fields=[ws, we, b, c]) > +- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS > TIMESTAMP(6)) AS we, CAST(EXPR$2 AS BIGINT) AS b, CAST(EXPR$3 AS BIGINT) AS > c]) >+- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], > win_end=[window_end], size=[5 min])], select=[a, SUM(cnt) AS EXPR$2, COUNT(*) > AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end]) > +- Exchange(distribution=[hash[a]]) > +- Calc(select=[a, window_start, window_end, cnt]) > +- WindowAggregate(groupBy=[a, b], > window=[TUMBLE(time_col=[proctime], size=[5 min])], select=[a, b, COUNT(*) AS > cnt, start('w$) AS window_start, end('w$) AS window_end]) >+- Exchange(distribution=[hash[a, b]]) > +- Calc(select=[a, b, d, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, > default_database, src1, project=[a, b, d], metadata=[]]], fields=[a, b, d]) > {code}
[GitHub] [flink] flinkbot commented on pull request #23030: fix(sec): upgrade com.google.guava:guava to 32.0.0-jre
flinkbot commented on PR #23030: URL: https://github.com/apache/flink/pull/23030#issuecomment-1643008141 ## CI report: * 4c9b2396007652e405eb3fcd537afac0fd0ab1e5 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] lincoln-lil merged pull request #23022: [FLINK-32578][table-planner] Fix wrong plan which group by window time columns on a proctime window operator may result hang for ever
lincoln-lil merged PR #23022: URL: https://github.com/apache/flink/pull/23022
[GitHub] [flink-connector-pulsar] syhily commented on pull request #55: [FLINK-24302] Extend offheap memory for JDK11 test coverage
syhily commented on PR #55: URL: https://github.com/apache/flink-connector-pulsar/pull/55#issuecomment-1642986077 Cool, I'm so happy to see that we finally have JDK 11 test support. The extensive off-heap memory usage in the Pulsar client could be the main cause of the OOM on JDK 11. Should we mention it in our documentation if the user wants to use this connector on JDK 11 or above? @reswqa @tisonkun
[GitHub] [flink] hackergin commented on pull request #22937: [FLINK-32428][table] Introduce base interfaces for CatalogStore
hackergin commented on PR #22937: URL: https://github.com/apache/flink/pull/22937#issuecomment-1642970805 @flinkbot run azure
[jira] [Created] (FLINK-32633) Kubernetes e2e test is not stable
Fang Yong created FLINK-32633: - Summary: Kubernetes e2e test is not stable Key: FLINK-32633 URL: https://issues.apache.org/jira/browse/FLINK-32633 Project: Flink Issue Type: Technical Debt Components: Deployment / Kubernetes, Kubernetes Operator Affects Versions: 1.18.0 Reporter: Fang Yong The output file is: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51444=logs=bea52777-eaf8-5663-8482-18fbc3630e81=43ba8ce7-ebbf-57cd-9163-444305d74117 Jul 19 17:06:02 Stopping minikube ... Jul 19 17:06:02 * Stopping node "minikube" ... Jul 19 17:06:13 * 1 node stopped. Jul 19 17:06:13 [FAIL] Test script contains errors. Jul 19 17:06:13 Checking for errors... Jul 19 17:06:13 No errors in log files. Jul 19 17:06:13 Checking for exceptions... Jul 19 17:06:13 No exceptions in log files. Jul 19 17:06:13 Checking for non-empty .out files... grep: /home/vsts/work/_temp/debug_files/flink-logs/*.out: No such file or directory Jul 19 17:06:13 No non-empty .out files. Jul 19 17:06:13 Jul 19 17:06:13 [FAIL] 'Run Kubernetes test' failed after 4 minutes and 28 seconds! Test exited with exit code 1 Jul 19 17:06:13 17:06:13 ##[group]Environment Information Jul 19 17:06:13 Jps -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32632) Run Kubernetes test is unstable on AZP
Sergey Nuyanzin created FLINK-32632: --- Summary: Run Kubernetes test is unstable on AZP Key: FLINK-32632 URL: https://issues.apache.org/jira/browse/FLINK-32632 Project: Flink Issue Type: Bug Affects Versions: 1.18.0 Reporter: Sergey Nuyanzin This test https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51447=logs=bea52777-eaf8-5663-8482-18fbc3630e81=43ba8ce7-ebbf-57cd-9163-444305d74117=6213 fails with {noformat} 2023-07-19T17:14:49.8144730Z Jul 19 17:14:49 deployment.apps/flink-task-manager created 2023-07-19T17:15:03.7983703Z Jul 19 17:15:03 job.batch/flink-job-cluster condition met 2023-07-19T17:15:04.0937620Z error: Internal error occurred: error executing command in container: http: invalid Host header 2023-07-19T17:15:04.0988752Z sort: cannot read: '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-11919909188/out/kubernetes_wc_out*': No such file or directory 2023-07-19T17:15:04.1017388Z Jul 19 17:15:04 FAIL WordCount: Output hash mismatch. Got d41d8cd98f00b204e9800998ecf8427e, expected e682ec6622b5e83f2eb614617d5ab2cf. {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
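A detail worth noting in the failure above: the "got" hash d41d8cd98f00b204e9800998ecf8427e is the MD5 digest of empty input, which is consistent with the preceding error — the `kubernetes_wc_out*` files were never written, so the checked output was empty rather than merely wrong. This can be verified with a few lines of plain Java:

```java
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class EmptyMd5 {
    // Hex-encode the MD5 digest of the given bytes.
    static String md5Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(data);
            // %032x left-pads to 32 hex chars, since BigInteger drops leading zeros.
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // MD5 of zero bytes: the well-known "empty file" digest.
        System.out.println(md5Hex(new byte[0])); // d41d8cd98f00b204e9800998ecf8427e
    }
}
```

So the root cause is the failed `kubectl` exec ("invalid Host header") that prevented the output from being collected, not a WordCount correctness problem.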
[jira] [Commented] (FLINK-31958) Table to DataStream allow partial fields
[ https://issues.apache.org/jira/browse/FLINK-31958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744768#comment-17744768 ] padavan commented on FLINK-31958: - Perhaps it should be moved to the bugs category? Or is it by design? > Table to DataStream allow partial fields > > > Key: FLINK-31958 > URL: https://issues.apache.org/jira/browse/FLINK-31958 > Project: Flink > Issue Type: Improvement > Components: API / DataStream, Table SQL / API >Reporter: padavan >Priority: Major > > Hello, I have a model with many fields, for example: > {code:java} > public class UserModel { public int userId; public int count; public int zip; > public LocalDateTime dt; public LocalDateTime wStart; public LocalDateTime > wEnd; }{code} > I work with the Table API, select fields, and convert the Table to a DataStream of the model class. The problem is that *I have to select all fields even if I don't need them*, or I get the exception > {quote}Column types of query result and sink for do not match. Cause: > Different number of columns. > {quote} > And I just have to substitute fake data for the unused fields... > > I want to simply use only the fields which I have selected, like: > {code:java} > .select($("userId"), $("count").sum().as("count")); > DataStream dataStream = te.toDataStream(win, > UserModel.class);{code} > > *Expected:* > Remove the validation rule "Different number of columns.". If a column is not > selected, it is initialized by default(T) / null
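The requested behavior — populating only the selected columns and leaving the rest at defaults — can be sketched outside Flink with plain reflection: copy the fields present in the selection and let unselected fields keep their Java default values. This is an illustrative sketch, not Flink API; `fromPartialRow` and the row `Map` are hypothetical stand-ins for what the Table-to-DataStream conversion would do.

```java
import java.lang.reflect.Field;
import java.util.Map;

public class PartialMapper {
    // A trimmed-down version of the reporter's UserModel.
    public static class UserModel {
        public int userId;
        public int count;
        public int zip; // not selected: keeps its Java default (0)
    }

    // Populate only the fields present in the selected-column map;
    // everything else keeps its default value (0 / null).
    static <T> T fromPartialRow(Map<String, Object> row, Class<T> type) {
        try {
            T instance = type.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> e : row.entrySet()) {
                Field f = type.getField(e.getKey());
                f.set(instance, e.getValue());
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        UserModel m = fromPartialRow(Map.of("userId", 7, "count", 42), UserModel.class);
        System.out.println(m.userId + " " + m.count + " " + m.zip); // 7 42 0
    }
}
```

Whether Flink should relax the "Different number of columns" validation to allow this is exactly the open question of the ticket.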
[GitHub] [flink-connector-pulsar] reswqa commented on a diff in pull request #55: [FLINK-24302] Extend offheap memory for JDK11 test coverage
reswqa commented on code in PR #55: URL: https://github.com/apache/flink-connector-pulsar/pull/55#discussion_r1268492288 ## flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java: ## @@ -35,9 +35,11 @@ public class FlinkContainerUtils { public static Configuration flinkConfiguration() { Configuration configuration = new Configuration(); -// Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace -configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2048)); +configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2560)); +configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(512)); configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(512)); + +// Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2560)); configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(1024)); Review Comment: Is this more appropriate(perhaps)? But it doesn't matter if we don't make any change. 
[GitHub] [flink-connector-pulsar] reswqa commented on a diff in pull request #55: [FLINK-24302] Extend offheap memory for JDK11 test coverage
reswqa commented on code in PR #55: URL: https://github.com/apache/flink-connector-pulsar/pull/55#discussion_r1268484338 ## flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java: ## @@ -35,9 +35,11 @@ public class FlinkContainerUtils { public static Configuration flinkConfiguration() { Configuration configuration = new Configuration(); -// Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace -configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2048)); +configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2560)); +configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(512)); configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(512)); + +// Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2560)); configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(1024)); Review Comment: ```suggestion configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2560)); configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(512)); // Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(512)); configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2560)); // Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(1024)); ```
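For context on the numbers in the diff above: the TaskManager total process memory grows from 2048 to 2560 MiB, which is exactly the 512 MiB newly reserved as task off-heap memory, so the budget left for heap, metaspace, and the other components is unchanged. A quick sanity check of that arithmetic:

```java
public class MemoryBudget {
    public static void main(String[] args) {
        int oldTotalMiB = 2048;   // previous TOTAL_PROCESS_MEMORY
        int newTotalMiB = 2560;   // new TOTAL_PROCESS_MEMORY
        int addedOffHeapMiB = 512; // new TASK_OFF_HEAP_MEMORY reservation

        // The total grows by exactly the off-heap reservation, so the
        // remaining components keep the same budget as before.
        System.out.println(newTotalMiB - oldTotalMiB == addedOffHeapMiB); // true
    }
}
```

This matters because in Flink's memory model the task off-heap size is carved out of the total process memory; raising only one of the two would shrink another component's share.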
[GitHub] [flink-web] lindong28 commented on pull request #664: Allow content to expire
lindong28 commented on PR #664: URL: https://github.com/apache/flink-web/pull/664#issuecomment-1642427268 @MartijnVisser Thanks for the comments. flink-ml docs is built by this script https://github.com/apache/infrastructure-bb2/blob/master/flink-ml.py. This script is executed every day by a build bot whose status can be found by searching "flink ml" at https://ci2.apache.org/#/builders. If you are also not sure where to find/update `.htaccess` for https://nightlies.apache.org/flink, do you know who might know the answer? If none of us know, maybe I should create a JIRA for the Apache infra team.
[jira] [Updated] (FLINK-32631) FlinkSessionJob stuck in Created/Reconciling state because of No Job found error in JobManager
[ https://issues.apache.org/jira/browse/FLINK-32631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bhupendra Yadav updated FLINK-32631: Description: {*}Background{*}: We are using FlinkSessionJob for submitting jobs to a session cluster and flink kubernetes operator 1.5.0. {*}Bug{*}: We frequently encounter a problem where the job gets stuck in CREATED/RECONCILING state. On checking flink operator logs we see the error {_}Job could not be found{_}. Full trace [here|https://ideone.com/NuAyEK]. # When a Flink session job is submitted, the Flink operator submits the job to the Flink Cluster. # If the Flink job manager (JM) restarts for some reason, the job may no longer exist in the JM. # Upon reconciliation, the Flink operator queries the JM's REST API for the job using its jobID, but it receives a 404 error, indicating that the job is not found. # The operator then encounters an error and logs it, leading to the job getting stuck in an indefinite state. # Attempting to restart or suspend the job using the operator's provided mechanisms also fails because the operator keeps calling the REST API and receiving the same 404 error. {*}Expected Behavior{*}: Ideally, when the Flink operator reconciles a job and finds that it no longer exists in the Flink Cluster, it should handle the situation gracefully. Instead of getting stuck and logging errors indefinitely, the operator should mark the job as failed or deleted, or set an appropriate status for it. was: {*}Background{*}: We are using FlinkSessionJob for submitting jobs to a session cluster. {*}Bug{*}: We frequently encounter a problem where the job gets stuck in CREATED/RECONCILING state. On checking flink operator logs we see the error {_}Job could not be found{_}. Full trace [here|https://ideone.com/NuAyEK]. # When a Flink session job is submitted, the Flink operator submits the job to the Flink Cluster. 
# If the Flink job manager (JM) restarts for some reason, the job may no longer exist in the JM. # Upon reconciliation, the Flink operator queries the JM's REST API for the job using its jobID, but it receives a 404 error, indicating that the job is not found. # The operator then encounters an error and logs it, leading to the job getting stuck in an indefinite state. # Attempting to restart or suspend the job using the operator's provided mechanisms also fails because the operator keeps calling the REST API and receiving the same 404 error. {*}Expected Behavior{*}: Ideally, when the Flink operator reconciles a job and finds that it no longer exists in the Flink Cluster, it should handle the situation gracefully. Instead of getting stuck and logging errors indefinitely, the operator should mark the job as failed or deleted, or set an appropriate status for it. > FlinkSessionJob stuck in Created/Reconciling state because of No Job found > error in JobManager > -- > > Key: FLINK-32631 > URL: https://issues.apache.org/jira/browse/FLINK-32631 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: 1.16.0 > Environment: Local >Reporter: Bhupendra Yadav >Priority: Major > > {*}Background{*}: We are using FlinkSessionJob for submitting jobs to a > session cluster and flink kubernetes operator 1.5.0. > {*}Bug{*}: We frequently encounter a problem where the job gets stuck in > CREATED/RECONCILING state. On checking flink operator logs we see the error > {_}Job could not be found{_}. Full trace [here|https://ideone.com/NuAyEK]. > # When a Flink session job is submitted, the Flink operator submits the job > to the Flink Cluster. > # If the Flink job manager (JM) restarts for some reason, the job may no > longer exist in the JM. > # Upon reconciliation, the Flink operator queries the JM's REST API for the > job using its jobID, but it receives a 404 error, indicating that the job is > not found. 
> # The operator then encounters an error and logs it, leading to the job > getting stuck in an indefinite state. > # Attempting to restart or suspend the job using the operator's provided > mechanisms also fails because the operator keeps calling the REST API and > receiving the same 404 error. > {*}Expected Behavior{*}: Ideally, when the Flink operator reconciles a job > and finds that it no longer exists in the Flink Cluster, it should handle the > situation gracefully. Instead of getting stuck and logging errors > indefinitely, the operator should mark the job as failed or deleted, or set > an appropriate status for it.
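The expected behavior described in the issue can be sketched in miniature. This is an illustrative sketch only, with hypothetical names, not the operator's actual reconciliation code: on a 404 ("Job could not be found") from the JobManager REST API, the reconciler should record a terminal state instead of retrying indefinitely.

```python
class JobNotFoundError(Exception):
    """Stands in for the JobManager's 404 'Job could not be found' response."""

def reconcile(job_id, fetch_job_status):
    """Return the job's state, mapping a 404 to a terminal MISSING state.

    fetch_job_status is a hypothetical callable that queries the
    JobManager REST API and raises JobNotFoundError on a 404.
    """
    try:
        return fetch_job_status(job_id)
    except JobNotFoundError:
        # The job vanished (e.g. the JobManager restarted without HA
        # metadata). Record a terminal state so that restart/suspend
        # operations can proceed instead of looping on the same 404.
        return "MISSING"
```

With this shape, a subsequent suspend or restart can observe the `MISSING` state and act on it rather than repeating the failing REST call.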
[jira] [Created] (FLINK-32631) FlinkSessionJob stuck in Created/Reconciling state because of No Job found error in JobManager
Bhupendra Yadav created FLINK-32631: --- Summary: FlinkSessionJob stuck in Created/Reconciling state because of No Job found error in JobManager Key: FLINK-32631 URL: https://issues.apache.org/jira/browse/FLINK-32631 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: 1.16.0 Environment: Local Reporter: Bhupendra Yadav {*}Background{*}: We are using FlinkSessionJob for submitting jobs to a session cluster. {*}Bug{*}: We frequently encounter a problem where the job gets stuck in CREATED/RECONCILING state. On checking flink operator logs we see the error {_}Job could not be found{_}. Full trace [here|https://ideone.com/NuAyEK]. # When a Flink session job is submitted, the Flink operator submits the job to the Flink Cluster. # If the Flink job manager (JM) restarts for some reason, the job may no longer exist in the JM. # Upon reconciliation, the Flink operator queries the JM's REST API for the job using its jobID, but it receives a 404 error, indicating that the job is not found. # The operator then encounters an error and logs it, leading to the job getting stuck in an indefinite state. # Attempting to restart or suspend the job using the operator's provided mechanisms also fails because the operator keeps calling the REST API and receiving the same 404 error. {*}Expected Behavior{*}: Ideally, when the Flink operator reconciles a job and finds that it no longer exists in the Flink Cluster, it should handle the situation gracefully. Instead of getting stuck and logging errors indefinitely, the operator should mark the job as failed or deleted, or set an appropriate status for it.
[GitHub] [flink] flinkbot commented on pull request #23029: [FLINK-32132][table-planner] Cast function CODEGEN does not work as e…
flinkbot commented on PR #23029: URL: https://github.com/apache/flink/pull/23029#issuecomment-1642417243 ## CI report: * b1967c4704f16360d638e3778509a2b2840c1b2e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot commented on pull request #23028: [FLINK-32132][table-planner] Cast function CODEGEN does not work as e…
flinkbot commented on PR #23028: URL: https://github.com/apache/flink/pull/23028#issuecomment-1642407541 ## CI report: * 939c4fb6b8bb3f8e47d62fda3d8f01f8f69c65b6 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink-connector-pulsar] tisonkun commented on a diff in pull request #55: [FLINK-24302] Extend offheap memory for JDK11 test coverage
tisonkun commented on code in PR #55: URL: https://github.com/apache/flink-connector-pulsar/pull/55#discussion_r1268324359 ## flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java: ## @@ -35,9 +35,11 @@ public class FlinkContainerUtils { public static Configuration flinkConfiguration() { Configuration configuration = new Configuration(); -// Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace -configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2048)); +configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2560)); +configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(512)); configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(512)); + +// Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace Review Comment: Please directly comment a suggestion or patch. I'm not sure where to move it in your mind.
[GitHub] [flink-connector-pulsar] tisonkun commented on a diff in pull request #55: [FLINK-24302] Extend offheap memory for JDK11 test coverage
tisonkun commented on code in PR #55: URL: https://github.com/apache/flink-connector-pulsar/pull/55#discussion_r1268323229 ## flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.common; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.testframe.environment.ClusterControllable; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.test.util.MiniClusterWithClientResource; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL; +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.METRIC_FETCHER_UPDATE_INTERVAL_MS; +import static org.apache.flink.runtime.jobgraph.SavepointConfigOptions.SAVEPOINT_PATH; + +/** Test environment for running jobs on Flink mini-cluster. */ +@Experimental +public class MiniClusterTestEnvironment implements TestEnvironment, ClusterControllable { Review Comment: Yes, and when we drop the support, at least test coverage for 1.17.
[jira] [Resolved] (FLINK-32469) Improve checkpoint REST APIs for programmatic access
[ https://issues.apache.org/jira/browse/FLINK-32469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer resolved FLINK-32469. --- Resolution: Done > Improve checkpoint REST APIs for programmatic access > > > Key: FLINK-32469 > URL: https://issues.apache.org/jira/browse/FLINK-32469 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Affects Versions: 1.16.2, 1.17.1 >Reporter: Hong Liang Teoh >Assignee: Hong Liang Teoh >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > *Why* > We want to enable programmatic use of the checkpoints REST API, independent > of the Flink dashboard. > Currently, REST APIs that retrieve information relating to a given Flink job > passes through the {{{}ExecutionGraphCache{}}}. This means that all these > APIs will retrieve stale data depending on the {{{}web.refresh-interval{}}}, > which defaults to 3s. For programmatic use of the REST API, we should be able > to retrieve the latest / cached version depending on the client (Flink > dashboard gets the cached version, other clients get the updated version). > For example, a user might want to use the REST API to retrieve the latest > completed checkpoint for a given Flink job. This might be useful when trying > to use existing checkpoints as state store when migrating a Flink job from > one cluster to another. See Appendix for example. > *What* > This change is about separating out the cache used for the checkpoints REST > APIs to a separate cache. This way, a user can set the timeout for the > checkpoints cache to 0s (disable cache), without causing much effect on the > user experience on the Flink dashboard. > In addition, the checkpoint handlers first retrieve the > {{{}ExecutionGraph{}}}, then retrieve the {{CheckpointStatsSnapshot}} from > the graph. 
This is not needed, since the checkpoint handlers only need the > {{CheckpointStatsSnapshot.}} This change will mean these handlers retrieve > the minimal required information ({{{}CheckpointStatsSnapshot){}}} to > construct a reply. > > *Example use case* > When performing security patching / maintenance of the infrastructure > supporting the Flink cluster, we might want to transfer a given Flink job to > another cluster, whilst maintaining state. We can do this via the below steps: > # Old cluster - Select completed checkpoint on existing Flink job > # Old cluster - Stop the existing Flink job > # New cluster - Start a new Flink job with selected checkpoint > Step 1 requires us to query the checkpoints REST API for the latest completed > checkpoint. With the status quo, we need to wait 3s (or whatever the > ExecutionGraphCache expiry may be). This is undesirable because this means > the Flink job will have to reprocess data equivalent to 3s / whatever the > execution graph cache timeout is.
[jira] [Commented] (FLINK-32469) Improve checkpoint REST APIs for programmatic access
[ https://issues.apache.org/jira/browse/FLINK-32469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744702#comment-17744702 ] Danny Cranmer commented on FLINK-32469: --- Merged commit [{{7b9b4e5}}|https://github.com/apache/flink/commit/7b9b4e53a59ab8f4f2a99a6e162a794d264f7daf] into master > Improve checkpoint REST APIs for programmatic access > > > Key: FLINK-32469 > URL: https://issues.apache.org/jira/browse/FLINK-32469 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Affects Versions: 1.16.2, 1.17.1 >Reporter: Hong Liang Teoh >Assignee: Hong Liang Teoh >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > *Why* > We want to enable programmatic use of the checkpoints REST API, independent > of the Flink dashboard. > Currently, REST APIs that retrieve information relating to a given Flink job > passes through the {{{}ExecutionGraphCache{}}}. This means that all these > APIs will retrieve stale data depending on the {{{}web.refresh-interval{}}}, > which defaults to 3s. For programmatic use of the REST API, we should be able > to retrieve the latest / cached version depending on the client (Flink > dashboard gets the cached version, other clients get the updated version). > For example, a user might want to use the REST API to retrieve the latest > completed checkpoint for a given Flink job. This might be useful when trying > to use existing checkpoints as state store when migrating a Flink job from > one cluster to another. See Appendix for example. > *What* > This change is about separating out the cache used for the checkpoints REST > APIs to a separate cache. This way, a user can set the timeout for the > checkpoints cache to 0s (disable cache), without causing much effect on the > user experience on the Flink dashboard. > In addition, the checkpoint handlers first retrieve the > {{{}ExecutionGraph{}}}, then retrieve the {{CheckpointStatsSnapshot}} from > the graph. 
This is not needed, since the checkpoint handlers only need the > {{CheckpointStatsSnapshot.}} This change will mean these handlers retrieve > the minimal required information ({{{}CheckpointStatsSnapshot){}}} to > construct a reply. > > *Example use case* > When performing security patching / maintenance of the infrastructure > supporting the Flink cluster, we might want to transfer a given Flink job to > another cluster, whilst maintaining state. We can do this via the below steps: > # Old cluster - Select completed checkpoint on existing Flink job > # Old cluster - Stop the existing Flink job > # New cluster - Start a new Flink job with selected checkpoint > Step 1 requires us to query the checkpoints REST API for the latest completed > checkpoint. With the status quo, we need to wait 3s (or whatever the > ExecutionGraphCache expiry may be). This is undesirable because this means > the Flink job will have to reprocess data equivalent to 3s / whatever the > execution graph cache timeout is.
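Step 1 of the example use case above — picking the latest completed checkpoint from the checkpoints REST API — can be sketched as below. The payload shape (a `history` list whose entries carry `status` and `trigger_timestamp` fields) follows the Flink REST endpoint `GET /jobs/<job-id>/checkpoints`; treat the exact field names as assumptions to verify against your Flink version.

```python
def latest_completed_checkpoint(checkpoints_json):
    """Return the most recent COMPLETED checkpoint entry, or None.

    checkpoints_json is the parsed JSON body of
    GET /jobs/<job-id>/checkpoints (field names assumed, verify per version).
    """
    completed = [
        cp for cp in checkpoints_json.get("history", [])
        if cp.get("status") == "COMPLETED"
    ]
    if not completed:
        return None
    # The newest completed checkpoint has the largest trigger timestamp.
    return max(completed, key=lambda cp: cp["trigger_timestamp"])
```

The returned entry's `external_path` (where externalized) is what a new cluster would be started from; with the checkpoint cache timeout set to 0s as the issue proposes, this query reflects the live JobManager state rather than a snapshot up to 3s old.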
[GitHub] [flink] flinkbot commented on pull request #23027: [FLINK-32616][jdbc-driver] Close result for non-query in executeQuery
flinkbot commented on PR #23027: URL: https://github.com/apache/flink/pull/23027#issuecomment-1642372445 ## CI report: * 55e9c628f6d3e2de3c0a00342ab6f86c78eca10b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] lsyldliu commented on pull request #23012: [FLINK-32610][json] JSON format supports nested type projection pushdown
lsyldliu commented on PR #23012: URL: https://github.com/apache/flink/pull/23012#issuecomment-1642369509 @wuchong Thanks for the review, I've updated.
[jira] [Updated] (FLINK-32616) FlinkStatement#executeQuery resource leaks when the input sql is not query
[ https://issues.apache.org/jira/browse/FLINK-32616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32616: --- Labels: pull-request-available (was: ) > FlinkStatement#executeQuery resource leaks when the input sql is not query > -- > > Key: FLINK-32616 > URL: https://issues.apache.org/jira/browse/FLINK-32616 > Project: Flink > Issue Type: Bug > Components: Table SQL / JDBC >Affects Versions: 1.18.0 >Reporter: Shengkai Fang >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > The current implementation just throw the exception if the input sql is not > query. No one is responsible to close the StatementResult. > > BTW, the current implementation just submit the sql to the gateway, which > means the sql is executed. I just wonder do we need to expose this features > to the users?
[GitHub] [flink] FangYongs opened a new pull request, #23027: [FLINK-32616][jdbc-driver] Close result for non-query in executeQuery
FangYongs opened a new pull request, #23027: URL: https://github.com/apache/flink/pull/23027 ## What is the purpose of the change This pr aims to close statement result in FlinkStatement.executeQuery when the statement is not a query sql. ## Brief change log - Close statement result when the sql is not query for FlinkStatement.executeQuery ## Verifying this change This change added tests and can be verified as follows: - Added FlinkStatementTest.testCloseNonQuery to validate the statement result is closed. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no - The serializers: (yes / no / don't know) no - The runtime per-record code paths (performance sensitive): (yes / no / don't know) no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no - The S3 file system connector: (yes / no / don't know) no ## Documentation - Does this pull request introduce a new feature? (yes / no) no - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
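The resource-leak fix in this PR follows a common pattern: since the statement has already been submitted by the time we learn it is not a query, the already-opened result must be closed before the exception is raised. A minimal sketch of that pattern, with hypothetical names rather than Flink's Java JDBC-driver code:

```python
class NotAQueryError(Exception):
    """Raised when executeQuery is given a non-query statement."""

def execute_query(submit, sql):
    """Submit sql; close the opened result before raising for non-queries.

    submit is a hypothetical callable standing in for the gateway
    submission; it returns a result object with is_query and close().
    """
    result = submit(sql)  # the statement has already been executed here
    if not result.is_query:
        result.close()  # the fix: release the result instead of leaking it
        raise NotAQueryError("statement is not a query: " + sql)
    return result
```

The key point is the order of operations: the leak in the original code was precisely that the exception path skipped `close()` on a result that had already been opened.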
[GitHub] [flink] lsyldliu commented on a diff in pull request #23012: [FLINK-32610][json] JSON format supports nested type projection pushdown
lsyldliu commented on code in PR #23012: URL: https://github.com/apache/flink/pull/23012#discussion_r1268292637 ## flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java: ## @@ -724,7 +786,7 @@ private void testParseErrors(TestSpec spec) { TestSpec.json("{\"id\":\"abc\"}") .rowType(ROW(FIELD("id", INT( .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'."), -TestSpec.json("{\"id\":112.013}") +TestSpec.json("{\"id\":112}") Review Comment: Yes, the `JsonParser` can't parse double type to bigint. `JsonParser` logic: ![image](https://github.com/apache/flink/assets/17698589/112771b9-380d-46d6-bc77-3278db6526ed) `JsonNode` logic: ![image](https://github.com/apache/flink/assets/17698589/19b1a839-6863-440b-95de-74b2ccd3baf0)
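The distinction discussed in this review thread — a strict parser rejecting a non-integral JSON number for a BIGINT field, while a lenient node-based reader silently coerces it — can be shown with a toy illustration. This is not Flink's code, only an analogy for why the test input was changed from `112.013` to `112`:

```python
def strict_bigint(raw):
    """Strict path (analogous to JsonParser): reject non-integral numbers."""
    value = float(raw)
    if value != int(value):
        raise ValueError("not an integral number: " + raw)
    return int(value)

def lenient_bigint(raw):
    """Lenient path (analogous to JsonNode coercion): truncate silently."""
    return int(float(raw))
```

Under the strict path, `{"id": 112.013}` for a BIGINT column becomes a deserialization error rather than a silently truncated value, which is why the parse-error test had to use an integral input.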
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #627: [FLINK-32551] Add option to take a savepoint on flinkdeployment/flinksessionjob deletion
gyfora commented on PR #627: URL: https://github.com/apache/flink-kubernetes-operator/pull/627#issuecomment-1642349101 I am closing this based on feedback from @JTaky (Oleksandr Nitavskyi)
[GitHub] [flink-kubernetes-operator] JTaky commented on pull request #634: [FLINK-32551] Add option to take a savepoint when deleting a flinkdeployment/flinksessionjob
JTaky commented on PR #634: URL: https://github.com/apache/flink-kubernetes-operator/pull/634#issuecomment-1642264925 > Are you working with @ashangit ? Just curious because he also has an open PR and the JIRA is on him Yes, he is OOO currently; I am continuing his development. Would value feedback.
[GitHub] [flink-web] MartijnVisser commented on pull request #664: Allow content to expire
MartijnVisser commented on PR #664: URL: https://github.com/apache/flink-web/pull/664#issuecomment-1642260699 > Do you have time to review this PR? Sure. I do think that there's a different issue. This `.htaccess` file is only used on the https://flink.apache.org project website, but not for the documentation that's built on https://nightlies.apache.org/flink. I don't immediately see a workflow for building the flink-ml docs: where is that done?
[GitHub] [flink] hackergin commented on a diff in pull request #22939: [FLINK-32474][table] Support time travel in table planner
hackergin commented on code in PR #22939: URL: https://github.com/apache/flink/pull/22939#discussion_r1268198538 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java: ## @@ -125,4 +162,118 @@ public void validateColumnListParams( // this makes it possible to ignore them in the validator and fall back to regular row types // see also SqlFunction#deriveType } + +@Override +protected void registerNamespace( +@Nullable SqlValidatorScope usingScope, +@Nullable String alias, +SqlValidatorNamespace ns, +boolean forceNullable) { + +// Generate a new validator namespace for time travel scenario. +// Time travel only supports constant expressions, so we need to investigate scenarios +// where the period of Snapshot is a SqlIdentifier. +Optional timeTravelNode = getTimeTravelNode(ns); +if (usingScope != null +&& timeTravelNode.isPresent() +&& !(timeTravelNode.get().getPeriod() instanceof SqlIdentifier)) { +SqlSnapshot sqlSnapshot = timeTravelNode.get(); +SqlNode periodNode = sqlSnapshot.getPeriod(); +SqlToRelConverter sqlToRelConverter = this.createSqlToRelConverter(); +RexNode rexNode = sqlToRelConverter.convertExpression(periodNode); +RexNode simplifiedRexNode = +FlinkRexUtil.simplify( +sqlToRelConverter.getRexBuilder(), +rexNode, +relOptCluster.getPlanner().getExecutor()); +List reducedNodes = new ArrayList<>(); +relOptCluster +.getPlanner() +.getExecutor() +.reduce( +relOptCluster.getRexBuilder(), +Collections.singletonList(simplifiedRexNode), +reducedNodes); +// check whether period is the unsupported expression +if (!(reducedNodes.get(0) instanceof RexLiteral)) { +throw new UnsupportedOperationException( +String.format( +"Unsupported time travel expression: %s for the expression can not be reduced to a constant by Flink.", +periodNode)); +} + +RexLiteral rexLiteral = (RexLiteral) (reducedNodes).get(0); +TimestampString timestampString = rexLiteral.getValueAs(TimestampString.class); +checkNotNull( 
+timestampString, +"The time travel expression %s can not reduce to a valid timestamp string. This is a bug. Please file an issue.", +periodNode); + +TableConfig tableConfig = ShortcutUtils.unwrapContext(relOptCluster).getTableConfig(); +ZoneId zoneId = tableConfig.getLocalTimeZone(); +long timeTravelTimestamp = + TimestampData.fromEpochMillis(timestampString.getMillisSinceEpoch()) +.toLocalDateTime() +.atZone(zoneId) +.toInstant() +.toEpochMilli(); + +SchemaVersion schemaVersion = TimestampSchemaVersion.of(timeTravelTimestamp); +IdentifierNamespace identifierNamespace = (IdentifierNamespace) ns; +IdentifierNamespace snapshotNameSpace = +new IdentifierSnapshotNamespace( +identifierNamespace, +schemaVersion, +((DelegatingScope) usingScope).getParent()); +ns = snapshotNameSpace; + +sqlSnapshot.setOperand( Review Comment: Since we have reduced the period expression here, we can just build a simple SqlLiteral. And we don't need to reduce again when converting to RelNode.
[GitHub] [flink] hackergin commented on a diff in pull request #22939: [FLINK-32474][table] Support time travel in table planner
hackergin commented on code in PR #22939: URL: https://github.com/apache/flink/pull/22939#discussion_r1268195677 ## flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java: ## @@ -2927,7 +2948,37 @@ private void convertTemporalTable(Blackboard bb, SqlCall call) { // convert inner query, could be a table name or a derived table SqlNode expr = snapshot.getTableRef(); -convertFrom(bb, expr); +SqlNode tableRef = snapshot.getTableRef(); +// Since we have simplified the SqlSnapshot in the validate phase, we only need to check Review Comment: I mean reduce the period expression.
[jira] [Closed] (FLINK-32615) Cache AutoscalerInfo for each resource
[ https://issues.apache.org/jira/browse/FLINK-32615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-32615. -- Fix Version/s: kubernetes-operator-1.6.0 Resolution: Fixed merged to main 0c341ebe13645f4e9802cfd780c5b50f59e29363 > Cache AutoscalerInfo for each resource > -- > > Key: FLINK-32615 > URL: https://issues.apache.org/jira/browse/FLINK-32615 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Fix For: kubernetes-operator-1.6.0 > > > Currently we always get the autoscaler info configmap through the Kubernetes > rest api. This is a very heavy and unnecessary operation as the lifecycle is > directly tied to the resource.
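The improvement described above — fetching the autoscaler info ConfigMap once and reusing it because its lifecycle is tied to the resource — amounts to a per-resource cache with explicit eviction. A hedged sketch with illustrative names (the real operator code differs):

```python
class AutoscalerInfoCache:
    """Cache autoscaler info per resource instead of hitting the K8s API
    on every reconciliation. Names here are illustrative, not the
    operator's actual classes."""

    def __init__(self, fetch_config_map):
        self._fetch = fetch_config_map  # the expensive Kubernetes REST call
        self._cache = {}

    def get(self, resource_uid):
        # Only the first access per resource touches the Kubernetes API.
        if resource_uid not in self._cache:
            self._cache[resource_uid] = self._fetch(resource_uid)
        return self._cache[resource_uid]

    def evict(self, resource_uid):
        # Called when the resource is deleted; the cached info dies with it.
        self._cache.pop(resource_uid, None)
```

Because the ConfigMap's lifecycle matches the resource's, no TTL is needed: the cache is correct as long as entries are evicted on resource deletion.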
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #633: [FLINK-32589] Carry over parallelism overrides between spec changes
gyfora merged PR #633: URL: https://github.com/apache/flink-kubernetes-operator/pull/633
[jira] [Closed] (FLINK-32589) Carry over parallelism overrides to prevent users from clearing them on updates
[ https://issues.apache.org/jira/browse/FLINK-32589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-32589. -- Fix Version/s: kubernetes-operator-1.6.0 Resolution: Fixed merged to main 9fe68251eb1a37333a6e862bf7b048061396498c > Carry over parallelism overrides to prevent users from clearing them on > updates > --- > > Key: FLINK-32589 > URL: https://issues.apache.org/jira/browse/FLINK-32589 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Gyula Fora >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.6.0 > > > The autoscaler currently sets the parallelism overrides via the Flink config > {{pipeline.jobvertex-parallelism-overrides}}. Whenever the user posts specs > updates, special care needs to be taken in order to carry over existing > overrides. Otherwise the job will reset to the default parallelism > configuration. Users shouldn't have to deal with this. Instead, whenever a > new spec is posted which does not contain the overrides, the operator should > automatically apply the last-used overrides (if autoscaling is enabled).
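The carry-over rule described in the issue can be sketched as a small spec-merge step. This is an illustrative sketch, not the operator's implementation: if the newly posted spec does not set `pipeline.jobvertex-parallelism-overrides`, keep the overrides from the last reconciled spec so autoscaler decisions are not silently reset.

```python
OVERRIDES_KEY = "pipeline.jobvertex-parallelism-overrides"

def merge_overrides(new_spec_config, last_spec_config, autoscaler_enabled=True):
    """Carry over autoscaler parallelism overrides into a new spec.

    Both arguments are plain dicts standing in for the flinkConfiguration
    section of the spec; a missing key in the new spec means the user did
    not set overrides explicitly, so the last-used ones are re-applied.
    """
    merged = dict(new_spec_config)
    if (autoscaler_enabled
            and OVERRIDES_KEY not in merged
            and OVERRIDES_KEY in last_spec_config):
        merged[OVERRIDES_KEY] = last_spec_config[OVERRIDES_KEY]
    return merged
```

Note that an explicitly set value in the new spec wins: the carry-over only fills in the key when the user left it out, which matches the issue's intent of not forcing users to re-state overrides on every update.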
[GitHub] [flink] RanJinh commented on a diff in pull request #23000: [FLINK-32594][runtime] Use blocking ResultPartitionType if operator only outputs records on EOF
RanJinh commented on code in PR #23000: URL: https://github.com/apache/flink/pull/23000#discussion_r1268184648 ## flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java: ## @@ -27,4 +28,16 @@ * method) interfaces that can be implemented via Java 8 lambdas. */ @Public -public interface Function extends java.io.Serializable {} +public interface Function extends java.io.Serializable { Review Comment: Related change is in this reply: https://github.com/apache/flink/pull/23000#discussion_r1267862410
[jira] [Commented] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.
[ https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744645#comment-17744645 ] Piotr Nowojski commented on FLINK-19249: Hi [~Jiangang], no sorry there was no progress on that issue. > Detect broken connections in case TCP Timeout takes too long. > - > > Key: FLINK-19249 > URL: https://issues.apache.org/jira/browse/FLINK-19249 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Congxian Qiu >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > > {quote}encountered this error on 1.7, after going through the master code, I > think the problem is still there > {quote} > When the network environment is not so good, the connection between the > server and the client may be disconnected innocently. After the > disconnection, the server will receive the IOException such as below > {code:java} > java.io.IOException: Connection timed out > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:51) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468) > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) > at > 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) > at java.lang.Thread.run(Thread.java:748) > {code} > then release the view reader. > But the job would not fail until the downstream detect the disconnection > because of {{channelInactive}} later(~10 min). between such time, the job can > still process data, but the broken channel can't transfer any data or event, > so snapshot would fail during this time. this will cause the job to replay > many data after failover. -- This message was sent by Atlassian Jira (v8.20.10#820010)
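FLINK-19249 asks for dead connections to be detected before the OS-level TCP timeout fires (the ~10 minutes mentioned above). One common approach, sketched here in plain Java purely as an illustration of the idea rather than as Flink's actual networking code, is an application-level staleness check driven by heartbeats or received data:

```java
/**
 * Heartbeat-style staleness check: if nothing has been received on a channel
 * for longer than a configured timeout, treat the connection as broken and
 * fail over eagerly instead of waiting for the much longer TCP timeout.
 */
public class ConnectionWatchdog {

    private final long timeoutMillis;
    private volatile long lastActivityMillis;

    public ConnectionWatchdog(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastActivityMillis = nowMillis;
    }

    /** Call whenever data or a heartbeat arrives on the channel. */
    public void recordActivity(long nowMillis) {
        lastActivityMillis = nowMillis;
    }

    /** True if the channel should be considered dead. */
    public boolean isStale(long nowMillis) {
        return nowMillis - lastActivityMillis > timeoutMillis;
    }
}
```

A check like this, run periodically on each channel, would let a snapshot fail fast on a broken link rather than stalling until `channelInactive` is eventually reported.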
[GitHub] [flink] RanJinh commented on a diff in pull request #23000: [FLINK-32594][runtime] Use blocking ResultPartitionType if operator only outputs records on EOF
RanJinh commented on code in PR #23000: URL: https://github.com/apache/flink/pull/23000#discussion_r1268169556 ## flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java: ## @@ -27,4 +28,16 @@ * method) interfaces that can be implemented via Java 8 lambdas. */ @Public -public interface Function extends java.io.Serializable {} +public interface Function extends java.io.Serializable { +/** Returns true iff the operator can only emit records after inputs have reached EOF. */ +@Internal +default boolean isOutputEOF() { Review Comment: It's more reasonable to overwrite `OperatorAttribute` in an operator, now you can see the new usage in `StreamingJobGraphGeneratorTest#testOverwriteOperatorAttributesPartitionTypes`.
[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #633: [FLINK-32589] Carry over parallelism overrides between spec changes
mxm commented on code in PR #633: URL: https://github.com/apache/flink-kubernetes-operator/pull/633#discussion_r1268149481 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java: ## @@ -17,15 +17,19 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; -import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; +import java.util.Map; + /** Per-job Autoscaler instance. */ public interface JobAutoScaler { /** Called as part of the reconciliation loop. Returns true if this call led to scaling. */ -boolean scale(FlinkResourceContext> ctx); +boolean scale(FlinkResourceContext ctx); /** Called when the custom resource is deleted. */ -void cleanup(AbstractFlinkResource cr); +void cleanup(FlinkResourceContext ctx); + +/** Get the current parallelism overrides for the job. */ Review Comment: I think I prefer this being part of the autoscaling code but we can refactor at a later point in time.
[GitHub] [flink] huwh commented on pull request #22861: [FLINK-32387][runtime] InputGateDeploymentDescriptor uses cache to avoid deserializing shuffle descriptors multiple times
huwh commented on PR #22861: URL: https://github.com/apache/flink/pull/22861#issuecomment-1642164847 @wanglijie95 I squashed the commits and rebased onto the master branch. PTAL
[GitHub] [flink] twalthr commented on a diff in pull request #22593: [FLINK-32053][table-planner] Introduce StateMetadata to ExecNode to support configure operator-level state TTL via CompiledPlan
twalthr commented on code in PR #22593: URL: https://github.com/apache/flink/pull/22593#discussion_r1268129439 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java: ## @@ -71,20 +75,36 @@ producedTransformations = StreamExecChangelogNormalize.CHANGELOG_NORMALIZE_TRANSFORMATION, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15) +@ExecNodeMetadata( +name = "stream-exec-changelog-normalize", +version = 2, Review Comment: @godfreyhe @luoyuxia @LadyForest If the testing infrastructure were already in place, I would agree with increasing the version. But we are not testing different versions of ExecNode yet, so increasing the annotation has no effect. I just noticed that you bumped the ExecNode version of a couple of ExecNodes for the new state metadata properties in CompiledPlan. Your change actually allows null for the stateMetadataList, so I'm wondering whether we should not increase the version for 1.18. We can stay at the old ExecNode version, as 1.18 is able to consume 1.17 plans and state. The reason I noticed this is FLINK-32613, where I added an option to the wrong annotation during a rebase and no test failed.
[GitHub] [flink-connector-pulsar] reswqa commented on a diff in pull request #55: [FLINK-24302] Extend offheap memory for JDK11 test coverage
reswqa commented on code in PR #55: URL: https://github.com/apache/flink-connector-pulsar/pull/55#discussion_r1268105952 ## flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.common; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.testframe.environment.ClusterControllable; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.test.util.MiniClusterWithClientResource; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL; +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.METRIC_FETCHER_UPDATE_INTERVAL_MS; +import static org.apache.flink.runtime.jobgraph.SavepointConfigOptions.SAVEPOINT_PATH; + +/** Test environment for running jobs on Flink mini-cluster. */ +@Experimental +public class MiniClusterTestEnvironment implements TestEnvironment, ClusterControllable { Review Comment: IIUC, we will remove this after `FLINK-1.18` released? 
## flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java: ## @@ -35,9 +35,11 @@ public class FlinkContainerUtils { public static Configuration flinkConfiguration() { Configuration configuration = new Configuration(); -// Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace -configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2048)); +configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2560)); +configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(512)); configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(512)); + +// Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace Review Comment: Should this line of comment be moved forward a bit? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] fredia commented on pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric
fredia commented on PR #22772: URL: https://github.com/apache/flink/pull/22772#issuecomment-1642089197 @rkhachatryan Thanks for the review, I have addressed your comments. It would be great if you could take another look :)
[GitHub] [flink] fredia commented on a diff in pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric
fredia commented on code in PR #22772: URL: https://github.com/apache/flink/pull/22772#discussion_r1268077347 ## docs/content/docs/ops/metrics.md: ## @@ -1343,6 +1343,11 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an The time in nanoseconds that elapsed between the creation of the last checkpoint and the time when the checkpointing process has started by this Task. This delay shows how long it takes for the first checkpoint barrier to reach the task. A high value indicates back-pressure. If only a specific task has a long start delay, the most likely reason is data skew. Gauge + + checkpointRestoreTime + The time in milliseconds that one task spends on restoring/initialization, return 0 when the task is not in initialization/running status. + Counter Review Comment: I renamed `checkpointRestoreTime` to `initializationTime` at the subtask level, and moved it to the [`IO` section](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io). For the `Availability` section, I think it's a metric that describes job-level status, so we can reuse `initializingTime/initializingTotalTime`.
[GitHub] [flink] fredia commented on a diff in pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric
fredia commented on code in PR #22772: URL: https://github.com/apache/flink/pull/22772#discussion_r1268060700 ## flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java: ## @@ -128,6 +134,29 @@ public TaskIOMetricGroup(TaskMetricGroup parent) { this.mailboxLatency = histogram(MetricNames.MAILBOX_LATENCY, new DescriptiveStatisticsHistogram(60)); this.mailboxSize = gauge(MetricNames.MAILBOX_SIZE, new SizeGauge()); +this.initializationDuration = +counter( +CHECKPOINT_RESTORE_TIME, +new Counter() { +@Override +public void inc() {} + +@Override +public void inc(long n) {} + +@Override +public void dec() {} + +@Override +public void dec(long n) {} + +@Override +public long getCount() { +return getTaskInitializationDuration(); +} +}); +this.taskStartTime = INVALID_TIMESTAMP; Review Comment: Agreed, I checked `taskStartTime` in `getAccumulatedBusyTime`, if `taskStartTime<0`, return `Double.NaN`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
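The no-op `Counter` in the diff above illustrates a useful pattern: a read-only metric whose `getCount()` is derived from timestamps rather than incremented. A self-contained sketch of that pattern with an injected clock (names here are illustrative, not the actual `TaskIOMetricGroup` code), which also makes the value testable without sleeping:

```java
import java.util.function.LongSupplier;

/** A derived metric: mutators would be no-ops; getCount() computes its value. */
public class DerivedCounter {

    private static final long INVALID_TIMESTAMP = -1L;

    // Injected clock, e.g. System::currentTimeMillis in production,
    // a manually advanced supplier in tests.
    private final LongSupplier clockMillis;
    private volatile long initStartMillis = INVALID_TIMESTAMP;
    private volatile long initEndMillis = INVALID_TIMESTAMP;

    public DerivedCounter(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    public void markInitializationStarted() {
        initStartMillis = clockMillis.getAsLong();
    }

    public void markInitializationFinished() {
        initEndMillis = clockMillis.getAsLong();
    }

    /** 0 before initialization starts; elapsed time while running; frozen after it ends. */
    public long getCount() {
        if (initStartMillis == INVALID_TIMESTAMP) {
            return 0L;
        }
        long end = initEndMillis == INVALID_TIMESTAMP ? clockMillis.getAsLong() : initEndMillis;
        return end - initStartMillis;
    }
}
```

Injecting the clock is what lets a test advance time deterministically instead of calling `Thread.sleep()`, as discussed in the next comment.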
[GitHub] [flink] fredia commented on a diff in pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric
fredia commented on code in PR #22772: URL: https://github.com/apache/flink/pull/22772#discussion_r1268058951 ## flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java: ## @@ -95,6 +95,15 @@ void testTaskIOMetricGroup() throws InterruptedException { .isGreaterThanOrEqualTo(softSleepTime); assertThat(taskIO.getHardBackPressuredTimePerSecond().getCount()) .isGreaterThanOrEqualTo(hardSleepTime); + +// test initializing time +assertThat(taskIO.getTaskInitializationDuration()).isEqualTo(0L); +taskIO.markTaskInitializationStarted(); +Thread.sleep(1000); +assertThat(taskIO.getTaskInitializationDuration()).isGreaterThan(0L); Review Comment: Good point, I introduced `org.apache.flink.util.clock.Clock` into `TaskIOMetricGroup` to avoid `sleep()`.
[GitHub] [flink-kubernetes-operator] JTaky opened a new pull request, #634: [FLINK-32551] Add option to take a savepoint when deleting a flinkdeployment/flinksessionjob
JTaky opened a new pull request, #634: URL: https://github.com/apache/flink-kubernetes-operator/pull/634 ## What is the purpose of the change Add an operator option to take a savepoint on FlinkDeployment/FlinkSessionJob deletion. The default behaviour is unchanged: no savepoint is taken. ## Brief change log - Adds a new operator configuration - Injects the operator config into all reconcilers upon creation - Factors out the job cleanup interface to unify upgrade-mode determination upon cleanup - Adds a unit test for the new cleanup behaviour ## Verifying this change - Additional unit tests - Manual tests on K8s ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., any changes to the `CustomResourceDescriptors`: yes, adds a new operator configuration - Core observer or reconciler logic that is regularly executed: yes ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs are generated from the config
[jira] [Commented] (FLINK-32619) ConfigOptions to support fallback configuration
[ https://issues.apache.org/jira/browse/FLINK-32619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744606#comment-17744606 ] Hong Liang Teoh commented on FLINK-32619: - That's a great callout [~wangm92] . Will use that instead and close this JIRA > ConfigOptions to support fallback configuration > --- > > Key: FLINK-32619 > URL: https://issues.apache.org/jira/browse/FLINK-32619 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Configuration >Affects Versions: 1.16.2, 1.17.1 >Reporter: Hong Liang Teoh >Priority: Minor > > ConfigOptions has no option to specify a "fallback configuration" as the > default. > > For example, if we want {{rest.cache.checkpoint-statistics.timeout}} to > fall back to web.refresh-interval instead of a static default value, we have > to specify > > {code:java} > @Documentation.OverrideDefault("web.refresh-interval") > @Documentation.Section(Documentation.Sections.EXPERT_REST) > public static final ConfigOption<Duration> > CACHE_CHECKPOINT_STATISTICS_TIMEOUT = > key("rest.cache.checkpoint-statistics.timeout") > .durationType() > .noDefaultValue() > .withDescription( > ""); > {code} > > > The {{.noDefaultValue()}} is misleading as it actually has a default. > > We should introduce a {{.fallbackConfiguration()}} that is handled gracefully > by doc generators. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-32619) ConfigOptions to support fallback configuration
[ https://issues.apache.org/jira/browse/FLINK-32619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh resolved FLINK-32619. - Resolution: Not A Problem > ConfigOptions to support fallback configuration > --- > > Key: FLINK-32619 > URL: https://issues.apache.org/jira/browse/FLINK-32619 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Configuration >Affects Versions: 1.16.2, 1.17.1 >Reporter: Hong Liang Teoh >Priority: Minor > > ConfigOptions has no option to specify a "fallback configuration" as the > default. > > For example, if we want {{rest.cache.checkpoint-statistics.timeout}} to > fall back to web.refresh-interval instead of a static default value, we have > to specify > > {code:java} > @Documentation.OverrideDefault("web.refresh-interval") > @Documentation.Section(Documentation.Sections.EXPERT_REST) > public static final ConfigOption<Duration> > CACHE_CHECKPOINT_STATISTICS_TIMEOUT = > key("rest.cache.checkpoint-statistics.timeout") > .durationType() > .noDefaultValue() > .withDescription( > ""); > {code} > > > The {{.noDefaultValue()}} is misleading as it actually has a default. > > We should introduce a {{.fallbackConfiguration()}} that is handled gracefully > by doc generators. -- This message was sent by Atlassian Jira (v8.20.10#820010)
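The issue was resolved as "Not A Problem" because a fallback mechanism already exists (Flink's `ConfigOption` supports fallback keys). The lookup order such a mechanism provides can be sketched generically: try the primary key, then each fallback key in order, then the default. A plain-Java illustration of that resolution order, not Flink's implementation:

```java
import java.util.Map;

/** Generic fallback-key lookup: primary key first, then fallbacks, then default. */
public class FallbackLookup {

    public static String get(
            Map<String, String> config,
            String key,
            String defaultValue,
            String... fallbackKeys) {
        if (config.containsKey(key)) {
            return config.get(key);          // explicitly configured value wins
        }
        for (String fallback : fallbackKeys) {
            if (config.containsKey(fallback)) {
                return config.get(fallback); // first configured fallback key
            }
        }
        return defaultValue;                 // static default as the last resort
    }
}
```

Under this scheme, `rest.cache.checkpoint-statistics.timeout` would pick up `web.refresh-interval` whenever it is not set itself, which is exactly the behaviour the ticket asked for.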
[jira] [Assigned] (FLINK-32630) The log level of job failed info should change from INFO to WARN/ERROR if job failed
[ https://issues.apache.org/jira/browse/FLINK-32630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zili Chen reassigned FLINK-32630: - Assignee: Matt Wang > The log level of job failed info should change from INFO to WARN/ERROR if job > failed > > > Key: FLINK-32630 > URL: https://issues.apache.org/jira/browse/FLINK-32630 > Project: Flink > Issue Type: Improvement > Components: Client / Job Submission, Runtime / Coordination >Affects Versions: 1.17.1 >Reporter: Matt Wang >Assignee: Matt Wang >Priority: Minor > > When a job fails to submit or run, the following log level should be changed > to WARN or ERROR, INFO will confuse users > {code:java} > 2023-07-14 20:05:26,863 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job > flink_test_job (08eefd50) switched from state FAILING > to FAILED. > org.apache.flink.runtime.JobException: Recovery is suppressed by > FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100) > > 2023-07-14 20:05:26,889 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job > 08eefd50 reached terminal state FAILED. > org.apache.flink.runtime.JobException: Recovery is suppressed by > FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100) > 2023-07-14 20:05:26,956 INFO > org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap > [] - Application FAILED: > java.util.concurrent.CompletionException: > org.apache.flink.client.deployment.application.ApplicationExecutionException: > Could not execute application.{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32630) The log level of job failed info should change from INFO to WARN/ERROR if job failed
[ https://issues.apache.org/jira/browse/FLINK-32630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744605#comment-17744605 ] Zili Chen commented on FLINK-32630: --- You can ping me on a patch ready. > The log level of job failed info should change from INFO to WARN/ERROR if job > failed > > > Key: FLINK-32630 > URL: https://issues.apache.org/jira/browse/FLINK-32630 > Project: Flink > Issue Type: Improvement > Components: Client / Job Submission, Runtime / Coordination >Affects Versions: 1.17.1 >Reporter: Matt Wang >Assignee: Matt Wang >Priority: Minor > > When a job fails to submit or run, the following log level should be changed > to WARN or ERROR, INFO will confuse users > {code:java} > 2023-07-14 20:05:26,863 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job > flink_test_job (08eefd50) switched from state FAILING > to FAILED. > org.apache.flink.runtime.JobException: Recovery is suppressed by > FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100) > > 2023-07-14 20:05:26,889 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job > 08eefd50 reached terminal state FAILED. > org.apache.flink.runtime.JobException: Recovery is suppressed by > FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100) > 2023-07-14 20:05:26,956 INFO > org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap > [] - Application FAILED: > java.util.concurrent.CompletionException: > org.apache.flink.client.deployment.application.ApplicationExecutionException: > Could not execute application.{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-24302) Direct buffer memory leak on Pulsar connector with Java 11
[ https://issues.apache.org/jira/browse/FLINK-24302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744604#comment-17744604 ] Zili Chen commented on FLINK-24302: --- Pending to close. This should be an issue on the Pulsar side. Little thing we can do only from the connector side. > Direct buffer memory leak on Pulsar connector with Java 11 > -- > > Key: FLINK-24302 > URL: https://issues.apache.org/jira/browse/FLINK-24302 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.14.0 >Reporter: Yufan Sheng >Priority: Major > Labels: pull-request-available, test-stability > Fix For: pulsar-4.0.1 > > > Running the Pulsar connector with multiple split readers on Java 11 could > throw {{a java.lang.OutOfMemoryError exception}}. > {code:java} > Caused by: java.util.concurrent.CompletionException: > org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: > Could not complete the operation. Number of retries has been exhausted. > Failed reason: java.lang.OutOfMemoryError: Direct buffer memory > at > java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) > at > java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) > at > java.base/java.util.concurrent.CompletableFuture$OrApply.tryFire(CompletableFuture.java:1503) > ... 42 more > Caused by: > org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: > Could not complete the operation. Number of retries has been exhausted. > Failed reason: java.lang.OutOfMemoryError: Direct buffer memory > at > org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$4(AsyncHttpConnector.java:249) > ... 
39 more > Caused by: org.apache.pulsar.shade.io.netty.handler.codec.EncoderException: > java.lang.OutOfMemoryError: Direct buffer memory > at > org.apache.pulsar.shade.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104) > at > org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) > at > org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:303) > at > org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:132) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758) > at > org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1020) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:311) > at > org.apache.pulsar.shade.org.asynchttpclient.netty.request.NettyRequestSender.writeRequest(NettyRequestSender.java:420) > ... 
23 more > {code} > The reason is that under Java 11, Netty allocates memory from the > pool of Java Direct Memory and is therefore subject to the MaxDirectMemory limit. > Under Java 8, it allocates native memory and is not affected by that setting. > We have to reduce the direct memory usage by using a newer Pulsar client > which has a memory-limit configuration. > This issue is addressed on the Pulsar side, and > [PIP-74|https://github.com/apache/pulsar/wiki/PIP-74%3A-Pulsar-client-memory-limits] > has been created to resolve it. > We should keep this issue open with no resolved version until Pulsar > provides a new client with memory limits. > h2. Update: 2022/08/04 > The memory limit on the consumer API has been released in > https://github.com/apache/pulsar/pull/15216; we need to enable the > autoScaledReceiverQueueSizeEnabled option to use this feature.
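The client-side memory limit that PIP-74 introduces can be illustrated with a small, stdlib-only sketch. The class and method names below are hypothetical (this is not the actual Pulsar client API): receive queues reserve quota from a shared controller before buffering message bytes, so total buffered memory stays bounded no matter how many split readers run.

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch of a client-side memory limiter in the spirit of
// PIP-74. Each receive queue must reserve quota before buffering a message;
// when the reservation fails, the reader backs off instead of allocating
// more direct memory.
class MemoryLimitController {
    private final Semaphore quota;

    MemoryLimitController(int limitBytes) {
        this.quota = new Semaphore(limitBytes);
    }

    /** Try to reserve {@code bytes}; returns false when the limit is hit. */
    boolean tryReserve(int bytes) {
        return quota.tryAcquire(bytes);
    }

    /** Return quota once the buffered message has been handed to the user. */
    void release(int bytes) {
        quota.release(bytes);
    }

    int availableBytes() {
        return quota.availablePermits();
    }
}
```

With one shared controller sized to a fixed budget, many concurrent readers cannot collectively exceed the limit, which is exactly the failure mode the Java 11 MaxDirectMemory stack trace above shows.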
[jira] [Commented] (FLINK-24302) Direct buffer memory leak on Pulsar connector with Java 11
[ https://issues.apache.org/jira/browse/FLINK-24302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744603#comment-17744603 ] Zili Chen commented on FLINK-24302: --- Workaround to turn tests back on for JDK 11: https://github.com/apache/flink-connector-pulsar/pull/55.
[jira] [Updated] (FLINK-32630) The log level of job failed info should change from INFO to WARN/ERROR if job failed
[ https://issues.apache.org/jira/browse/FLINK-32630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Wang updated FLINK-32630: -- Summary: The log level of job failed info should change from INFO to WARN/ERROR if job failed (was: The log level of job failed info should change from info to warn/error if job failed) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32630) The log level of job failed info should change from info to warn/error if job failed
[ https://issues.apache.org/jira/browse/FLINK-32630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Wang updated FLINK-32630: -- Summary: The log level of job failed info should change from info to warn/error if job failed (was: The log level should change from info to warn/error if job failed) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32630) The log level should change from info to warn/error if job failed
[ https://issues.apache.org/jira/browse/FLINK-32630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744584#comment-17744584 ] Matt Wang commented on FLINK-32630: --- Hi [~tison] [~Weijie Guo], I think this is a point that can be optimized. Could you assign this Jira to me? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-32586) Enable input locality in SimpleExecutionSlotAllocator
[ https://issues.apache.org/jira/browse/FLINK-32586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu closed FLINK-32586. --- Resolution: Done Done via e732edb41a423f19d5eefc397ddbfacadaf0179e > Enable input locality in SimpleExecutionSlotAllocator > - > > Key: FLINK-32586 > URL: https://issues.apache.org/jira/browse/FLINK-32586 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: xingbe >Assignee: xingbe >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > At present, the AdaptiveBatchScheduler uses the > `SimpleExecutionSlotAllocator` to assign slot to execution, but it currently > lacks support for the capability of input locality, which may increase > unnecessary data transmission overhead. In this issue, we aim to enable the > `SimpleExecutionSlotAllocator` to support the input locality. -- This message was sent by Atlassian Jira (v8.20.10#820010)
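The input-locality behavior described above can be sketched in a few lines of plain Java. The names below are hypothetical (this is a model of the idea, not Flink's actual SimpleExecutionSlotAllocator): prefer a free slot on a TaskManager that already hosts the execution's inputs, and fall back to an arbitrary free slot only when no local one exists.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of input-locality-aware slot selection: choosing a slot
// co-located with the inputs avoids shipping their data across the network.
class LocalitySlotPicker {
    static Optional<String> pickSlot(List<String> freeSlotHosts, Set<String> inputHosts) {
        // first choice: a free slot on a host that already holds input data
        Optional<String> local = freeSlotHosts.stream()
                .filter(inputHosts::contains)
                .findFirst();
        if (local.isPresent()) {
            return local;
        }
        // fallback: any free slot (the inputs must then be fetched remotely)
        return freeSlotHosts.stream().findFirst();
    }
}
```

The point of the change is the first branch: before this issue, the allocator behaved like the fallback alone, paying the remote-transfer cost even when a local slot was available.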
[jira] [Updated] (FLINK-32630) The log level should change from info to warn/error if job failed
[ https://issues.apache.org/jira/browse/FLINK-32630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Wang updated FLINK-32630: -- Description: When a job fails to submit or run, the following log level should be changed to WARN or ERROR, INFO will confuse users {code:java} 2023-07-14 20:05:26,863 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job flink_test_job (08eefd50) switched from state FAILING to FAILED. org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100) 2023-07-14 20:05:26,889 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 08eefd50 reached terminal state FAILED. org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100) 2023-07-14 20:05:26,956 INFO org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application FAILED: java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.{code} was: When a job fails to submit or run, the following log level should be changed to WARN or ERROR, INFO will confuse users {code:java} 2023-07-14 20:05:26,863 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job flink_test_job (08eefd50) switched from state FAILING to FAILED. 
org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100){code} {code:java} {code} > The log level should change from info to warn/error if job failed > - > > Key: FLINK-32630 > URL: https://issues.apache.org/jira/browse/FLINK-32630 > Project: Flink > Issue Type: Improvement > Components: Client / Job Submission, Runtime / Coordination >Affects Versions: 1.17.1 >Reporter: Matt Wang >Priority: Minor > > When a job fails to submit or run, the following log level should be changed > to WARN or ERROR, INFO will confuse users > {code:java} > 2023-07-14 20:05:26,863 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job > flink_test_job (08eefd50) switched from state FAILING > to FAILED. > org.apache.flink.runtime.JobException: Recovery is suppressed by > FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100) > > 2023-07-14 20:05:26,889 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job > 08eefd50 reached terminal state FAILED. > org.apache.flink.runtime.JobException: Recovery is suppressed by > FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100) > 2023-07-14 20:05:26,956 INFO > org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap > [] - Application FAILED: > java.util.concurrent.CompletionException: > org.apache.flink.client.deployment.application.ApplicationExecutionException: > Could not execute application.{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] zhuzhurk closed pull request #23009: [FLINK-32586][coordination] Enable input locality in SimpleExecutionSlotAllocator.
zhuzhurk closed pull request #23009: [FLINK-32586][coordination] Enable input locality in SimpleExecutionSlotAllocator. URL: https://github.com/apache/flink/pull/23009 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-32630) The log level should change from info to warn/error if job failed
Matt Wang created FLINK-32630: - Summary: The log level should change from info to warn/error if job failed Key: FLINK-32630 URL: https://issues.apache.org/jira/browse/FLINK-32630 Project: Flink Issue Type: Improvement Components: Client / Job Submission, Runtime / Coordination Affects Versions: 1.17.1 Reporter: Matt Wang When a job fails to submit or run, the following log level should be changed to WARN or ERROR, INFO will confuse users {code:java} 2023-07-14 20:05:26,863 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job flink_test_job (08eefd50) switched from state FAILING to FAILED. org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=240,backoffTimeMS=2,maxFailuresPerInterval=100){code} {code:java} {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
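The requested improvement boils down to deriving the log level from the terminal job state instead of logging unconditionally at INFO. A minimal sketch, with hypothetical names (this is not the actual ExecutionGraph or Dispatcher code):

```java
// Hypothetical sketch: pick the log level from the job's terminal state so
// that FAILED surfaces at WARN (or ERROR) while normal completions stay at
// INFO and do not confuse users reading the logs.
class TerminalStateLogging {
    enum JobStatus { FINISHED, CANCELED, FAILED }

    static String levelFor(JobStatus status) {
        // FAILED is actionable for the user, so it should not hide at INFO;
        // WARN is the minimum here, and ERROR would also be defensible.
        return status == JobStatus.FAILED ? "WARN" : "INFO";
    }
}
```

A logger call site would then branch on the returned level (e.g. `log.warn(...)` vs `log.info(...)`), since SLF4J has no single "log at this level" entry point in its classic API.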
[jira] [Resolved] (FLINK-32552) Mixed up Flink session job deployments
[ https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabio Wanner resolved FLINK-32552. -- Release Note: Not a bug of the flink k8s operator. Resolution: Not A Bug > Mixed up Flink session job deployments > -- > > Key: FLINK-32552 > URL: https://issues.apache.org/jira/browse/FLINK-32552 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Fabio Wanner >Priority: Major > > *Context* > In the scope of end-to-end tests we deploy all the Flink session jobs we have > regularly in a staging environment. Some of the jobs are bundled together in > one helm chart and therefore deployed at the same time. There are around 40 > individual Flink jobs (running on the same Flink session cluster). The > session cluster is individual for each e2e test run. The problems described > below happen rarely (maybe 1 in ~50 runs). > *Problem* > Rarely the operator seems to "mix up" the deployments. This can be seen in > the Flink cluster logs as multiple {{Received JobGraph submission '<JOB NAME>' (<JOB ID>)}} logs are created from jobs with the same job_id. This > results in errors such as: > {{DuplicateJobSubmissionException}} or {{ClassNotFoundException}}. > It's also visible in the FlinkSessionJob resource: status.jobStatus.jobName > does not match the expected job name of the job being deployed (the job name > is passed to the application as an argument). > So far we were unable to reliably reproduce the error. 
> *Details* > The following lines show the status of 3 jobs from the viewpoint of the > Flink cluster dashboard and the FlinkSessionJob resource: > > *aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615* > Apache Flink Dashboard: > * State: Restarting > * ID: a7d36f3881f943a2 > * Exceptions: Cannot load user class: > aelps.pipelines.aletsch.smc.SMCUrlMapper > FlinkSessionJob Resource: > * State: RUNNING > * jobId: a1221c743367497b0002 > * uid: a1221c74-3367-497b-ad2f-8793ab23919d > > *aletsch_mat_e5730831db8092adb12f5189c4c895ef3a268615* > Apache Flink Dashboard: > * State: - > * ID: - > FlinkSessionJob Resource: > * State: UPGRADING > * jobId: - > * uid: a7d36f38-81f9-43a0-898f-19b950430e9d > Flink K8s Operator: > * Exceptions: DuplicateJobSubmissionException: Job has already been > submitted. > > *aletsch_wp_wafer_e5730831db8092adb12f5189c4c895ef3a268615* > Apache Flink Dashboard: > * State: Running > * ID: e692c2dfaa18441c0002 > * Exceptions: - > FlinkSessionJob Resource: > * State: RUNNING > * jobId: e692c2dfaa18441c0002 > * uid: e692c2df-aa18-441c-a352-88aefa9a3017 > As we can see, the *aletsch_smc* job is presumably running according to the > FlinkSessionJob resource, but crash-looping in the cluster, and it has the > jobID matching the uid of the resource of {*}aletsch_mat{*}, while > *aletsch_mat* is not even running. The following logs also show some > suspicious entries: there are several {{Received JobGraph submission}} entries from > different jobs with the same jobID. > > *Logs* > The logs are filtered by the 3 jobIds from above. > > JobID: a7d36f3881f943a2 > {code:bash} > Flink Cluster > ... > 2023-07-06 10:23:50,552 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job > aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 > (a7d36f3881f943a2) switched from state RUNNING to RESTARTING. 
> 2023-07-06 10:23:50 file: > '/tmp/tm_10.0.11.159:6122-e9fadc/blobStorage/job_a7d36f3881f943a2/blob_p-40c7a30adef8868254191d2cf2dbc4cb7ab46f0d-8a02a0583d91c5e8e6c94f378aa444c2' > (valid JAR) > 2023-07-06 10:23:50,522 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager > [] - Received resource requirements from job > a7d36f3881f943a2: > [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, > numberOfRequiredSlots=4}] > 2023-07-06 10:23:50,522 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager > [] - Received resource requirements from job > a7d36f3881f943a2: > [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, > numberOfRequiredSlots=3}] > 2023-07-06 10:23:50,522 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager > [] - Received resource requirements from job > a7d36f3881f943a2: > [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, > numberOfRequiredSlots=2}] > 2023-07-06 10:23:50,522 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager > [] - Received resource requirements from job > a7d36f3881f943a2: >
[GitHub] [flink] pvary commented on a diff in pull request #22694: [FLINK-32223][runtime][security] Add Hive delegation token support
pvary commented on code in PR #22694: URL: https://github.com/apache/flink/pull/22694#discussion_r1267947674 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProvider.java: ## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.security.token; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.security.token.DelegationTokenProvider; +import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter; +import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider; +import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.time.Clock; +import java.util.Optional; + +import static org.apache.flink.runtime.hadoop.HadoopUserUtils.getIssueDate; + +/** Delegation token provider for HiveServer2. 
*/ +@Internal +public class HiveServer2DelegationTokenProvider implements DelegationTokenProvider { + +private static final Logger LOG = +LoggerFactory.getLogger(HiveServer2DelegationTokenProvider.class); + +org.apache.hadoop.conf.Configuration hiveConf; + +private KerberosLoginProvider kerberosLoginProvider; + +private Long tokenRenewalInterval; + +@Override +public String serviceName() { +return "HiveServer2"; +} + +@Override +public void init(Configuration configuration) throws Exception { +hiveConf = getHiveConfiguration(configuration); +kerberosLoginProvider = new KerberosLoginProvider(configuration); +} + +private org.apache.hadoop.conf.Configuration getHiveConfiguration(Configuration conf) { +try { +org.apache.hadoop.conf.Configuration hadoopConf = +HadoopUtils.getHadoopConfiguration(conf); +hiveConf = new HiveConf(hadoopConf, HiveConf.class); +} catch (Exception | NoClassDefFoundError e) { +LOG.warn("Fail to create HiveServer2 Configuration", e); +} +return hiveConf; +} + +@Override +public boolean delegationTokensRequired() throws Exception { +/** + * The general rule how a provider/receiver must behave is the following: The provider and + * the receiver must be added to the classpath together with all the additionally required + * dependencies. + * + * This null check is required because the HiveServer2 provider is always on classpath + * but Hive jars are optional. Such case configuration is not able to be loaded. This + * construct is intended to be removed when HiveServer2 provider/receiver pair can be + * externalized (namely if a provider/receiver throws an exception then workload must be + * stopped). 
+ */ +if (hiveConf == null) { +LOG.debug( +"HiveServer2 is not available (not packaged with this application), hence no " ++ "hiveServer2 tokens will be acquired."); +return false; +} +try { +if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) { +LOG.debug( +"Hadoop Kerberos is not enabled,hence no hiveServer2 tokens will be acquired."); +return false; +} +} catch (IOException e) { +LOG.debug( +"Hadoop Kerberos is
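The null-check construct the provider documents above can be sketched in isolation. The names below are hypothetical (not the actual HiveServer2DelegationTokenProvider): a provider that always sits on the classpath, but whose backing library is optional, should answer "no tokens required" instead of failing when the optional classes are absent.

```java
// Stdlib-only sketch of the optional-dependency guard pattern: probe for a
// class from the optional library and degrade gracefully when it is missing.
class OptionalDependencyGuard {
    static boolean dependencyPresent(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            // the optional jars are not packaged with this application
            return false;
        }
    }

    static boolean delegationTokensRequired() {
        // mirrors the hiveConf null check: skip token acquisition quietly
        // when the Hive jars are missing from the classpath
        return dependencyPresent("org.apache.hadoop.hive.conf.HiveConf");
    }
}
```

As the code comment in the PR notes, this guard is a stopgap: once the provider/receiver pair is externalized, a missing dependency should fail the workload instead of being skipped.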
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #633: [FLINK-32589] Carry over parallelism overrides between spec changes
gyfora commented on PR #633: URL: https://github.com/apache/flink-kubernetes-operator/pull/633#issuecomment-1641872870 @mxm let me know if you have further comments
[jira] [Closed] (FLINK-32623) Rest api doesn't return minimum resource requirements correctly
[ https://issues.apache.org/jira/browse/FLINK-32623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-32623. Resolution: Fixed master: 6b3d291f6a573fb34a528313e5683d3a48a66771 > Rest api doesn't return minimum resource requirements correctly > --- > > Key: FLINK-32623 > URL: https://issues.apache.org/jira/browse/FLINK-32623 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Affects Versions: 1.18.0 >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > The resource requirements returned by the rest api always return a hardcoded > 1 lower bound for each vertex. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] zentol merged pull request #23014: [FLINK-32623] Return correct vertex resource lower bound
zentol merged PR #23014: URL: https://github.com/apache/flink/pull/23014
[GitHub] [flink] RanJinh commented on a diff in pull request #23000: [FLINK-32594][runtime] Use blocking ResultPartitionType if operator only outputs records on EOF
RanJinh commented on code in PR #23000: URL: https://github.com/apache/flink/pull/23000#discussion_r1267888031 ## flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java: ## @@ -637,4 +637,16 @@ public int hashCode() { result = 31 * result + (int) (bufferTimeout ^ (bufferTimeout >>> 32)); return result; } + Review Comment: Yeah, you are right. I will modify this, and we should also update the FLIP.
[jira] [Commented] (FLINK-32592) (Stream)ExEnv#initializeContextEnvironment isn't thread-safe
[ https://issues.apache.org/jira/browse/FLINK-32592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744529#comment-17744529 ] Chesnay Schepler commented on FLINK-32592: -- master: 13d35365f677813d5f0090f121e14e8bdec646d1 1.17: TBD 1.16: TBD > (Stream)ExEnv#initializeContextEnvironment isn't thread-safe > > > Key: FLINK-32592 > URL: https://issues.apache.org/jira/browse/FLINK-32592 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.15.4, 1.18.0, 1.17.1 >Reporter: Fabio Wanner >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > > *Context* > We are using the flink-k8s-operator to deploy multiple jobs (up to 32) to a > single session cluster. The job submissions done by the operator happen > concurrently, basically at the same time. > Operator version: 1.5.0 > Flink version: 1.15.4, 1.17.1, 1.18 (master@f37d41cf) > *Problem* > Rarely (~once every 50 deployments) one of the jobs will not be executed. In > the following incident 4 jobs are deployed at the same time: > * gorner-task-staging-e5730831 > * gorner-facility-staging-e5730831 > * gorner-aepp-staging-e5730831 > * gorner-session-staging-e5730831 > > The operator submits the jobs; they all get a reasonable jobID: > {code:java} > 2023-07-14 10:25:35,295 o.a.f.k.o.s.AbstractFlinkService [INFO > ][aelps-staging/gorner-task-staging-e5730831] Submitting job: > 4968b186061e44390002 to session cluster. > 2023-07-14 10:25:35,297 o.a.f.k.o.s.AbstractFlinkService [INFO > ][aelps-staging/gorner-facility-staging-e5730831] Submitting job: > 91a5260d916c4dff0002 to session cluster. > 2023-07-14 10:25:35,301 o.a.f.k.o.s.AbstractFlinkService [INFO > ][aelps-staging/gorner-aepp-staging-e5730831] Submitting job: > 103c0446e14749a10002 to session cluster. > 2023-07-14 10:25:35,302 o.a.f.k.o.s.AbstractFlinkService [INFO > ][aelps-staging/gorner-session-staging-e5730831] Submitting job: > de59304d370b4b8e0002 to session cluster. 
> {code} > In the cluster the JarRunHandler's handleRequest() method will get the > request, all 4 jobIDs are present (also all args, etc are correct): > {code:java} > 2023-07-14 10:25:35,320 WARN > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - > handleRequest - requestBody.jobId: 4968b186061e44390002 > 2023-07-14 10:25:35,321 WARN > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - > handleRequest - requestBody.jobId: de59304d370b4b8e0002 > 2023-07-14 10:25:35,321 WARN > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - > handleRequest - requestBody.jobId: 91a5260d916c4dff0002 > 2023-07-14 10:25:35,321 WARN > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - > handleRequest - requestBody.jobId: 103c0446e14749a10002 > {code} > But once the EmbeddedExecutor's submitAndGetJobClientFuture() method is > called instead of getting 1 call per jobID we have 4 calls but one of the > jobIDs twice: > {code:java} > 2023-07-14 10:25:35,616 WARN > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] > - execute - optJobId: Optional[4968b186061e44390002] > 2023-07-14 10:25:35,616 WARN > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] > - execute - optJobId: Optional[103c0446e14749a10002] > 2023-07-14 10:25:35,616 WARN > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] > - execute - optJobId: Optional[de59304d370b4b8e0002] > 2023-07-14 10:25:35,721 WARN > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] > - execute - optJobId: Optional[de59304d370b4b8e0002] > {code} > If this is important: the jobGraph obtained does not match the jobID. We get > 2 times de59304d370b4b8e0002 but the jobgraph for this jobID is > never returned by getJobGraph() in > EmbeddedExecutor.submitAndGetJobClientFuture(). 
> This will then lead to the job already existing: > {code:java} > 2023-07-14 10:25:35,616 WARN > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] > - execute - submittedJobIds: [] > 2023-07-14 10:25:35,616 WARN > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] > - execute - submittedJobIds: [] > 2023-07-14 10:25:35,616 WARN > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] > - execute - submittedJobIds: [] > 2023-07-14 10:25:35,721 WARN > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] > - execute - submittedJobIds: [de59304d370b4b8e0002] > {code} >
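The race reported above, where two concurrent submissions end up executing the same job graph, is the classic unsafe check-then-act on shared static context state. As a minimal sketch of the set-once fix later discussed in the PR thread (class and method names here are hypothetical, not Flink's actual API), an `AtomicReference` with `compareAndSet` guarantees that only one submitting thread can install a context, and a concurrent loser cannot silently reuse or overwrite it:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a set-once context holder. compareAndSet ensures
// that of two racing initializers exactly one wins; the other observes
// failure instead of silently sharing the winner's context.
public class ContextHolder {
    private static final AtomicReference<String> CONTEXT = new AtomicReference<>();

    /** Returns true iff this caller installed the context. */
    public static boolean tryInitialize(String context) {
        return CONTEXT.compareAndSet(null, context);
    }

    public static String get() {
        return CONTEXT.get();
    }
}
```

A plain non-volatile static field, by contrast, offers neither atomicity nor visibility between the submitting threads, which matches the failure mode seen in the logs.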
[GitHub] [flink] zentol merged pull request #22997: [FLINK-32592] Fix (Stream)ExEnv#initializeContextEnvironment thread-safety
zentol merged PR #22997: URL: https://github.com/apache/flink/pull/22997 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zentol commented on pull request #22997: [FLINK-32592] Fix (Stream)ExEnv#initializeContextEnvironment thread-safety
zentol commented on PR #22997: URL: https://github.com/apache/flink/pull/22997#issuecomment-1641827665 > I'm curious about the fallback behavior in case you don't have thread local context. An atomic reference or volatile field would be better than what exists right now. I don't know why the field wasn't removed when the thread local was introduced; maybe for backwards-compatibility where 1 thread sets the context but another runs the job? :shrug: My initial feeling was that this field should just be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] yunfengzhou-hub commented on a diff in pull request #22931: [FLINK-32514] Support configuring checkpointing interval during process backlog
yunfengzhou-hub commented on code in PR #22931: URL: https://github.com/apache/flink/pull/22931#discussion_r1267876016 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java: ## @@ -2079,11 +2149,19 @@ private final class ScheduledTrigger implements Runnable { @Override public void run() { +long currentTime = clock.relativeTimeMillis(); +if (lastCheckpointTriggeringRelativeTime != NO_CHECKPOINT +&& currentTime - lastCheckpointTriggeringRelativeTime < baseInterval) { +return; +} +lastCheckpointTriggeringRelativeTime = currentTime; + try { triggerCheckpoint(checkpointProperties, null, true); } catch (Exception e) { LOG.error("Exception while triggering checkpoint for job {}.", job, e); } +timer.schedule(this, baseInterval, TimeUnit.MILLISECONDS); Review Comment: I'm not sure I understand the expected behavior clearly. The current implementation ensures that the next checkpoint will be <= T2+ X, depending on the random init delay. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
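The guard in the diff above skips a scheduled firing that comes sooner than `baseInterval` after the previous trigger, records the trigger time otherwise, and reschedules itself. A standalone illustration of that logic (field and method names are illustrative, not the actual `CheckpointCoordinator` API, and the rescheduling timer is omitted):

```java
// Illustrative sketch of the interval guard from the diff above.
// NO_CHECKPOINT marks "no checkpoint has been triggered yet", so the
// very first firing always passes the guard.
public class TriggerGate {
    static final long NO_CHECKPOINT = -1L;

    private final long baseInterval;
    private long lastTriggerTime = NO_CHECKPOINT;

    public TriggerGate(long baseIntervalMillis) {
        this.baseInterval = baseIntervalMillis;
    }

    /** Returns true and records the time iff baseInterval has elapsed. */
    public boolean tryTrigger(long nowMillis) {
        if (lastTriggerTime != NO_CHECKPOINT && nowMillis - lastTriggerTime < baseInterval) {
            return false; // too soon: skip this firing, keep the old timestamp
        }
        lastTriggerTime = nowMillis;
        return true;
    }
}
```

Under this shape, a trigger fired at time T is followed by the next eligible trigger no earlier than T + baseInterval, which is the "<= T2 + X" behavior the reviewer is asking about.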
[GitHub] [flink] zentol commented on a diff in pull request #22997: [FLINK-32592] Fix (Stream)ExEnv#initializeContextEnvironment thread-safety
zentol commented on code in PR #22997: URL: https://github.com/apache/flink/pull/22997#discussion_r1267872976 ## flink-java/src/test/java/org/apache/flink/api/java/ExecutionEnvironmentTest.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java; + +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.core.testutils.OneShotLatch; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import static org.assertj.core.api.Assertions.assertThat; + +class ExecutionEnvironmentTest { + +@Test +void testConcurrentSetContext() throws Exception { Review Comment: yes! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] RanJinh commented on a diff in pull request #23000: [FLINK-32594][runtime] Use blocking ResultPartitionType if operator only outputs records on EOF
RanJinh commented on code in PR #23000: URL: https://github.com/apache/flink/pull/23000#discussion_r1267868354 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ## @@ -184,6 +184,9 @@ public static JobGraph createJobGraph( private final Map chainedInputOutputFormats; +// The node id would be in this Set if it is an upstream node of an outputEOF node. Review Comment: Thanks for your detailed comments! I will check all of the code comments according to your suggestion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dmvk commented on pull request #22997: [FLINK-32592] Fix (Stream)ExEnv#initializeContextEnvironment thread-safety
dmvk commented on PR #22997: URL: https://github.com/apache/flink/pull/22997#issuecomment-1641815189 I'm curious about the fallback behavior in case you don't have thread local context. 🤔 Especially since the `contextEnvironmentFactory` is not marked as volatile. I guess the correct behavior would be simply using an AtomicReference there and only allowing it to be set once. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] RanJinh commented on a diff in pull request #23000: [FLINK-32594][runtime] Use blocking ResultPartitionType if operator only outputs records on EOF
RanJinh commented on code in PR #23000: URL: https://github.com/apache/flink/pull/23000#discussion_r1267862410 ## flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java: ## @@ -27,4 +28,16 @@ * method) interfaces that can be implemented via Java 8 lambdas. */ @Public -public interface Function extends java.io.Serializable {} +public interface Function extends java.io.Serializable { +/** Returns true iff the operator can only emit records after inputs have reached EOF. */ +@Internal +default boolean isOutputEOF() { Review Comment: Thanks for your comments! I simply want to provide a method for user defined function to determine the OperatorAttribute directly. Indeed, it does not make sense to connect `Function` and `OperatorAttribute` together. I will modify this later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dmvk commented on a diff in pull request #22997: [FLINK-32592] Fix (Stream)ExEnv#initializeContextEnvironment thread-safety
dmvk commented on code in PR #22997: URL: https://github.com/apache/flink/pull/22997#discussion_r1267858802 ## flink-java/src/test/java/org/apache/flink/api/java/ExecutionEnvironmentTest.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java; + +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.core.testutils.OneShotLatch; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import static org.assertj.core.api.Assertions.assertThat; + +class ExecutionEnvironmentTest { + +@Test +void testConcurrentSetContext() throws Exception { Review Comment: Is it correct that without the fix, the test wasn't failing reliably but was just flaky? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] yunfengzhou-hub commented on a diff in pull request #22931: [FLINK-32514] Support configuring checkpointing interval during process backlog
yunfengzhou-hub commented on code in PR #22931: URL: https://github.com/apache/flink/pull/22931#discussion_r1267857378 ## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java: ## @@ -164,6 +167,15 @@ public void lazyInitialize( context.lazyInitialize( globalFailureHandler, mainThreadExecutor, operatorCoordinatorMetricGroup); +OperatorCoordinator rootCoordinator = coordinator; +if (coordinator instanceof RecreateOnResetOperatorCoordinator) { +rootCoordinator = +((RecreateOnResetOperatorCoordinator) rootCoordinator).getInternalCoordinator(); +} +if (rootCoordinator instanceof SourceCoordinator) { +((SourceCoordinator) rootCoordinator).lazyInitialize(checkpointCoordinator); Review Comment: Below is part of the invocation stack of SourceCoordinatorContext's constructor.
```
<init>:142, SourceCoordinatorContext (org.apache.flink.runtime.source.coordinator)
<init>:119, SourceCoordinatorContext (org.apache.flink.runtime.source.coordinator)
getCoordinator:89, SourceCoordinatorProvider (org.apache.flink.runtime.source.coordinator)
createNewInternalCoordinator:337, RecreateOnResetOperatorCoordinator$DeferrableCoordinator (org.apache.flink.runtime.operators.coordination)
<init>:60, RecreateOnResetOperatorCoordinator (org.apache.flink.runtime.operators.coordination)
<init>:43, RecreateOnResetOperatorCoordinator (org.apache.flink.runtime.operators.coordination)
create:200, RecreateOnResetOperatorCoordinator$Provider (org.apache.flink.runtime.operators.coordination)
create:194, RecreateOnResetOperatorCoordinator$Provider (org.apache.flink.runtime.operators.coordination)
create:546, OperatorCoordinatorHolder (org.apache.flink.runtime.operators.coordination)
create:509, OperatorCoordinatorHolder (org.apache.flink.runtime.operators.coordination)
createOperatorCoordinatorHolder:286, ExecutionJobVertex (org.apache.flink.runtime.executiongraph)
initialize:223, ExecutionJobVertex (org.apache.flink.runtime.executiongraph)
initializeJobVertex:888, DefaultExecutionGraph (org.apache.flink.runtime.executiongraph)
initializeJobVertex:218, ExecutionGraph (org.apache.flink.runtime.executiongraph)
initializeJobVertices:870, DefaultExecutionGraph (org.apache.flink.runtime.executiongraph)
attachJobGraph:826, DefaultExecutionGraph (org.apache.flink.runtime.executiongraph)
buildGraph:219, DefaultExecutionGraphBuilder (org.apache.flink.runtime.executiongraph)
createAndRestoreExecutionGraph:163, DefaultExecutionGraphFactory (org.apache.flink.runtime.scheduler)
createAndRestoreExecutionGraph:368, SchedulerBase (org.apache.flink.runtime.scheduler)
<init>:210, SchedulerBase (org.apache.flink.runtime.scheduler)
<init>:140, DefaultScheduler (org.apache.flink.runtime.scheduler)
```
In order to pass `checkpointCoordinator` through this stack to SourceCoordinatorContext's constructor, we may need to add a `getCheckpointCoordinator` method to the `OperatorCoordinator.Context` class. This is a public API change that has not been agreed on in the FLIP discussion, and I also personally think this API would yield too much freedom to OperatorCoordinator developers, so it should not be introduced. Thus I did not choose this path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
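The wrapping visible in the stack above is why the diff unwraps `RecreateOnResetOperatorCoordinator` before it can hand the checkpoint coordinator to the `SourceCoordinator`. A heavily simplified, hypothetical sketch of that unwrap-then-lazy-initialize shape (the real Flink classes carry far more state and behavior than shown):

```java
// Hypothetical simplification of the unwrap + lazyInitialize step from
// the diff: a wrapper delegates to an inner coordinator, and a late-bound
// dependency is injected after construction instead of via constructors.
public class LazyInitSketch {
    interface Coordinator {}

    /** Stand-in for a coordinator needing a dependency bound after construction. */
    static class SourceLike implements Coordinator {
        Object checkpointCoordinator; // injected later, not via the constructor
        void lazyInitialize(Object cc) { this.checkpointCoordinator = cc; }
    }

    /** Stand-in for RecreateOnResetOperatorCoordinator-style delegation. */
    static class Recreating implements Coordinator {
        final Coordinator internal;
        Recreating(Coordinator internal) { this.internal = internal; }
        Coordinator getInternalCoordinator() { return internal; }
    }

    static void initialize(Coordinator c, Object checkpointCoordinator) {
        Coordinator root = c;
        if (root instanceof Recreating) {
            root = ((Recreating) root).getInternalCoordinator();
        }
        if (root instanceof SourceLike) {
            ((SourceLike) root).lazyInitialize(checkpointCoordinator);
        }
    }
}
```

Lazy injection sidesteps threading the dependency through the long constructor chain in the stack trace, at the cost of the instanceof-unwrapping seen in the diff.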
[GitHub] [flink-connector-pulsar] tisonkun commented on pull request #55: [FLINK-24302] Test coverage for JDK 11
tisonkun commented on PR #55: URL: https://github.com/apache/flink-connector-pulsar/pull/55#issuecomment-1641787804 Pending to merge... @syhily do we have a ticket for supporting table connector already? Or I should open a new one? It should be my next step to track. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API
flinkbot commented on PR #23026: URL: https://github.com/apache/flink/pull/23026#issuecomment-1641769442 ## CI report: * fcbf13064bc547149c674d4b1c0ceaa265176221 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] WencongLiu closed pull request #23005: Test for path
WencongLiu closed pull request #23005: Test for path URL: https://github.com/apache/flink/pull/23005 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] WencongLiu closed pull request #23002: Mark dataset related classes deprecated
WencongLiu closed pull request #23002: Mark dataset related classes deprecated URL: https://github.com/apache/flink/pull/23002 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32558) Properly deprecate DataSet API
[ https://issues.apache.org/jira/browse/FLINK-32558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32558: --- Labels: pull-request-available (was: ) > Properly deprecate DataSet API > -- > > Key: FLINK-32558 > URL: https://issues.apache.org/jira/browse/FLINK-32558 > Project: Flink > Issue Type: Sub-task > Components: API / DataSet >Reporter: Xintong Song >Assignee: Wencong Liu >Priority: Blocker > Labels: pull-request-available > Fix For: 1.18.0 > > > DataSet API is described as "legacy", "soft deprecated" in user documentation > [1]. The required tasks for formally deprecating / removing it, according to > FLIP-131 [2], are all completed. > This task include marking all related API classes as `@Deprecated` and update > the user documentation. > [1] > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/dataset/overview/ > [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] WencongLiu opened a new pull request, #23026: [FLINK-32558][flink-java] Deprecate all DataSet API
WencongLiu opened a new pull request, #23026: URL: https://github.com/apache/flink/pull/23026 ## What is the purpose of the change The entry points for using the DataSet API were deprecated. This PR ensures that all @Public, @PublicEvolving and @Experimental APIs are also marked as @Deprecated. This PR doesn't have any code changes outside of deprecating these classes. Classes that are not scheduled for removal might still need to be migrated. ## Brief change log - All classes that contain one of those methods are also marked as deprecated with a link to [FLIP-131](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741) . ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32627) Add support for dynamic time window function
[ https://issues.apache.org/jira/browse/FLINK-32627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744499#comment-17744499 ] 张一帆 commented on FLINK-32627: - [~martijnvisser] Thank you > Add support for dynamic time window function > > > Key: FLINK-32627 > URL: https://issues.apache.org/jira/browse/FLINK-32627 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Affects Versions: 1.18.0 >Reporter: 张一帆 >Assignee: 张一帆 >Priority: Major > Labels: pull-request-available > > When using windows for calculations, when the logic is frequently modified > and adjusted, the entire program needs to be stopped, the code is modified, > the program is repackaged and then submitted to the cluster. It is impossible > to achieve logic dynamic modification and external dynamic injection. The > window information can be obtained from the data to trigger Redistribution of > windows to achieve the effect of dynamic windows -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32627) Add support for dynamic time window function
[ https://issues.apache.org/jira/browse/FLINK-32627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744497#comment-17744497 ] Martijn Visser commented on FLINK-32627: [~zhangyf] I've just granted you the permissions > Add support for dynamic time window function > > > Key: FLINK-32627 > URL: https://issues.apache.org/jira/browse/FLINK-32627 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Affects Versions: 1.18.0 >Reporter: 张一帆 >Assignee: 张一帆 >Priority: Major > Labels: pull-request-available > > When using windows for calculations, when the logic is frequently modified > and adjusted, the entire program needs to be stopped, the code is modified, > the program is repackaged and then submitted to the cluster. It is impossible > to achieve logic dynamic modification and external dynamic injection. The > window information can be obtained from the data to trigger Redistribution of > windows to achieve the effect of dynamic windows -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32627) Add support for dynamic time window function
[ https://issues.apache.org/jira/browse/FLINK-32627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744494#comment-17744494 ] 张一帆 commented on FLINK-32627: - [~martijnvisser] How do I create a FLIP, I don't seem to have permission > Add support for dynamic time window function > > > Key: FLINK-32627 > URL: https://issues.apache.org/jira/browse/FLINK-32627 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Affects Versions: 1.18.0 >Reporter: 张一帆 >Assignee: 张一帆 >Priority: Major > Labels: pull-request-available > > When using windows for calculations, when the logic is frequently modified > and adjusted, the entire program needs to be stopped, the code is modified, > the program is repackaged and then submitted to the cluster. It is impossible > to achieve logic dynamic modification and external dynamic injection. The > window information can be obtained from the data to trigger Redistribution of > windows to achieve the effect of dynamic windows -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-29039) RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only shallow copies produced data, thus result will always be the last row v
[ https://issues.apache.org/jira/browse/FLINK-29039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reassigned FLINK-29039: -- Assignee: Hang Ruan > RowData produced by LineBytesInputFormat is reused, but > DeserializationSchemaAdapter#Reader only shallow copies produced data, thus > result will always be the last row value > > > Key: FLINK-29039 > URL: https://issues.apache.org/jira/browse/FLINK-29039 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.15.1 > Environment: This issue was discovered on MacOS Big Sur. >Reporter: Marco A. Villalobos >Assignee: Hang Ruan >Priority: Major > > RowData produced by LineBytesInputFormat is reused, but > DeserializationSchemaAdapter#Reader only shallow copies produced data, thus > result will always be the last row value. > > Given this program: > {code:java} > package mvillalobos.bug; > import org.apache.flink.api.common.RuntimeExecutionMode; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.TableResult; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import static org.apache.flink.table.api.Expressions.$; > public class IsThisABatchSQLBug { >public static void main(String[] args) { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setRuntimeMode(RuntimeExecutionMode.BATCH); > final StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env); > tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" + > " `file.path` STRING NOT NULL METADATA,\n" + > " `file.name` STRING NOT NULL METADATA,\n" + > " `file.size` BIGINT NOT NULL METADATA,\n" + > " `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL > METADATA,\n" + > " line STRING\n" + > " ) WITH (\n" + > " 'connector' = 'filesystem', \n" + > " 'format' = 'raw'\n" + > " );"); > tableEnv.executeSql("CREATE TABLE historical_raw_source\n" + 
> " WITH (\n" + > " 'path' = > '/Users/minmay/dev/mvillalobos/historical/data'\n" + > " ) LIKE historical_raw_source_template;"); final > TableResult output = > tableEnv.from("historical_raw_source").select($("line")).execute(); > output.print(); > } > } {code} > and this sample.csv file in the > '/Users/minmay/dev/mvillalobos/historical/data' directory: > {code:java} > one > two > three > four > five > six > seven > eight > nine > ten {code} > {{The print results are:}} > {code:java} > +++ > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > | +I | ten | > +++ > 10 rows in set {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] yunfengzhou-hub commented on a diff in pull request #22931: [FLINK-32514] Support configuring checkpointing interval during process backlog
yunfengzhou-hub commented on code in PR #22931: URL: https://github.com/apache/flink/pull/22931#discussion_r1267760838 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java: ## @@ -2079,11 +2149,19 @@ private final class ScheduledTrigger implements Runnable { @Override public void run() { +long currentTime = clock.relativeTimeMillis(); +if (lastCheckpointTriggeringRelativeTime != NO_CHECKPOINT +&& currentTime - lastCheckpointTriggeringRelativeTime < baseInterval) { Review Comment: With the current implementation in which `lastCheckpointTriggeringRelativeTime` could be `Long.MIN_VALUE`, `currentTime - lastCheckpointTriggeringRelativeTime` could cause stack overflow. Thus I treat `Long.MIN_VALUE` specially here. This would be resolved after I replace the default value with 0 or -1 according to the previous comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
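The overflow the comment warns about (a numeric wrap-around rather than a literal stack overflow) is real for a `Long.MIN_VALUE` sentinel: in two's-complement arithmetic `now - Long.MIN_VALUE` wraps to a large negative number, so the elapsed-time check spuriously reports "too soon" forever, whereas a sentinel like 0 or -1 keeps the subtraction sane for realistic clock values:

```java
// Demonstrates the wrap-around hazard: subtracting a Long.MIN_VALUE
// sentinel overflows, making the elapsed time appear negative.
public class SentinelOverflow {
    public static boolean tooSoon(long now, long last, long interval) {
        return now - last < interval;
    }
}
```

This is why the guard in the diff must special-case the sentinel (or, as the comment suggests, use a default value such as 0 or -1 that cannot overflow the subtraction).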
[GitHub] [flink] flinkbot commented on pull request #23025: [FLINK-7129]Support dynamically changing CEP patterns
flinkbot commented on PR #23025: URL: https://github.com/apache/flink/pull/23025#issuecomment-1641670271 ## CI report: * 1e5b0a3d55605fe1e6de27331a6c204b9824dc62 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org