[jira] [Commented] (FLINK-10392) Remove legacy mode
[ https://issues.apache.org/jira/browse/FLINK-10392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628234#comment-16628234 ]

tison commented on FLINK-10392:
---

[~till.rohrmann] I am afraid that if we add a sub-task for every test-porting job, we will end up with a long list even before we start removing the legacy production files. Is that acceptable, or should we squash some of them?

> Remove legacy mode
> ------------------
>
> Key: FLINK-10392
> URL: https://issues.apache.org/jira/browse/FLINK-10392
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Fix For: 1.7.0
>
> This issue is the umbrella issue to remove the legacy mode code from Flink.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628197#comment-16628197 ]

tison edited comment on FLINK-10406 at 9/26/18 3:41 AM:
--------------------------------------------------------

- {{testCancelWithSavepoint}} is covered by {{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}
- {{testCancelWithSavepointNoDirectoriesConfigured}} is partially covered by {{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}. Currently we don't surface a detailed error message that reveals why a savepoint failed. {{testDoNotCancelJobIfSavepointFails}} tests a savepoint path with denied permissions, but changing it to a /not/exist/path exercises the same code path; the exception stringifies as "java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: An Exception occurred while triggering the checkpoint."
- {{testCancelJobWithSavepointFailurePeriodicCheckpoints}} is covered by {{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}.

was (Author: tison):

- {{testCancelWithSavepoint}} is covered by {{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}
- {{testCancelWithSavepointNoDirectoriesConfigured}} is partially covered by {{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}. Currently we don't surface a detailed error message that reveals why a savepoint failed. {{testDoNotCancelJobIfSavepointFails}} tests a savepoint path with denied permissions, but changing it to a /not/exist/path exercises the same code path; the exception stringifies as "java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: An Exception occurred while triggering the checkpoint."
> Port JobManagerTest to new code base
> ------------------------------------
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: tison
> Assignee: tison
> Priority: Major
> Fix For: 1.7.0
>
> Port {{JobManagerTest}} to new code base.
> Not all of its tests should be ported, since some of them are covered by
> {{JobMasterTest}}.
[jira] [Commented] (FLINK-10420) Create and drop view in sql client should check the view created based on the configuration.
[ https://issues.apache.org/jira/browse/FLINK-10420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628208#comment-16628208 ]

vinoyang commented on FLINK-10420:
---

[~hequn8128] What do you think about this issue?

> Create and drop view in sql client should check the view created based on the
> configuration.
>
> Key: FLINK-10420
> URL: https://issues.apache.org/jira/browse/FLINK-10420
> Project: Flink
> Issue Type: Bug
> Components: SQL Client
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Minor
>
> Currently, only the current session is checked:
> {code:java}
> private void callCreateView(SqlCommandCall cmdCall) {
>     final String name = cmdCall.operands[0];
>     final String query = cmdCall.operands[1];
>     // here
>     final String previousQuery = context.getViews().get(name);
>     if (previousQuery != null) {
>         printExecutionError(CliStrings.MESSAGE_VIEW_ALREADY_EXISTS);
>         return;
>     }
> {code}
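As a rough illustration of the suggested fix — checking views declared in the environment configuration in addition to those created in the current session — consider this sketch. The class and field names here are hypothetical, not the actual SQL Client code:

```java
import java.util.HashMap;
import java.util.Map;

public class ViewRegistry {
    // Views created interactively in this session.
    private final Map<String, String> sessionViews = new HashMap<>();
    // Views declared in the environment configuration file (hypothetical source).
    private final Map<String, String> configuredViews = new HashMap<>();

    /** Rejects the view if a view with this name exists in either place. */
    public boolean createView(String name, String query) {
        if (sessionViews.containsKey(name) || configuredViews.containsKey(name)) {
            return false; // corresponds to MESSAGE_VIEW_ALREADY_EXISTS
        }
        sessionViews.put(name, query);
        return true;
    }

    public static void main(String[] args) {
        ViewRegistry registry = new ViewRegistry();
        System.out.println(registry.createView("my_view", "SELECT * FROM t")); // true
        System.out.println(registry.createView("my_view", "SELECT 1"));        // false: name taken
    }
}
```

The same two-map lookup would apply to DROP VIEW, where dropping a configuration-defined view should presumably be rejected rather than silently ignored.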
[jira] [Commented] (FLINK-10406) Port JobManagerTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628197#comment-16628197 ]

tison commented on FLINK-10406:
---

- {{testCancelWithSavepoint}} is covered by {{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}
[jira] [Created] (FLINK-10427) Port JobSubmitTest to new code base
tison created FLINK-10427:
--------------------------

Summary: Port JobSubmitTest to new code base
Key: FLINK-10427
URL: https://issues.apache.org/jira/browse/FLINK-10427
Project: Flink
Issue Type: Sub-task
Components: Tests
Affects Versions: 1.7.0
Reporter: tison
Assignee: tison
Fix For: 1.7.0

Port {{JobSubmitTest}} to new code base.
[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628148#comment-16628148 ]

tison edited comment on FLINK-10406 at 9/26/18 2:57 AM:
--------------------------------------------------------

- {{testSavepointRestoreSettings}} is covered by {{JobMaster#testRestoringFromSavepoint}}; the {{triggerSavepoint}} part is covered by {{JobMasterTriggerSavepointIT}}, and the submit-failure part should be taken care of when porting {{JobSubmitTest}}, which has a test {{testAnswerFailureWhenSavepointReadFails}}.

was (Author: tison):

- {{testSavepointRestoreSettings}} is covered by {{JobMaster#testRestoringFromSavepoint}}
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628183#comment-16628183 ]

ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-424565714

@StephanEwen Hello, regarding the two questions you raised yesterday, I have some thoughts of my own, though I am not sure they are right.

1. Where should the BOM be read? I think we still need logic that processes the BOM when the file is opened, at the very beginning of the file, plus an attribute that records the file's BOM encoding. For example, it could go in the function `createInputSplits`.

2. Regarding the second, performance-related problem: we can use the previously detected BOM to distinguish UTF-8 with BOM, UTF-16 with BOM, and UTF-32 with BOM, and use the character width to handle the end of each line. The garbling in the earlier bug was actually an encoding problem, partly caused by mishandling the trailing bytes of each line. I have done the following for this problem:

    String utf8 = "UTF-8";
    String utf16 = "UTF-16";
    String utf32 = "UTF-32";
    int stepSize = 0;
    String charsetName = this.getCharsetName();
    if (charsetName.contains(utf8)) {
        stepSize = 1;
    } else if (charsetName.contains(utf16)) {
        stepSize = 2;
    } else if (charsetName.contains(utf32)) {
        stepSize = 4;
    }
    // If \n is used as the delimiter and this line ends with \r, remove the \r from the line.
    if (this.getDelimiter() != null && this.getDelimiter().length == 1
            && this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= stepSize
            && bytes[offset + numBytes - stepSize] == CARRIAGE_RETURN) {
        numBytes -= stepSize;
    }
    numBytes = numBytes - stepSize + 1;
    return new String(bytes, offset, numBytes, this.getCharsetName());

These are some of my own ideas.
I hope that you can give some better suggestions so we can handle this JIRA better. Thank you.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> UTF-16 support for TextInputFormat
> ----------------------------------
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.4.2
> Reporter: David Dreyfus
> Priority: Blocker
> Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In
> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM)
> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(),
> which sets TextInputFormat.charsetName and then modifies the previously set
> delimiterString to construct the proper byte string encoding of the
> delimiter. This same charsetName is also used in TextInputFormat.readRecord()
> to interpret the bytes read from the file.
>
> There are two problems that this implementation would seem to have when using
> UTF-16.
> # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will
> return a Big Endian byte sequence including the Byte Order Mark (BOM). The
> actual text file will not contain a BOM at each line ending, so the delimiter
> will never be read. Moreover, if the actual byte encoding of the file is
> Little Endian, the bytes will be interpreted incorrectly.
> # TextInputFormat.readRecord() will not see a BOM each time it decodes a
> byte sequence with the String(bytes, offset, numBytes, charset) call.
> Therefore, it will assume Big Endian, which may not always be correct.
> [1]
> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95
>
> While there are likely many solutions, I would think that all of them would
> have to start by reading the BOM from the file when a Split is opened and
> then using that BOM to modify the specified encoding to a BOM-specific one
> when the caller doesn't specify one, and to overwrite the caller's
> specification if the BOM is in conflict with it. That is, if the BOM
> indicates Little Endian and the caller indicates UTF-16BE, Flink should
> rewrite the charsetName as UTF-16LE.
> I hope this makes sense and that I haven't been testing incorrectly or
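The delimiter-encoding problem described in the report can be reproduced with the plain JDK: encoding a one-character delimiter with the umbrella "UTF-16" charset prepends a big-endian BOM, so the resulting byte sequence can never match the line endings inside the file. A minimal standalone sketch (the class name is made up; this is JDK charset behavior, not Flink code):

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DelimiterBomDemo {
    public static void main(String[] args) {
        // Java's "UTF-16" charset encodes as big-endian and prepends a BOM (0xFE 0xFF),
        // so the "delimiter" becomes 4 bytes: [-2, -1, 0, 10].
        byte[] withBom = "\n".getBytes(Charset.forName("UTF-16"));
        System.out.println(Arrays.toString(withBom));

        // The endianness-specific charsets emit no BOM, so these byte sequences
        // actually match what appears at each line ending in the file.
        byte[] be = "\n".getBytes(StandardCharsets.UTF_16BE); // [0, 10]
        byte[] le = "\n".getBytes(StandardCharsets.UTF_16LE); // [10, 0]
        System.out.println(Arrays.toString(be));
        System.out.println(Arrays.toString(le));
    }
}
```

This is why the suggested fix resolves "UTF-16" to a concrete UTF-16BE or UTF-16LE (based on the file's BOM) before encoding the delimiter or decoding records.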
[jira] [Commented] (FLINK-9328) RocksDBStateBackend might use PlaceholderStreamStateHandle to restore due to StateBackendTestBase class not registering snapshots in some UTs
[ https://issues.apache.org/jira/browse/FLINK-9328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628169#comment-16628169 ]

ASF GitHub Bot commented on FLINK-9328:
---

Myasuka commented on issue #5984: [FLINK-9328][state] Fix RocksDBStateBackend restore problem due to StateBackendTestBase not registering snapshots
URL: https://github.com/apache/flink/pull/5984#issuecomment-424561812

@azagrebin Sure, since the test code is already fixed, I'll just close this PR.

> RocksDBStateBackend might use PlaceholderStreamStateHandle to restore due to
> StateBackendTestBase class not registering snapshots in some UTs
>
> Key: FLINK-9328
> URL: https://issues.apache.org/jira/browse/FLINK-9328
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.4.2
> Reporter: Yun Tang
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.5.5
>
> Currently, the StateBackendTestBase class does not register snapshots with
> the SharedStateRegistry in the testValueState, testListState, testReducingState,
> testFoldingState and testMapState UTs, which may cause RocksDBStateBackend to
> restore from a PlaceholderStreamStateHandle during the 2nd restore procedure if
> one specific sst file exists in both the 1st snapshot and the 2nd snapshot
> handle.
[jira] [Commented] (FLINK-10406) Port JobManagerTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628148#comment-16628148 ]

tison commented on FLINK-10406:
---

- {{testSavepointRestoreSettings}} is covered by {{JobMaster#testRestoringFromSavepoint}}
[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once
[ https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628139#comment-16628139 ]

vinoyang commented on FLINK-10292:
---

Hi [~uce], thank you for your clarification on "image"; that doesn't have much impact. My general idea is that Flink running in a virtualized environment is not fundamentally different from a standalone environment, so the two can be treated equally. [~till.rohrmann], what do you think about our discussion? If you have time, please help review the PR for FLINK-10291; it should be uncontroversial.

> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -------------------------------------------------------------
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.6.0, 1.7.0
> Reporter: Till Rohrmann
> Assignee: vinoyang
> Priority: Major
> Fix For: 1.7.0, 1.6.2
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}}
> from the given user code every time it starts/is restarted. This can be
> problematic if the {{JobGraph}} generation has side effects. Therefore, it
> would be better to generate the {{JobGraph}} only once, store it in HA
> storage, and retrieve it from there.
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628084#comment-16628084 ]

ASF GitHub Bot commented on FLINK-10205:

chunhui-shi commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-424544286

Nice work. One note: the stored history of splits in 'inputSplits' does not have to be a full history. If we know the list is short enough, then it is fine to ship it as is. +1.

> Batch Job: InputSplit Fault tolerant for DataSourceTask
> --------------------------------------------------------
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager
> Reporter: JIN SUN
> Assignee: JIN SUN
> Priority: Major
> Labels: pull-request-available
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve
> better performance. However, when a DataSourceTask fails and reruns, it will
> not get the same splits as its previous attempt, which can introduce
> inconsistent results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch scenario),
> these two executions should process the same splits.
> We need to fix this to make the inputs of a DataSourceTask deterministic. The
> proposal is to save all splits into the ExecutionVertex; the DataSourceTask
> will then pull splits from there.
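The proposal above — record the splits handed to each source vertex so that a restarted attempt replays exactly the same sequence — can be sketched generically. This is an illustration of the idea only; `SplitAssigner` and its methods are hypothetical, not the actual Flink ExecutionVertex API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hands out splits lazily and replays the recorded history to restarted attempts. */
public class SplitAssigner<S> {
    private final Iterator<S> source;                  // the original split sequence
    private final List<S> history = new ArrayList<>(); // splits already handed out

    public SplitAssigner(Iterator<S> source) {
        this.source = source;
    }

    /** Returns the index-th split this vertex ever received, or null if exhausted. */
    public synchronized S nextSplit(int index) {
        // A restarted attempt asks with the same index and gets exactly
        // the same split that the failed attempt received.
        while (history.size() <= index && source.hasNext()) {
            history.add(source.next());
        }
        return index < history.size() ? history.get(index) : null;
    }

    public static void main(String[] args) {
        SplitAssigner<String> assigner =
                new SplitAssigner<>(Arrays.asList("s0", "s1", "s2").iterator());
        System.out.println(assigner.nextSplit(0)); // s0
        // A "restarted attempt" asking for index 0 again gets the same split.
        System.out.println(assigner.nextSplit(0)); // s0
        System.out.println(assigner.nextSplit(1)); // s1
    }
}
```

This also addresses the reviewer's point: the history only needs to cover splits already handed out, which for most sources stays short enough to ship as is.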
[jira] [Updated] (FLINK-9363) Bump up the Jackson version
[ https://issues.apache.org/jira/browse/FLINK-9363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-9363:
---
Description:
CVE's for Jackson:
CVE-2017-17485
CVE-2018-5968
CVE-2018-7489

We can upgrade to 2.9.5

was:
CVE's for Jackson:
CVE-2017-17485
CVE-2018-5968
CVE-2018-7489

We can upgrade to 2.9.5

> Bump up the Jackson version
> ---------------------------
>
> Key: FLINK-9363
> URL: https://issues.apache.org/jira/browse/FLINK-9363
> Project: Flink
> Issue Type: Improvement
> Reporter: Ted Yu
> Assignee: vinoyang
> Priority: Major
> Labels: security
>
> CVE's for Jackson:
> CVE-2017-17485
> CVE-2018-5968
> CVE-2018-7489
> We can upgrade to 2.9.5
[jira] [Commented] (FLINK-10423) Forward RocksDB memory metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627866#comment-16627866 ]

Monal Daxini commented on FLINK-10423:
---

It would be great to have this ported to the 1.6 release as well. Spoke with [~srichter] about this offline.

> Forward RocksDB memory metrics to Flink metrics reporter
> ---------------------------------------------------------
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
> Issue Type: New Feature
> Components: Metrics, State Backends, Checkpointing
> Reporter: Seth Wiesman
> Assignee: Seth Wiesman
> Priority: Major
>
> RocksDB exposes a number of metrics at the column family level about current
> memory usage, open memtables, etc. that would be useful to users wanting
> greater insight into what RocksDB is doing. This work is inspired heavily by
> the comments on this RocksDB issue thread:
> https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233
[jira] [Comment Edited] (FLINK-10423) Forward RocksDB memory metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627866#comment-16627866 ]

Monal Daxini edited comment on FLINK-10423 at 9/25/18 8:15 PM:
---------------------------------------------------------------

It would be great to have this ported to the 1.6 release. Spoke with [~srichter] about this offline.

was (Author: mdaxini):

It would be great to have this ported to the 1.6 release as well. Spoke with [~srichter] about this offline.
[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627806#comment-16627806 ]

tison edited comment on FLINK-10406 at 9/25/18 7:28 PM:
--------------------------------------------------------

* {{testStopSignal}} and {{testStopSignalFail}} are covered by {{ExecutionGraphStopTest}}. The high-level invocations at the {{Dispatcher}} and {{JobMaster}} level are trivial.
* {{testNullHostnameGoesToLocalhost}} is ported to {{AkkaUtilsTest#"null hostname should go to localhost"}}
* {{testRequestPartitionState*}}: I would propose ignoring all of them since we have FLINK-10319, which proposes disabling {{JobMaster#requestPartitionState}} and has one approval and no objections yet. Also cc [~trohrm...@apache.org], could you take a look at FLINK-10319 so that we can decide on this removal? (UPDATE: even without FLINK-10319 accepted, these tests should be covered by {{JobMasterTest#testRequestPartitionState}} and {{TaskTest#...}})

was (Author: tison):

* {{testStopSignal}} and {{testStopSignalFail}} are covered by {{ExecutionGraphStopTest}}. The high-level invocations at the {{Dispatcher}} and {{JobMaster}} level are trivial.
* {{testNullHostnameGoesToLocalhost}} is ported to {{AkkaUtilsTest#"null hostname should go to localhost"}}
* {{testRequestPartitionState*}}: I would propose ignoring all of them since we have FLINK-10319, which proposes disabling {{JobMaster#requestPartitionState}} and has one approval and no objections yet. Also cc [~trohrm...@apache.org], could you take a look at FLINK-10319 so that we can decide on this removal?
[jira] [Comment Edited] (FLINK-10225) Cannot access state from a empty taskmanager
[ https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16620743#comment-16620743 ] Pierre Zemb edited comment on FLINK-10225 at 9/25/18 6:40 PM: -- Hi! I've been reading the code a bit, and I have a question. From what I can see, only the TaskExecutor has enough knowledge to make a call to the ResourceManager. It is also the only component that updates the ConcurrentHashMap used by the RPC handler. This means that when I'm inside the RPC handler, I cannot find a way to cleanly trigger a method of the TaskExecutor. I like the way the interface for KvStateClientProxy is implemented and I don't want to change it; do you have an idea of how I could implement this? cc [~till.rohrmann]
> Cannot access state from a empty taskmanager
> Key: FLINK-10225
> URL: https://issues.apache.org/jira/browse/FLINK-10225
> Project: Flink
> Issue Type: Bug
> Components: Queryable State
> Affects Versions: 1.5.3, 1.6.0
> Environment: 4tm and 1jm for now on 1.6.0
> Reporter: Pierre Zemb
> Priority: Critical
>
> Hi!
> I've started to deploy a small Flink cluster (4tm and 1jm for now on 1.6.0) and deployed a small job on it. Because of the current load, the job is completely handled by a single tm.
> I've created a small proxy that is using [QueryableStateClient|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/queryablestate/client/QueryableStateClient.html] to access the current state. It is working nicely, except under certain circumstances. It seems to me that I can only access the state through a node that is holding a part of the job. Here's an example:
> * job on tm1. Pointing QueryableStateClient to tm1. State accessible
> * job still on tm1. Pointing QueryableStateClient to tm2 (for example). State inaccessible
> * killing tm1, job is now on tm2. State accessible
> * job still on tm2. Pointing QueryableStateClient to tm3. State inaccessible
> * adding some parallelism to spread the job on tm1 and tm2. Pointing QueryableStateClient to either tm1 or tm2 is working
> * job still on tm1 and tm2. Pointing QueryableStateClient to tm3. State inaccessible
> When the state is inaccessible, I can see this (generated [here|https://github.com/apache/flink/blob/release-1.6/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java#L228]):
> {code:java}
> java.lang.RuntimeException: Failed request 0. Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=repo-status of job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state is not ready, or ii) the job does not exist.
> at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
> at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
> at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
> at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
> at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
> at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> Went a bit through the (master branch) code. The class KvStateClientProxy holds kvStateLocationOracle, the key-value state location oracle for the given JobID. Here's the usage:
> * updateKvStateLocationOracle() in
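A generic sketch of the workaround the bullet list above implies: try each task manager's proxy in order until one can serve the state. The host names and the helper below are hypothetical illustrations, not Flink API; in a real setup the query function would wrap a QueryableStateClient lookup.

```java
import java.util.List;
import java.util.function.Function;

public class FirstReachableQuery {

    /**
     * Tries each candidate proxy address in order and returns the first
     * successful result. Mirrors the manual "point the client at another
     * task manager" steps from the bug report.
     */
    static <T> T queryFirstReachable(List<String> proxyHosts, Function<String, T> query) {
        RuntimeException last = null;
        for (String host : proxyHosts) {
            try {
                // An UnknownLocationException-style failure means this TM
                // does not hold the state, so we try the next one.
                return query.apply(host);
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last != null ? last : new IllegalStateException("no proxy hosts given");
    }

    public static void main(String[] args) {
        // Simulated cluster from the report: only tm2 runs the job.
        List<String> hosts = List.of("tm1", "tm2", "tm3");
        String result = queryFirstReachable(hosts, host -> {
            if (!host.equals("tm2")) {
                throw new RuntimeException("Could not retrieve location of state on " + host);
            }
            return "state@" + host;
        });
        System.out.println(result); // state@tm2
    }
}
```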
[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once
[ https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627766#comment-16627766 ] Ufuk Celebi commented on FLINK-10292: - [~yanghua] I didn't mean to confuse you with the image and Kubernetes example. I just took it as *one* example for when the {{StandaloneJobClusterEntryPoint}} is used. My main question was independent of this, as you also pointed out. Thanks for your input. To summarize your answer: you think that in such a scenario it should be the **responsibility of the user** to clean up the HA data.
> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.6.0, 1.7.0
> Reporter: Till Rohrmann
> Assignee: vinoyang
> Priority: Major
> Fix For: 1.7.0, 1.6.2
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} from the given user code every time it starts or is restarted. This can be problematic if the {{JobGraph}} generation has side effects. Therefore, it would be better to generate the {{JobGraph}} only once, store it in HA storage, and retrieve it from there.
[jira] [Comment Edited] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once
[ https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626190#comment-16626190 ] Ufuk Celebi edited comment on FLINK-10292 at 9/25/18 6:24 PM: -- I understand that non-determinism may be an issue when generating the {{JobGraph}}, but do we have some data about how common that is for applications? Would it be possible to keep a fixed JobGraph in the image instead of persisting one in the {{SubmittedJobGraphStore}}? I like our current approach because, with image-based deployments such as Kubernetes, it keeps the source of truth *in* the image instead of the {{SubmittedJobGraphStore}}. I'm wondering about the following scenario in particular (this is independent of whether it runs on Kubernetes and can be reproduced in another way as well):
* A user creates a job cluster with high availability enabled (cluster ID for the logical application, e.g. myapp)
** This will persist the job with a fixed ID (after FLINK-10291) on first submission
* The user kills the application *without* cancelling
** This will leave all data in the high availability store(s), such as job graphs or checkpoints
* The user updates the image with a modified application and keeps the high availability configuration (e.g. cluster ID stays myapp)
** This will result in the job in the image being ignored, since we already have a job graph with the same (fixed) ID
I think in such a scenario it can be desirable to still have the checkpoints available, but it might be problematic if the job graph is recovered from the {{SubmittedJobGraphStore}} instead of using the job that is part of the image. What do you think about this scenario? Is it the responsibility of the user to handle this? If so, I think that the approach outlined in this ticket makes sense. If not, we may want to consider alternatives or ignore potential non-determinism.
[jira] [Updated] (FLINK-10279) Make jython limitations more obvious in documentation
[ https://issues.apache.org/jira/browse/FLINK-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10279: --- Labels: pull-request-available (was: )
> Make jython limitations more obvious in documentation
> Key: FLINK-10279
> URL: https://issues.apache.org/jira/browse/FLINK-10279
> Project: Flink
> Issue Type: Task
> Components: Documentation, Python API
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Major
> Labels: pull-request-available
>
> The "Python Programming Guide (Streaming) Beta" at [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html] does not sufficiently highlight the limitations of the API. It should probably have a prominent disclaimer right at the top stating that this actually isn't a "Python" API but Jython, which likely means that a user looking for a way to run native Python code won't be able to use many important libraries, which is often the reason to look for Python support in the first place.
[jira] [Commented] (FLINK-10279) Make jython limitations more obvious in documentation
[ https://issues.apache.org/jira/browse/FLINK-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627725#comment-16627725 ] ASF GitHub Bot commented on FLINK-10279: tweise opened a new pull request #6761: [FLINK-10279] [documentation] Make jython limitations more obvious in documentation. URL: https://github.com/apache/flink/pull/6761 The purpose of this change is to better highlight the potential restrictions of the Jython-based streaming API to avoid user confusion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] tweise commented on issue #6761: [FLINK-10279] [documentation] Make jython limitations more obvious in documentation.
tweise commented on issue #6761: [FLINK-10279] [documentation] Make jython limitations more obvious in documentation. URL: https://github.com/apache/flink/pull/6761#issuecomment-424442918 CC: @StephanEwen @aljoscha @zentol With regards, Apache Git Services
[GitHub] asfgit closed pull request #6725: [FLINK-10263] [sql-client] Fix classloader issues in SQL Client
asfgit closed pull request #6725: [FLINK-10263] [sql-client] Fix classloader issues in SQL Client URL: https://github.com/apache/flink/pull/6725 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
{code}
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index b5830725db1..ca0251365e6 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -212,6 +212,8 @@ tables:
         type: VARCHAR
       - name: duplicate_count
         type: BIGINT
+      - name: constant
+        type: VARCHAR
     connector:
       type: filesystem
       path: $RESULT
@@ -226,6 +228,8 @@ tables:
         type: VARCHAR
       - name: duplicate_count
         type: BIGINT
+      - name: constant
+        type: VARCHAR

 functions:
   - name: RegReplace
@@ -261,7 +265,7 @@ $FLINK_DIR/bin/sql-client.sh embedded \
 read -r -d '' SQL_STATEMENT_2 << EOF
 INSERT INTO CsvSinkTable
-  SELECT *
+  SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant
   FROM AvroBothTable
 EOF
@@ -285,4 +289,4 @@ for i in {1..10}; do
   sleep 5
 done
-check_result_hash "SQLClient" $RESULT "dca08a82cc09f6b19950291dbbef16bb"
+check_result_hash "SQLClient" $RESULT "0a1bf8bf716069b7269f575f87a802c0"

diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 85b3e9265a8..552d0b37dca 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -75,6 +75,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;

 /**
  * Context for executing table programs. This class caches everything that can be cached across
@@ -183,6 +184,19 @@ public EnvironmentInstance createEnvironmentInstance() {
 		return tableSinks;
 	}

+	/**
+	 * Executes the given supplier using the execution context's classloader as thread classloader.
+	 */
+	public <R> R wrapClassLoader(Supplier<R> supplier) {
+		final ClassLoader previousClassloader = Thread.currentThread().getContextClassLoader();
+		Thread.currentThread().setContextClassLoader(classLoader);
+		try {
+			return supplier.get();
+		} finally {
+			Thread.currentThread().setContextClassLoader(previousClassloader);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------

 	private static CommandLine createCommandLine(Deployment deployment, Options commandLineOptions) {

diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 3b9e8e99b82..1318043faf1 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -219,14 +219,16 @@ public TableSchema getTableSchema(SessionContext session, String name) throws Sq
 	@Override
 	public String explainStatement(SessionContext session, String statement) throws SqlExecutionException {
-		final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
+		final ExecutionContext<?> context = getOrCreateExecutionContext(session);
+		final TableEnvironment tableEnv = context
 			.createEnvironmentInstance()
 			.getTableEnvironment();

 		// translate
 		try {
 			final Table table = createTable(tableEnv, statement);
-			return tableEnv.explain(table);
+			// explanation requires an optimization step that might reference UDFs during code compilation
+			return context.wrapClassLoader(() -> tableEnv.explain(table));
 		} catch (Throwable t) {
 			// catch everything such that the query does not crash the executor
 			throw new SqlExecutionException("Invalid SQL statement.", t);
@@ -242,7 +244,7 @@ public
{code}
[jira] [Commented] (FLINK-10263) User-defined function with LITERAL paramters yields CompileException
[ https://issues.apache.org/jira/browse/FLINK-10263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627711#comment-16627711 ] ASF GitHub Bot commented on FLINK-10263: asfgit closed pull request #6725: [FLINK-10263] [sql-client] Fix classloader issues in SQL Client URL: https://github.com/apache/flink/pull/6725
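The core of the merged fix is a small, reusable pattern: swap the thread context classloader for the duration of a computation and restore it afterwards, even on failure. A standalone sketch of that pattern, independent of the SQL Client classes:

```java
import java.util.function.Supplier;

public class ClassLoaderWrap {

    /** Runs the supplier with the given classloader as the thread context classloader. */
    static <R> R wrapClassLoader(ClassLoader classLoader, Supplier<R> supplier) {
        final ClassLoader previous = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
        try {
            return supplier.get();
        } finally {
            // Always restore the previous classloader, even if the supplier throws.
            Thread.currentThread().setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader custom = new ClassLoader(ClassLoaderWrap.class.getClassLoader()) {};
        ClassLoader before = Thread.currentThread().getContextClassLoader();

        // Inside the supplier, the custom classloader is the context classloader.
        ClassLoader seen = wrapClassLoader(custom,
            () -> Thread.currentThread().getContextClassLoader());

        System.out.println(seen == custom);                                           // true
        System.out.println(Thread.currentThread().getContextClassLoader() == before); // true
    }
}
```

This is why UDF code compilation during `explain` works after the fix: the compiler resolves user classes through the context classloader that is active while the supplier runs.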
[jira] [Resolved] (FLINK-6813) Add TIMESTAMPDIFF supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-6813. - Resolution: Fixed Fix Version/s: 1.7.0 Fixed as part of FLINK-6847.
> Add TIMESTAMPDIFF supported in SQL
> Key: FLINK-6813
> URL: https://issues.apache.org/jira/browse/FLINK-6813
> Project: Flink
> Issue Type: Sub-task
> Components: Table API SQL
> Affects Versions: 1.4.0
> Reporter: sunjincheng
> Assignee: sunjincheng
> Priority: Major
> Fix For: 1.7.0
>
> TIMESTAMPDIFF(unit,datetime_expr1,datetime_expr2) returns datetime_expr2 − datetime_expr1, where datetime_expr1 and datetime_expr2 are date or datetime expressions. One expression may be a date and the other a datetime; a date value is treated as a datetime having the time part '00:00:00' where necessary. The unit for the result (an integer) is given by the unit argument. The legal values for unit are the same as those listed in the description of the TIMESTAMPADD() function.
> * Syntax
> TIMESTAMPDIFF(unit,datetime_expr1,datetime_expr2)
> - unit: the part of datetime_expr1 and datetime_expr2 that specifies the type of boundary crossed.
> - datetime_expr1: an expression that can be resolved to a date or datetime.
> - datetime_expr2: same as datetime_expr1.
> * Example
> SELECT TIMESTAMPDIFF(year, '2015-12-31 23:59:59.999', '2017-01-01 00:00:00.000') from tab; --> 2
> * See more: [MySQL|https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_timestampdiff]
> CALCITE:
> {code}
> SELECT
>   timestampdiff(YEAR, timestamp '2019-06-01 07:01:11', timestamp '2020-06-01 07:01:11'),
>   timestampdiff(QUARTER, timestamp '2019-06-01 07:01:11', timestamp '2020-06-01 07:01:11'),
>   timestampdiff(MONTH, timestamp '2019-06-01 07:01:11', timestamp '2020-06-01 07:01:11'),
>   timestampdiff(WEEK, timestamp '2019-06-01 07:01:11', timestamp '2020-06-01 07:01:11'),
>   timestampdiff(DAY, timestamp '2019-06-01 07:01:11', timestamp '2020-06-01 07:01:11'),
>   timestampdiff(HOUR, timestamp '2019-06-01 07:01:11', timestamp '2020-06-01 07:01:11'),
>   timestampdiff(MINUTE, timestamp '2019-06-01 07:01:11', timestamp '2020-06-01 07:01:11'),
>   timestampdiff(SECOND, timestamp '2019-06-01 07:01:11', timestamp '2020-06-01 07:01:11')
> FROM depts;
> | 1 | 4 | 12 | **52** | 366 | 8784 | 527040 | 31622400
> {code}
> MSSQL:
> {code}
> SELECT
>   datediff(YEAR, '2019-06-01 07:01:11', '2020-06-01 07:01:11'),
>   datediff(QUARTER, '2019-06-01 07:01:11', '2020-06-01 07:01:11'),
>   datediff(MONTH, '2019-06-01 07:01:11', '2020-06-01 07:01:11'),
>   datediff(WEEK, '2019-06-01 07:01:11', '2020-06-01 07:01:11'),
>   datediff(DAY, '2019-06-01 07:01:11', '2020-06-01 07:01:11'),
>   datediff(HOUR, '2019-06-01 07:01:11', '2020-06-01 07:01:11'),
>   datediff(MINUTE, '2019-06-01 07:01:11', '2020-06-01 07:01:11'),
>   datediff(SECOND, '2019-06-01 07:01:11', '2020-06-01 07:01:11')
> FROM stu;
> | 1 | 4 | 12 | **53** | 366 | 8784 | 527040 | 31622400
> {code}
> I have discussed the differences with the Calcite community and found the reason: https://stackoverflow.com/questions/26138167/is-timestampdiff-in-mysql-equivalent-to-datediff-in-sql-server.
> So, in this JIRA, we will keep consistency with Calcite.
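For illustration, the Calcite result row quoted above matches the fully-elapsed-unit semantics of {{java.time}} (an analogy only; Flink delegates to Calcite, not to {{java.time}}, and {{ChronoUnit}} has no QUARTER constant):

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class TimestampDiffSketch {
    public static void main(String[] args) {
        LocalDateTime t1 = LocalDateTime.of(2019, 6, 1, 7, 1, 11);
        LocalDateTime t2 = LocalDateTime.of(2020, 6, 1, 7, 1, 11);

        // Fully elapsed units, matching the Calcite row above
        // (2020 is a leap year, hence 366 days).
        System.out.println(ChronoUnit.YEARS.between(t1, t2));   // 1
        System.out.println(ChronoUnit.MONTHS.between(t1, t2));  // 12
        System.out.println(ChronoUnit.WEEKS.between(t1, t2));   // 52
        System.out.println(ChronoUnit.DAYS.between(t1, t2));    // 366
        System.out.println(ChronoUnit.HOURS.between(t1, t2));   // 8784
        System.out.println(ChronoUnit.MINUTES.between(t1, t2)); // 527040
        System.out.println(ChronoUnit.SECONDS.between(t1, t2)); // 31622400
    }
}
```

The MSSQL row differs at WEEK (53 vs 52) because DATEDIFF counts boundary crossings rather than fully elapsed intervals, which is exactly the discrepancy discussed in the linked Stack Overflow answer.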
[jira] [Resolved] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-6847. - Resolution: Fixed Fix Version/s: 1.7.0 Fixed in 1.7.0: c5ce970e781df60eb27b62446853eaa0579c8706 > Add TIMESTAMPDIFF supported in TableAPI > --- > > Key: FLINK-6847 > URL: https://issues.apache.org/jira/browse/FLINK-6847 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Priority: Major > Labels: pull-request-available, starter > Fix For: 1.7.0 > > > see FLINK-6813 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627664#comment-16627664 ] ASF GitHub Bot commented on FLINK-6847: --- asfgit closed pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support URL: https://github.com/apache/flink/pull/6282 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md index eac6d16eaf8..7d9aee22956 100644 --- a/docs/dev/table/functions.md +++ b/docs/dev/table/functions.md @@ -3176,6 +3176,18 @@ TIMESTAMPADD(unit, interval, timevalue) E.g., TIMESTAMPADD(WEEK, 1, DATE '2003-01-02') returns 2003-01-09. + + + +{% highlight text %} +TIMESTAMPDIFF(unit, timestamp1, timestamp2) +{% endhighlight %} + + +Returns the (signed) number of timeUnit intervals between timestamp1 and timestamp2. The unit for the interval is given by the unit argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, or YEAR. For valid units, refer to the Time Interval and Point Unit Specifiers table. E.g. TIMESTAMPDIFF(DAY, TIMESTAMP '2003-01-02 10:00:00', TIMESTAMP '2003-01-03 10:00:00') returns 1. + + + @@ -3421,6 +3433,18 @@ dateFormat(TIMESTAMP, STRING) E.g., dateFormat(ts, '%Y, %d %M') results in strings formatted as "2017, 05 May". + + + +{% highlight java %} +timestampDiff(TimeIntervalUnit, datetime1, datetime2) +{% endhighlight %} + + +Returns the (signed) number of timeUnit intervals between datetime1 and datetime2. The unit for the interval is given by the unit argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
For valid units, refer to the Time Interval and Point Unit Specifiers table. E.g. timestampDiff(TimeIntervalUnit.DAY, '2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp) returns 1. + + + @@ -3666,6 +3690,18 @@ dateFormat(TIMESTAMP, STRING) E.g., dateFormat('ts, "%Y, %d %M") results in strings formatted as "2017, 05 May". + + + +{% highlight scala %} +timestampDiff(TimeIntervalUnit, datetime1, datetime2) +{% endhighlight %} + + +Returns the (signed) number of timeUnit intervals between datetime1 and datetime2. The unit for the interval is given by the unit argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. For valid units, refer to the Time Interval and Point Unit Specifiers table. E.g. timestampDiff(TimeIntervalUnit.DAY, '2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp) returns 1. + + + @@ -5319,4 +5355,65 @@ The following table lists specifiers for date format functions. +Time Interval and Point Unit Specifiers +-- + +The following table lists specifiers for time interval and time point units.
+
+ | Interval Unit    | Point Unit  |
+ | ---------------- | ----------- |
+ | YEAR             | YEAR        |
+ | QUARTER          | QUARTER     |
+ | MONTH            | MONTH       |
+ | WEEK             | WEEK        |
+ | DAY              | DAY         |
+ | HOUR             | HOUR        |
+ | MINUTE           | MINUTE      |
+ | SECOND           | SECOND      |
+ | YEAR_TO_MONTH    | MICROSECOND |
+ | DAY_TO_HOUR      | MILLISECOND |
+ | DAY_TO_SECOND    |             |
+ | HOUR_TO_MINUTE   |             |
+ | HOUR_TO_SECOND   |             |
+ | MINUTE_TO_SECOND |             |
+
+ {% top %} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index dfe69cb0411..eb4aad7e40a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -26,6 +26,7 @@ import org.apache.flink.table.api.{TableException, CurrentRow, CurrentRange, Unb import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval} import org.apache.flink.table.api.Table import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit +import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.AggregateFunction @@ -1104,6 +1105,34 @@ object dateFormat { } } +/** + * Returns the (signed) number of timeUnit intervals between timestamp1 and timestamp2. + * + * For example
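The TIMESTAMPDIFF semantics documented in the diff above (a signed count of whole units between two timestamps) can be reproduced with the JDK alone. The sketch below uses java.time.temporal.ChronoUnit as a stand-in for the SQL function; the method name timestampDiff here is illustrative, not Flink's API:

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class TimestampDiffSketch {

    // Signed number of complete `unit` intervals from t1 to t2,
    // mirroring SQL TIMESTAMPDIFF(unit, timestamp1, timestamp2).
    static long timestampDiff(ChronoUnit unit, LocalDateTime t1, LocalDateTime t2) {
        return unit.between(t1, t2);
    }

    public static void main(String[] args) {
        LocalDateTime t1 = LocalDateTime.parse("2003-01-02T10:00:00");
        LocalDateTime t2 = LocalDateTime.parse("2003-01-03T10:00:00");
        // The documentation's example: one full DAY between the two timestamps.
        System.out.println(timestampDiff(ChronoUnit.DAYS, t1, t2));
    }
}
```

Note that ChronoUnit.between returns a signed value, so swapping the arguments yields the negated result, matching the "(signed) number of intervals" wording in the docs.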
[jira] [Issue Comment Deleted] (FLINK-10240) Pluggable scheduling strategy for batch jobs
[ https://issues.apache.org/jira/browse/FLINK-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tison updated FLINK-10240: -- Comment: was deleted (was: Introducing a pluggable scheduling strategy is an excellent idea that could greatly expand the cases Flink is able to handle. I like this idea and can help. However, the document attached above is read-only, so I leave my comments in a copy of it linked below. Most of them are layout improvements and minor rewording; the body of the document is no more than the original design. https://docs.google.com/document/d/15pUYc5_yrY2IwmnADCoNWZwOIYCOcroWmuuZHs-vdlU/edit?usp=sharing Note that this is an EDITABLE document, and everyone interested in it can leave comments or edit it directly. As open source software we trust our contributors, and the document can be frozen and left comment-only once the discussion reaches a consensus.) > Pluggable scheduling strategy for batch jobs > > > Key: FLINK-10240 > URL: https://issues.apache.org/jira/browse/FLINK-10240 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Reporter: Zhu Zhu >Priority: Major > Labels: scheduling > > Currently batch jobs are scheduled with the LAZY_FROM_SOURCES strategy: source > tasks are scheduled in the beginning, and other tasks are scheduled once > their input data are consumable. > However, input data being consumable does not always mean the task can work at > once. > > One example is the hash join operation, where the operator first consumes one > side (we call it the build side) to set up a table, then consumes the other side (we > call it the probe side) to do the real join work. If the probe side is started > early, it just gets stuck on back pressure, as the join operator will not > consume data from it before the building stage is done, causing a waste of > resources.
> If we have the probe side task started after the build stage is done, both > the build and probe side can have more computing resources as they are > staggered. > > That's why we think a flexible scheduling strategy is needed, allowing job > owners to customize the vertex schedule order and constraints. Better > resource utilization usually means better performance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
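The build/probe dependency the issue describes can be illustrated with a minimal in-memory hash join, a schematic rather than Flink's operator code: the probe side produces no output until the build phase has fully populated the table, which is why scheduling the probe task early only leaves it blocked on back pressure.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {

    // Rows are (key, value) pairs. Build phase: consume the build side fully
    // into a hash table. Probe phase: only afterwards stream the probe side
    // against the table. Starting the probe side before the build phase is
    // done cannot produce results any sooner -- it only occupies resources.
    static List<String> hashJoin(List<int[]> buildSide, List<int[]> probeSide) {
        Map<Integer, List<Integer>> table = new HashMap<>();
        for (int[] row : buildSide) {                               // build phase
            table.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        List<String> joined = new ArrayList<>();
        for (int[] row : probeSide) {                               // probe phase
            for (int buildValue : table.getOrDefault(row[0], List.of())) {
                joined.add(row[0] + ":" + buildValue + "," + row[1]);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<int[]> build = List.of(new int[]{1, 10}, new int[]{2, 20});
        List<int[]> probe = List.of(new int[]{1, 100}, new int[]{3, 300});
        System.out.println(hashJoin(build, probe)); // only key 1 matches
    }
}
```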
[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once
[ https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627538#comment-16627538 ] vinoyang commented on FLINK-10292: -- [~uce] Thank you for verifying my understanding of "image". Here are some of my thoughts on the questions you asked before. In fact, we cannot avoid thinking about running in an image (container), but we also cannot consider only the image scenario. In my opinion, there is no essential difference between standalone and running in docker/k8s (virtualization can be seen as a micro-standalone model, which likewise depends on Zookeeper in the same environment). * Regarding killing rather than canceling: the resulting metadata persists, and the same happens in standalone mode. This cannot be avoided, because it is not a normal operation. * Regarding "update the image with a modified application" possibly causing the new JobGraph to fail to overwrite the old one: this should not happen, because it can be seen as a job upgrade scenario. To achieve this, the first step should be to execute "cancel with savepoint"; the job is then resubmitted (of course we should recognize it and update the JobGraph metadata stored in Zookeeper) and restored from the savepoint. My personal opinion is that Flink on k8s does not need to be considered separately on this issue. > Generate JobGraph in StandaloneJobClusterEntrypoint only once > - > > Key: FLINK-10292 > URL: https://issues.apache.org/jira/browse/FLINK-10292 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Fix For: 1.7.0, 1.6.2 > > > Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} > from the given user code every time it starts/is restarted. This can be > problematic if the {{JobGraph}} generation has side effects.
Therefore, > it would be better to generate the {{JobGraph}} only once and store it in HA > storage, from where it can later be retrieved. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
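The generate-once pattern proposed in the issue can be sketched generically. The HaStore interface and class below are hypothetical stand-ins for illustration only and do not match Flink's actual interfaces:

```java
import java.util.function.Supplier;

// Generate an artifact (e.g. a job graph) at most once across restarts by
// persisting it to highly-available storage and reusing the stored copy.
public class GenerateOnceSketch<T> {

    // Hypothetical HA storage abstraction; not a Flink interface.
    interface HaStore<U> {
        U load();            // null if nothing stored yet
        void store(U value);
    }

    private final HaStore<T> store;
    private final Supplier<T> generator;

    GenerateOnceSketch(HaStore<T> store, Supplier<T> generator) {
        this.store = store;
        this.generator = generator;
    }

    // On (re)start: reuse the persisted artifact if present, so side effects
    // of generation happen at most once rather than on every restart.
    T getOrGenerate() {
        T cached = store.load();
        if (cached != null) {
            return cached;
        }
        T fresh = generator.get();
        store.store(fresh);
        return fresh;
    }
}
```

Calling getOrGenerate() on every entrypoint start is then safe: the first call generates and persists, all later calls (including after a restart against the same store) only read.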
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627516#comment-16627516 ] ASF GitHub Bot commented on FLINK-9377: --- dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220213096 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Objects; + +/** + * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards compatibility purposes. 
+ * + * In older versions of Flink (<= 1.6), we used to write state serializers into checkpoints, along + * with the serializer's configuration snapshot. Since 1.7.0, we no longer wrote the serializers, but + * instead used the configuration snapshot as a factory to instantiate serializers for restoring state. + * However, since some outdated implementations of configuration snapshots did not contain sufficient Review comment: `did` -> `do` I would use present tense here, as you describe current state, don't you? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove writing serializers as part of the checkpoint meta information > - > > Key: FLINK-9377 > URL: https://issues.apache.org/jira/browse/FLINK-9377 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > When writing meta information of a state in savepoints, we currently write > both the state serializer as well as the state serializer's configuration > snapshot. > Writing both is actually redundant, as most of the time they have identical > information. > Moreover, the fact that we use Java serialization to write the serializer > and rely on it to be re-readable on the restore run, already poses problems > for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) > to perform even a compatible upgrade. > The proposal here is to leave only the config snapshot as meta information, > and use that as the single source of truth of information about the schema of > serialized state. > The config snapshot should be treated as a factory (or provided to a > factory) to re-create serializers capable of reading old, serialized state. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
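The "config snapshot as factory" idea from the issue description above can be sketched with hypothetical mini-interfaces; the names below are illustrative and do not match Flink's real TypeSerializerConfigSnapshot API:

```java
import java.nio.charset.StandardCharsets;

public class SnapshotAsFactorySketch {

    // A serializer writes values to bytes.
    interface Serializer<T> {
        byte[] serialize(T value);
    }

    // The snapshot is the single source of truth about the written schema:
    // it alone can restore a serializer capable of reading old state, so no
    // Java-serialized serializer needs to live in the checkpoint.
    interface SerializerSnapshot<T> {
        Serializer<T> restoreSerializer();
    }

    // Example snapshot for a fixed UTF-8 string schema.
    static SerializerSnapshot<String> utf8Snapshot() {
        return () -> value -> value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // On restore, only the snapshot is read from the checkpoint,
        // and the serializer is re-created from it.
        Serializer<String> restored = utf8Snapshot().restoreSerializer();
        System.out.println(restored.serialize("state").length);
    }
}
```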
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627521#comment-16627521 ] ASF GitHub Bot commented on FLINK-9377: --- dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220212410 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Objects; + +/** + * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards compatibility purposes. 
+ * + * In older versions of Flink (<= 1.6), we used to write state serializers into checkpoints, along + * with the serializer's configuration snapshot. Since 1.7.0, we no longer wrote the serializers, but + * instead used the configuration snapshot as a factory to instantiate serializers for restoring state. Review comment: `used` -> `use` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove writing serializers as part of the checkpoint meta information > - > > Key: FLINK-9377 > URL: https://issues.apache.org/jira/browse/FLINK-9377 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > When writing meta information of a state in savepoints, we currently write > both the state serializer as well as the state serializer's configuration > snapshot. > Writing both is actually redundant, as most of the time they have identical > information. > Moreover, the fact that we use Java serialization to write the serializer > and rely on it to be re-readable on the restore run, already poses problems > for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) > to perform even a compatible upgrade. > The proposal here is to leave only the config snapshot as meta information, > and use that as the single source of truth of information about the schema of > serialized state. > The config snapshot should be treated as a factory (or provided to a > factory) to re-create serializers capable of reading old, serialized state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627518#comment-16627518 ] ASF GitHub Bot commented on FLINK-9377: --- dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220211873 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Objects; + +/** + * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards compatibility purposes. 
+ * + * In older versions of Flink (<= 1.6), we used to write state serializers into checkpoints, along + * with the serializer's configuration snapshot. Since 1.7.0, we no longer wrote the serializers, but Review comment: `wrote` -> `write` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove writing serializers as part of the checkpoint meta information > - > > Key: FLINK-9377 > URL: https://issues.apache.org/jira/browse/FLINK-9377 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > When writing meta information of a state in savepoints, we currently write > both the state serializer as well as the state serializer's configuration > snapshot. > Writing both is actually redundant, as most of the time they have identical > information. > Moreover, the fact that we use Java serialization to write the serializer > and rely on it to be re-readable on the restore run, already poses problems > for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) > to perform even a compatible upgrade. > The proposal here is to leave only the config snapshot as meta information, > and use that as the single source of truth of information about the schema of > serialized state. > The config snapshot should be treated as a factory (or provided to a > factory) to re-create serializers capable of reading old, serialized state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220203782 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java ## @@ -21,35 +21,131 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; +import java.io.IOException; + /** * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link TypeSerializer's} configuration. - * The configuration snapshot of a serializer is persisted along with checkpoints of the managed state that the - * serializer is registered to. + * The configuration snapshot of a serializer is persisted within checkpoints + * as a single source of meta information about the schema of serialized data in the checkpoint. + * This serves three purposes: + * + * + * Capturing serializer parameters and schema: a serializer's configuration snapshot + * represents information about the parameters, state, and schema of a serializer. + * This is explained in more detail below. * - * The persisted configuration may later on be used by new serializers to ensure serialization compatibility - * for the same managed state. In order for new serializers to be able to ensure this, the configuration snapshot - * should encode sufficient information about: + * Compatibility checks for new serializers: when new serializers are available, + * they need to be checked whether or not they are compatible to read the data written by the previous serializer. 
+ * This is performed by providing the new serializer to the correspondibng serializer configuration Review comment: `correspondibng` -> `corresponding` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220232529 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +/** + * A {@code TypeSerializerSchemaCompatibility} represents information about whether or not a {@link TypeSerializer} + * can be safely used to read data written by a previous type serializer. + * + * Typically, the compatibility of the new serializer is resolved by checking it against the snapshotted + * {@link TypeSerializerConfigSnapshot} of the previous serializer. Depending on the type of the + * resolved compatibility result, migration (i.e., reading bytes with the previous serializer and then writing + * it again with the new serializer) may be required before the new serializer can be used. 
+ * + * @see TypeSerializer + * @see TypeSerializerConfigSnapshot#resolveSchemaCompatibility(TypeSerializer) + */ +@PublicEvolving +public class TypeSerializerSchemaCompatibility { + + /** +* Enum for the type of the compatibility. +*/ + enum Type { + + /** This indicates that the new serializer continued to be used as is. */ + COMPATIBLE_AS_IS, + + /** +* This indicates that it is required to reconfigure the new serializer before +* it can be used. The reconfigured serializer should be provided as part of the +* resolved {@link TypeSerializerSchemaCompatibility} result. +*/ + COMPATIBLE_AFTER_RECONFIGURATION, + + /** +* This indicates that it is possible to use the new serializer after performing a +* full-scan migration over all state, by reading bytes with the previous serializer +* and then writing it again with the new serializer, effectively converting the +* serialization schema to correspond to the new serializer. +*/ + COMPATIBLE_AFTER_MIGRATION, + + /** +* This indicates that the new serializer is incompatible, even with migration. +* This normally implies that the deserialized Java class can not be commonly recognized +* by the previous and new serializer. +*/ + INCOMPATIBLE + } + + /** +* The type of the compatibility. +*/ + private final Type resultType; + + /** +* The reconfigured new serializer to use. This is only relevant +* in the case that the type of the compatibility is {@link Type#COMPATIBLE_AFTER_RECONFIGURATION}. +*/ + private final TypeSerializer reconfiguredNewSerializer; + + /** +* Returns a result that indicates that the new serializer is compatible and no migration is required. +* The new serializer can continued to be used as is. +* +* @return a result that indicates migration is not required for the new serializer. 
+*/ + public static TypeSerializerSchemaCompatibility compatibleAsIs() { + return new TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AS_IS, null); + } + + /** +* Returns a result that indicates that no migration is required, but the new serializer had to be +* reconfigured in order for it to be compatible. A reconfigured serializer is provided and +* should be used instead. +* +* @param reconfiguredSerializer the reconfigured new serializer that should be used. +* +* @return a result that indicates migration is not required, but a reconfigured version of the new +* serializer should be used. +*/ + public static
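The four compatibility types quoted above can be summarized with a minimal, self-contained sketch of how a restore path might dispatch on the resolved result. Everything here (the class name `CompatibilitySketch`, the `describe` method, the returned strings) is illustrative only, not Flink's actual API:

```java
// Simplified stand-in for the result discussed in the review; the real class is
// org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility.
public class CompatibilitySketch {

    // The four result types described in the quoted Javadoc.
    enum Type {
        COMPATIBLE_AS_IS,
        COMPATIBLE_AFTER_RECONFIGURATION,
        COMPATIBLE_AFTER_MIGRATION,
        INCOMPATIBLE
    }

    // Hypothetical caller logic: what each result implies for the restore path.
    static String describe(Type t) {
        switch (t) {
            case COMPATIBLE_AS_IS:
                return "use the new serializer directly";
            case COMPATIBLE_AFTER_RECONFIGURATION:
                return "use the reconfigured serializer carried in the result";
            case COMPATIBLE_AFTER_MIGRATION:
                return "read all state with the previous serializer, rewrite it with the new one";
            default:
                return "cannot restore: the serializers are incompatible";
        }
    }

    public static void main(String[] args) {
        for (Type t : Type.values()) {
            System.out.println(t + ": " + describe(t));
        }
    }
}
```

The point of the enum-plus-dispatch shape is that the caller, not the serializer, decides what to do with each outcome; only `COMPATIBLE_AFTER_RECONFIGURATION` carries an extra payload (the reconfigured serializer).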
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627520#comment-16627520 ] ASF GitHub Bot commented on FLINK-9377: --- dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220203782 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java ## @@ -21,35 +21,131 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; +import java.io.IOException; + /** * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link TypeSerializer's} configuration. - * The configuration snapshot of a serializer is persisted along with checkpoints of the managed state that the - * serializer is registered to. + * The configuration snapshot of a serializer is persisted within checkpoints + * as a single source of meta information about the schema of serialized data in the checkpoint. + * This serves three purposes: + * + * + * Capturing serializer parameters and schema: a serializer's configuration snapshot + * represents information about the parameters, state, and schema of a serializer. + * This is explained in more detail below. * - * The persisted configuration may later on be used by new serializers to ensure serialization compatibility - * for the same managed state. 
In order for new serializers to be able to ensure this, the configuration snapshot - * should encode sufficient information about: + * Compatibility checks for new serializers: when new serializers are available, + * they need to be checked whether or not they are compatible to read the data written by the previous serializer. + * This is performed by providing the new serializer to the correspondibng serializer configuration Review comment: `correspondibng` -> `corresponding` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove writing serializers as part of the checkpoint meta information > - > > Key: FLINK-9377 > URL: https://issues.apache.org/jira/browse/FLINK-9377 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > When writing meta information of a state in savepoints, we currently write > both the state serializer as well as the state serializer's configuration > snapshot. > Writing both is actually redundant, as most of the time they have identical > information. > Moreover, the fact that we use Java serialization to write the serializer > and rely on it to be re-readable on the restore run, already poses problems > for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) > to perform even a compatible upgrade. > The proposal here is to leave only the config snapshot as meta information, > and use that as the single source of truth of information about the schema of > serialized state. > The config snapshot should be treated as a factory (or provided to a > factory) to re-create serializers capable of reading old, serialized state. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
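The proposal in the issue description above — persist only the config snapshot and treat it as a factory that re-creates a serializer capable of reading old state — can be sketched in miniature. The `Serializer` interface and `StringConfigSnapshot` class below are hypothetical stand-ins, not Flink classes:

```java
import java.nio.charset.StandardCharsets;

// Toy model of "config snapshot as single source of truth": the snapshot, not a
// Java-serialized serializer instance, is what survives in checkpoint metadata,
// and restoreSerializer() rebuilds a serializer for the old schema on demand.
public class ConfigSnapshotSketch {

    interface Serializer<T> {
        byte[] serialize(T value);
    }

    // Stand-in for a TypeSerializerConfigSnapshot subclass acting as a factory.
    static class StringConfigSnapshot {
        Serializer<String> restoreSerializer() {
            // Rebuilt from the snapshot's own recorded parameters (here: UTF-8).
            return value -> value.getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        // In reality the snapshot would be read back from checkpoint metadata.
        StringConfigSnapshot snapshot = new StringConfigSnapshot();
        Serializer<String> restored = snapshot.restoreSerializer();
        System.out.println(restored.serialize("flink").length);
    }
}
```

This mirrors why writing the serializer itself via Java serialization was redundant and brittle: the snapshot already encodes everything needed to rebuild a compatible reader.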
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627517#comment-16627517 ] ASF GitHub Bot commented on FLINK-9377: --- dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220227910 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java ## @@ -21,35 +21,131 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; +import java.io.IOException; + /** * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link TypeSerializer's} configuration. - * The configuration snapshot of a serializer is persisted along with checkpoints of the managed state that the - * serializer is registered to. + * The configuration snapshot of a serializer is persisted within checkpoints + * as a single source of meta information about the schema of serialized data in the checkpoint. + * This serves three purposes: + * + * + * Capturing serializer parameters and schema: a serializer's configuration snapshot + * represents information about the parameters, state, and schema of a serializer. + * This is explained in more detail below. * - * The persisted configuration may later on be used by new serializers to ensure serialization compatibility - * for the same managed state. 
In order for new serializers to be able to ensure this, the configuration snapshot - * should encode sufficient information about: + * Compatibility checks for new serializers: when new serializers are available, + * they need to be checked whether or not they are compatible to read the data written by the previous serializer. + * This is performed by providing the new serializer to the correspondibng serializer configuration + * snapshots in checkpoints. + * + * Factory for a read serializer when schema conversion is required: in the case that new + * serializers are not compatible to read previous data, a schema conversion process executed across all data + * is required before the new serializer can be continued to be used. This conversion process requires a compatible + * read serializer to restore serialized bytes as objects, and then written back again using the new serializer. + * In this scenario, the serializer configuration snapshots in checkpoints doubles as a factory for the read + * serializer of the conversion process. + * + * + * Serializer Configuration and Schema + * + * Since serializer configuration snapshots needs to be used to ensure serialization compatibility + * for the same managed state as well as serving as a factory for compatible read serializers, the configuration + * snapshot should encode sufficient information about: * * * Parameter settings of the serializer: parameters of the serializer include settings * required to setup the serializer, or the state of the serializer if it is stateful. If the serializer * has nested serializers, then the configuration snapshot should also contain the parameters of the nested * serializers. * - * Serialization schema of the serializer: the data format used by the serializer. + * Serialization schema of the serializer: the binary format used by the serializer, or + * in other words, the schema of data written by the serializer. 
* * * NOTE: Implementations must contain the default empty nullary constructor. This is required to be able to * deserialize the configuration snapshot from its binary form. + * + * @param The data type that the originating serializer of this configuration serializes. */ @PublicEvolving -public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { +public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { /** The user code class loader; only relevant if this configuration instance was deserialized from binary form. */ private ClassLoader userCodeClassLoader; + /** +* The originating serializer of this configuration snapshot. +* +* TODO to allow for incrementally adapting the implementation of serializer config snapshot subclasses, +* TODO we currently have a base implementation for the {@link #restoreSerializer()} +* TODO method which simply returns this serializer instance. The serializer is written +* TODO and read using Java serialization as part of reading / writing the config snapshot +*/ + private TypeSerializer
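The schema-conversion process described in the quoted Javadoc — restore serialized bytes as objects with a compatible read serializer, then write them back with the new serializer — can be illustrated with a toy pair of schemas. Both formats here are invented for illustration (Flink state formats are not this simple):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class MigrationSketch {

    // Hypothetical "previous" schema: a 4-byte big-endian int.
    static int readWithPreviousSerializer(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Hypothetical "new" schema: decimal text.
    static byte[] writeWithNewSerializer(int value) {
        return Integer.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    // One step of the full-scan migration: restore the value with the
    // old-compatible reader, then re-serialize it under the new schema.
    static byte[] migrate(byte[] oldBytes) {
        return writeWithNewSerializer(readWithPreviousSerializer(oldBytes));
    }

    public static void main(String[] args) {
        byte[] old = ByteBuffer.allocate(4).putInt(42).array();
        System.out.println(new String(migrate(old), StandardCharsets.UTF_8)); // prints "42"
    }
}
```

Applied across all entries of a state, this is exactly the "full-scan migration" that `COMPATIBLE_AFTER_MIGRATION` requires, and why the config snapshot must double as a factory for the read serializer.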
[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220212410 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Objects; + +/** + * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards compatibility purposes. + * + * In older versions of Flink (<= 1.6), we used to write state serializers into checkpoints, along + * with the serializer's configuration snapshot. Since 1.7.0, we no longer wrote the serializers, but + * instead used the configuration snapshot as a factory to instantiate serializers for restoring state. 
Review comment: `used` -> `use` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220213096 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Objects; + +/** + * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards compatibility purposes. + * + * In older versions of Flink (<= 1.6), we used to write state serializers into checkpoints, along + * with the serializer's configuration snapshot. Since 1.7.0, we no longer wrote the serializers, but + * instead used the configuration snapshot as a factory to instantiate serializers for restoring state. 
+ * However, since some outdated implementations of configuration snapshots did not contain sufficient Review comment: `did` -> `do` I would use present tense here, as you describe current state, don't you? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220227910 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java ## @@ -21,35 +21,131 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; +import java.io.IOException; + /** * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link TypeSerializer's} configuration. - * The configuration snapshot of a serializer is persisted along with checkpoints of the managed state that the - * serializer is registered to. + * The configuration snapshot of a serializer is persisted within checkpoints + * as a single source of meta information about the schema of serialized data in the checkpoint. + * This serves three purposes: + * + * + * Capturing serializer parameters and schema: a serializer's configuration snapshot + * represents information about the parameters, state, and schema of a serializer. + * This is explained in more detail below. * - * The persisted configuration may later on be used by new serializers to ensure serialization compatibility - * for the same managed state. In order for new serializers to be able to ensure this, the configuration snapshot - * should encode sufficient information about: + * Compatibility checks for new serializers: when new serializers are available, + * they need to be checked whether or not they are compatible to read the data written by the previous serializer. + * This is performed by providing the new serializer to the correspondibng serializer configuration + * snapshots in checkpoints. 
+ * + * Factory for a read serializer when schema conversion is required: in the case that new + * serializers are not compatible to read previous data, a schema conversion process executed across all data + * is required before the new serializer can be continued to be used. This conversion process requires a compatible + * read serializer to restore serialized bytes as objects, and then written back again using the new serializer. + * In this scenario, the serializer configuration snapshots in checkpoints doubles as a factory for the read + * serializer of the conversion process. + * + * + * Serializer Configuration and Schema + * + * Since serializer configuration snapshots needs to be used to ensure serialization compatibility + * for the same managed state as well as serving as a factory for compatible read serializers, the configuration + * snapshot should encode sufficient information about: * * * Parameter settings of the serializer: parameters of the serializer include settings * required to setup the serializer, or the state of the serializer if it is stateful. If the serializer * has nested serializers, then the configuration snapshot should also contain the parameters of the nested * serializers. * - * Serialization schema of the serializer: the data format used by the serializer. + * Serialization schema of the serializer: the binary format used by the serializer, or + * in other words, the schema of data written by the serializer. * * * NOTE: Implementations must contain the default empty nullary constructor. This is required to be able to * deserialize the configuration snapshot from its binary form. + * + * @param The data type that the originating serializer of this configuration serializes. 
*/ @PublicEvolving -public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { +public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { /** The user code class loader; only relevant if this configuration instance was deserialized from binary form. */ private ClassLoader userCodeClassLoader; + /** +* The originating serializer of this configuration snapshot. +* +* TODO to allow for incrementally adapting the implementation of serializer config snapshot subclasses, +* TODO we currently have a base implementation for the {@link #restoreSerializer()} +* TODO method which simply returns this serializer instance. The serializer is written +* TODO and read using Java serialization as part of reading / writing the config snapshot +*/ + private TypeSerializer serializer; + + /** +* Creates a serializer using this configuration, that is capable of reading data +* written by the serializer described by this configuration. +* +* @return the restored serializer. +
[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener
[ https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627509#comment-16627509 ] ASF GitHub Bot commented on FLINK-10386: TisonKun commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener URL: https://github.com/apache/flink/pull/6729#discussion_r220238017 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ## @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() { awaitLatch.await(); task.failExternally(new Exception("test")); - assertTrue(task.getExecutionState() == ExecutionState.FAILED); Review comment: FYI [FLINK-10426](https://issues.apache.org/jira/browse/FLINK-10426), you can take over it if have idea of that anytime before I start processing it :-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove legacy class TaskExecutionStateListener > -- > > Key: FLINK-10386 > URL: https://issues.apache.org/jira/browse/FLINK-10386 > Project: Flink > Issue Type: Sub-task > Components: TaskManager >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > After a discussion > [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257] > with [~trohrm...@apache.org]. I start to analyze the usage of > {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}. > In conclusion, we abort {{TaskExecutionStateListener}} strategy and no any > component rely on it. Instead, we introduce {{TaskManagerActions}} to take > the role for the communication of {{Task}} with {{TaskManager}}. No one > except {{TaskManager}} should directly communicate with {{Task}}. 
So the legacy class > {{TaskExecutionStateListener}} can be safely removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10386) Remove legacy class TaskExecutionStateListener
[ https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tison updated FLINK-10386: -- Issue Type: Sub-task (was: Improvement) Parent: FLINK-10392 > Remove legacy class TaskExecutionStateListener > -- > > Key: FLINK-10386 > URL: https://issues.apache.org/jira/browse/FLINK-10386 > Project: Flink > Issue Type: Sub-task > Components: TaskManager >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > After a discussion > [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257] > with [~trohrm...@apache.org]. I start to analyze the usage of > {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}. > In conclusion, we abort {{TaskExecutionStateListener}} strategy and no any > component rely on it. Instead, we introduce {{TaskManagerActions}} to take > the role for the communication of {{Task}} with {{TaskManager}}. No one > except {{TaskManager}} should directly communicate with {{Task}}. So it can > be safely remove legacy class {{TaskExecutionStateListener}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10426) Port TaskTest to new code base
tison created FLINK-10426: - Summary: Port TaskTest to new code base Key: FLINK-10426 URL: https://issues.apache.org/jira/browse/FLINK-10426 Project: Flink Issue Type: Sub-task Components: Tests Affects Versions: 1.7.0 Reporter: tison Assignee: tison Fix For: 1.7.0 Port {{TaskTest}} to new code base -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10425) taskmaster.host is not respected
[ https://issues.apache.org/jira/browse/FLINK-10425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10425: Assignee: vinoyang > taskmaster.host is not respected > > > Key: FLINK-10425 > URL: https://issues.apache.org/jira/browse/FLINK-10425 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.6.1 >Reporter: Andrew Kowpak >Assignee: vinoyang >Priority: Major > > The documentation states that taskmanager.host can be set to override the > discovered hostname, however, setting this value has no effect. > Looking at the code, the value never seems to be used. Instead, the > deprecated taskmanager.hostname is still used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener
[ https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627498#comment-16627498 ] ASF GitHub Bot commented on FLINK-10386: TisonKun commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener URL: https://github.com/apache/flink/pull/6729#discussion_r220236164 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ## @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() { awaitLatch.await(); task.failExternally(new Exception("test")); - assertTrue(task.getExecutionState() == ExecutionState.FAILED); Review comment: @Clark Thanks for your review! Please see also my first comment. `TaskTest` is a test based on legacy code, would be ported to FLIP-6 codebase soon as a sub task of FLINK-10392. All changes under `TaskTest` is temporary. Once the porting job done and if this PR doesn't get merged then, I would do a rebase. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove legacy class TaskExecutionStateListener > -- > > Key: FLINK-10386 > URL: https://issues.apache.org/jira/browse/FLINK-10386 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > After a discussion > [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257] > with [~trohrm...@apache.org]. I start to analyze the usage of > {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}. > In conclusion, we abort {{TaskExecutionStateListener}} strategy and no any > component rely on it. 
Instead, we introduce {{TaskManagerActions}} to take > the role for the communication of {{Task}} with {{TaskManager}}. No one > except {{TaskManager}} should directly communicate with {{Task}}. So it can > be safely remove legacy class {{TaskExecutionStateListener}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10425) taskmanager.host is not respected
Andrew Kowpak created FLINK-10425: - Summary: taskmanager.host is not respected Key: FLINK-10425 URL: https://issues.apache.org/jira/browse/FLINK-10425 Project: Flink Issue Type: Bug Components: TaskManager Affects Versions: 1.6.1 Reporter: Andrew Kowpak The documentation states that taskmanager.host can be set to override the discovered hostname; however, setting this value has no effect. Looking at the code, the value never seems to be used. Instead, the deprecated taskmanager.hostname is still used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
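For context, a sketch of the configuration the report describes (key names are taken from the report above; whether `taskmanager.host` is honored in a given release is exactly what the issue disputes):

```yaml
# flink-conf.yaml - override the auto-detected TaskManager address.
# Per the report, 1.6.1 still reads the deprecated key, so both are shown.
taskmanager.host: 10.0.0.42        # intended, documented key
taskmanager.hostname: 10.0.0.42    # deprecated key that 1.6.1 reportedly honors
```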
[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627489#comment-16627489 ] ASF GitHub Bot commented on FLINK-10402: StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base URL: https://github.com/apache/flink/pull/6750#discussion_r220230749 ## File path: flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java ## @@ -173,10 +140,6 @@ public void testTaskManagerProcessFailure() throws Exception { taskManagerProcess2 = new ProcessBuilder(command).start(); new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2); - // we wait for the JobManager to have the two TaskManagers available - // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes) - waitUntilNumTaskManagersAreRegistered(jmActor, 2, 12); Review comment: We can skip this waiting part now because it is a standalone session cluster? Just double checking. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port AbstractTaskManagerProcessFailureRecoveryTest to new code base > --- > > Key: FLINK-10402 > URL: https://issues.apache.org/jira/browse/FLINK-10402 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627488#comment-16627488 ] ASF GitHub Bot commented on FLINK-10402: StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base URL: https://github.com/apache/flink/pull/6750#discussion_r220226695 ## File path: flink-tests/src/test/resources/log4j-test.properties ## @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -log4j.rootLogger=OFF, testlogger +log4j.rootLogger=INFO, testlogger Review comment: I assume this change was not intentional and should be reverted? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port AbstractTaskManagerProcessFailureRecoveryTest to new code base > --- > > Key: FLINK-10402 > URL: https://issues.apache.org/jira/browse/FLINK-10402 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627487#comment-16627487 ] ASF GitHub Bot commented on FLINK-10402: StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base URL: https://github.com/apache/flink/pull/6750#discussion_r220230036 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -397,7 +397,7 @@ private Configuration generateClusterConfiguration(Configuration configuration) return resultConfiguration; } - private CompletableFuture shutDownAsync( + public CompletableFuture shutDownAsync( Review comment: This method should be moved away from the comment section declaring "internal methods" in this class. Or even better, instead of changing the visibility of methods in this class, couldn't you expose them only through `public static` helper methods in a class that is only in the test package like `ClusterEntryPointTestUtils.startCluster(ClusterEntryPoint ep)`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port AbstractTaskManagerProcessFailureRecoveryTest to new code base > --- > > Key: FLINK-10402 > URL: https://issues.apache.org/jira/browse/FLINK-10402 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
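The reviewer's suggestion above can be sketched as follows (all class and method names are hypothetical stand-ins, not the real Flink classes): keep the production method package-private and expose it to tests through a helper that lives in the same package under the test source root, instead of widening the method to `public`.

```java
// Hypothetical sketch of the reviewer's suggestion: rather than making a
// production method public for tests, add a same-package helper under
// src/test that forwards to the package-private method.
// (All names below are illustrative, not the real Flink classes.)

// --- production code, e.g. src/main/java/.../entrypoint/Entrypoint.java ---
class Entrypoint {
    // stays package-private; only same-package code can call it
    String shutDownAsync(String reason) {
        return "shutting down: " + reason;
    }
}

// --- test helper, e.g. src/test/java/.../entrypoint/EntrypointTestUtils.java ---
// Declared in the same package as Entrypoint, so it may call the
// package-private method; production callers never see it on their classpath.
final class EntrypointTestUtils {
    private EntrypointTestUtils() {}

    static String shutDown(Entrypoint ep, String reason) {
        return ep.shutDownAsync(reason);
    }
}

public class VisibilitySketch {
    public static void main(String[] args) {
        System.out.println(EntrypointTestUtils.shutDown(new Entrypoint(), "test"));
    }
}
```

The design point is that the visibility change is confined to test sources: the production API surface stays unchanged, and the helper disappears from release artifacts.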
[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener
[ https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627477#comment-16627477 ] ASF GitHub Bot commented on FLINK-10386: Clark commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener URL: https://github.com/apache/flink/pull/6729#discussion_r220221550 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ## @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() { awaitLatch.await(); task.failExternally(new Exception("test")); - assertTrue(task.getExecutionState() == ExecutionState.FAILED); Review comment: Is it necessary to change this here? It seems to work the same for an enum. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove legacy class TaskExecutionStateListener > -- > > Key: FLINK-10386 > URL: https://issues.apache.org/jira/browse/FLINK-10386 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > After a discussion > [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257] > with [~trohrm...@apache.org], I started to analyze the usage of > {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}. > In conclusion, we abandon the {{TaskExecutionStateListener}} strategy; no > component relies on it. Instead, we introduce {{TaskManagerActions}} to take > over communication between {{Task}} and {{TaskManager}}. No one > except {{TaskManager}} should communicate directly with {{Task}}, so the > legacy class {{TaskExecutionStateListener}} can be safely removed. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener
[ https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627478#comment-16627478 ] ASF GitHub Bot commented on FLINK-10386: Clark commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener URL: https://github.com/apache/flink/pull/6729#discussion_r220230614 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ## @@ -846,6 +817,23 @@ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable // Test Utilities // + private static void waitUntilExecutionState(Task task, ExecutionState exceptedState, Deadline deadline) { + while (deadline.hasTimeLeft()) { + if (exceptedState == task.getExecutionState()) { + return; + } + + try { + Thread.sleep(Math.min(deadline.timeLeft().toMillis(), 200)); Review comment: Just a little reminder: using Thread.sleep() in a JUnit test thread may sometimes lead to strange behavior. I am not sure it is safe here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove legacy class TaskExecutionStateListener > -- > > Key: FLINK-10386 > URL: https://issues.apache.org/jira/browse/FLINK-10386 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > After a discussion > [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257] > with [~trohrm...@apache.org], I started to analyze the usage of > {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}. 
> In conclusion, we abandon the {{TaskExecutionStateListener}} strategy; no > component relies on it. Instead, we introduce {{TaskManagerActions}} to take > over communication between {{Task}} and {{TaskManager}}. No one > except {{TaskManager}} should communicate directly with {{Task}}, so the > legacy class {{TaskExecutionStateListener}} can be safely removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
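The polling helper quoted in the diff above can be sketched as a self-contained version (no Flink `Deadline`, no JUnit; names are illustrative): poll a state supplier with bounded sleeps until it matches the expected value or the deadline expires. The reviewer's caution about `Thread.sleep()` in test threads still applies; this only shows the mechanics.

```java
import java.util.function.Supplier;

// Self-contained sketch of a waitUntilExecutionState-style helper, as
// discussed in the review above. Stand-ins replace Flink's Task/Deadline:
// we poll a Supplier<T> and track the deadline with System.nanoTime().
public class PollUntil {

    /** Returns true if the state matched the expected value before the deadline. */
    static <T> boolean waitUntil(Supplier<T> state, T expected, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (expected.equals(state.get())) {
                return true;
            }
            long leftMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (leftMillis <= 0) {
                return false; // deadline exhausted without a match
            }
            try {
                // sleep at most 200 ms so we re-check reasonably often,
                // mirroring the Math.min(timeLeft, 200) in the quoted diff
                Thread.sleep(Math.min(leftMillis, 200));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // simulated task state: flips to "FAILED" after roughly 300 ms
        Supplier<String> state = () ->
            System.currentTimeMillis() - start > 300 ? "FAILED" : "RUNNING";
        System.out.println(waitUntil(state, "FAILED", 2000)); // true
        System.out.println(waitUntil(() -> "RUNNING", "FAILED", 300)); // false
    }
}
```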
[jira] [Commented] (FLINK-10401) Port ProcessFailureCancelingITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-10401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627467#comment-16627467 ] ASF GitHub Bot commented on FLINK-10401: StefanRRichter commented on a change in pull request #6749: [FLINK-10401] Port ProcessFailureCancelingITCase to new code base URL: https://github.com/apache/flink/pull/6749#discussion_r220205505 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java ## @@ -130,6 +131,13 @@ public T getDispatcher() { } } + @VisibleForTesting + public WebMonitorEndpoint getWebMonitorEndpoint() { Review comment: Coming back to my longer comment on #6749, I think this is only required because the collaborator is lazily created in a lifecycle method. I wonder if we could have a builder object that creates all collaborators at the place where `startComponent` is currently called, since `clusterComponent = createClusterComponent(configuration)` and `clusterComponent.startComponent(...)` are called in direct sequence anyway. Tests could then simply pass the WebMonitorEndpoint into the component. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port ProcessFailureCancelingITCase to new code base > --- > > Key: FLINK-10401 > URL: https://issues.apache.org/jira/browse/FLINK-10401 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{ProcessFailureCancelingITCase}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10422) Follow AWS specs in Kinesis Consumer
[ https://issues.apache.org/jira/browse/FLINK-10422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627458#comment-16627458 ] ASF GitHub Bot commented on FLINK-10422: EugeneYushin opened a new pull request #6760: [FLINK-10422] Follow AWS specs in Kinesis Consumer URL: https://github.com/apache/flink/pull/6760 ## What is the purpose of the change Align the Flink Kinesis connector with the AWS specs for shard id conventions. Related Jira story: https://issues.apache.org/jira/browse/FLINK-10422 Mailing list conversation: https://lists.apache.org/thread.html/96de3bac9761564767cf283b58d664f5ae1b076e0c4431620552af5b@%3Cdev.flink.apache.org%3E ## Brief change log - Remove the custom ShardId comparator logic as it is redundant; rely on the AWS client libs to get the shard list with an exclusive start shard id - Remove test-related cleanup from production code - Add tests checking that the correct shards are returned on a second run of the `listShards` method for two cases: new shards present, and no new shards ## Verifying this change This change added tests and can be verified as follows: - Run the `testGetShardListWithNewShardsOnSecondRun/testGetShardWithNoNewShards` unit tests in `KinesisProxyTest.class` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Follow AWS specs in Kinesis Consumer > - > > Key: FLINK-10422 > URL: https://issues.apache.org/jira/browse/FLINK-10422 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.6.1 >Reporter: eugen yushin >Priority: Major > Labels: pull-request-available > > *Related conversation in mailing list:* > [https://lists.apache.org/thread.html/96de3bac9761564767cf283b58d664f5ae1b076e0c4431620552af5b@%3Cdev.flink.apache.org%3E] > *Summary:* > The Flink Kinesis consumer checks shard ids against a particular pattern: > {noformat} > "^shardId-\\d{12}" > {noformat} > [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java#L132] > While this is in line with the current Kinesis streams server implementation (all > streams follow this pattern), it conflicts with the AWS docs: > > {code:java} > ShardId > The unique identifier of the shard within the stream. > Type: String > Length Constraints: Minimum length of 1. Maximum length of 128. > Pattern: [a-zA-Z0-9_.-]+ > Required: Yes > {code} > > [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html] > *Intention:* > We have no guarantees and can't rely on patterns other than those provided in the AWS > manifest. > Any custom Kinesis mock implementation should rely on the AWS manifest, which > claims ShardId to be alphanumeric. This prevents anyone from using Flink with such > mocks. > The reason behind using the particular pattern "^shardId-\\d{12}" is to > create Flink's custom shard comparator, filter already-seen shards, and pass > only the latest shard to client.listShards to limit the scope of the RPC call to > AWS. > In the meantime, I think we can get rid of this logic altogether. 
The current > usage in the project is: > - fix a Kinesalite bug (I've already opened an issue to cover this: > [https://github.com/mhart/kinesalite/issues/76] and opened a PR: > [https://github.com/mhart/kinesalite/pull/77]). We can move this logic to the > test code base to keep the production code clean for now > > [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L464] > - adjust the last seen shard id. We can simply omit this because the AWS client won't > return already-seen shards, so we will get only new ids or nothing. >
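The mismatch described above can be demonstrated with the two patterns quoted in the issue text (Flink's `"^shardId-\\d{12}"` versus the AWS spec's `[a-zA-Z0-9_.-]+`); the sample shard ids below are made up for illustration, not taken from any real stream:

```java
import java.util.regex.Pattern;

// Sketch of the shard-id pattern mismatch from the issue above. Both regexes
// are quoted from the issue text; the example ids are hypothetical.
public class ShardIdPatterns {

    // Flink's current expectation (StreamShardHandle)
    static final Pattern FLINK = Pattern.compile("^shardId-\\d{12}");
    // AWS API reference constraint for ShardId
    static final Pattern AWS = Pattern.compile("[a-zA-Z0-9_.-]+");

    static boolean flinkAccepts(String shardId) {
        return FLINK.matcher(shardId).find();
    }

    static boolean awsAccepts(String shardId) {
        return AWS.matcher(shardId).matches();
    }

    public static void main(String[] args) {
        // shape produced by real Kinesis streams today
        System.out.println(flinkAccepts("shardId-000000000000")); // true
        // valid per the AWS spec, e.g. conceivable from a mock such as Kinesalite
        System.out.println(flinkAccepts("shard_0.A")); // false
        System.out.println(awsAccepts("shard_0.A"));   // true
    }
}
```

Any id matching the broader AWS character class but not the `shardId-` + 12-digit shape fails Flink's check, which is the compatibility gap the PR removes.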
[jira] [Commented] (FLINK-10424) Inconsistency between JsonSchemaConveerter and FlinkTypeFactory
[ https://issues.apache.org/jira/browse/FLINK-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627440#comment-16627440 ] Dominik Wosiński commented on FLINK-10424: -- There are two possible ways to solve this issue: - allow the _BigInteger_ Type Info in _FlinkTypeFactory_ - change _JsonSchemaConverter_ so it returns the Integer Type Info instead. My opinion is that _FlinkTypeFactory_ should be changed to stay consistent, since _JsonSchemaConverter_ already returns BigDecimal for the _number_ type. So returning BigInteger for _integer_ is more consistent with the current behavior and more natural. > Inconsistency between JsonSchemaConveerter and FlinkTypeFactory > --- > > Key: FLINK-10424 > URL: https://issues.apache.org/jira/browse/FLINK-10424 > Project: Flink > Issue Type: Bug >Affects Versions: 1.6.0 >Reporter: Dominik Wosiński >Assignee: Dominik Wosiński >Priority: Major > > There is still an inconsistency between _JsonSchemaConverter_ and > _FlinkTypeFactory_ when using a JSON schema with an _integer_ type field. > _JsonSchemaConverter_ will return the BigInteger Type Information for _integer_, > but _FlinkTypeFactory_ currently does not support the BigInteger Type Info, and > thus an exception will be thrown. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
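The rationale for mapping JSON Schema's "integer" to BigInteger can be sketched as follows (a hypothetical illustration, not the converter's actual code): the schema puts no range bound on "integer", so a conforming value can exceed what `long` holds, while `java.math.BigInteger` parses it without loss.

```java
import java.math.BigInteger;

// Illustrates why a JSON Schema "integer" maps more safely to BigInteger than
// to Integer/Long: the schema imposes no range bound. Example literals are
// made up; this is not code from JsonSchemaConverter or FlinkTypeFactory.
public class JsonIntegerRange {

    static String describe(String jsonIntegerLiteral) {
        try {
            long v = Long.parseLong(jsonIntegerLiteral);
            return "fits in long: " + v;
        } catch (NumberFormatException e) {
            // still a perfectly valid JSON Schema "integer"
            return "needs BigInteger: " + new BigInteger(jsonIntegerLiteral);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("42"));
        System.out.println(describe("123456789012345678901234567890"));
    }
}
```

This mirrors the commenter's consistency argument: just as "number" maps to the unbounded BigDecimal, "integer" naturally maps to the unbounded BigInteger.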
[jira] [Updated] (FLINK-10424) Inconsistency between JsonSchemaConveerter and FlinkTypeFactory
[ https://issues.apache.org/jira/browse/FLINK-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dominik Wosiński updated FLINK-10424: - Description: There is still an inconsistency between _JsonSchemaConverter_ and _FlinkTypeFactory_ when using a JsonSchema with an _integer_ type field. _JsonSchemaConverter_ will return BigInteger Type Information for _integer_, but _FlinkTypeFactory_ currently does not support BigInteger Type Info and thus an exception will be thrown. was: There is still an inconsistency between _JsonSchemaConverter_ and _FlinkTypeFactory_ when using a JsonSchema with an _integer_ type field. _JsonSchemaConverter_ will return BigInteger Type Information for _integer_, but _FlinkTypeFactory_ currently does not support BigInteger Type Info and thus an exception will be thrown. There are two possible ways to solve this issue: - allow using _BigInteger_ Type Info in _FlinkTypeFactory_ - change _JsonSchemaConverter_ so that it returns Integer Type Info instead. IMHO, the changes should be made in _FlinkTypeFactory._ > Inconsistency between JsonSchemaConveerter and FlinkTypeFactory > --- > > Key: FLINK-10424 > URL: https://issues.apache.org/jira/browse/FLINK-10424 > Project: Flink > Issue Type: Bug >Affects Versions: 1.6.0 >Reporter: Dominik Wosiński >Assignee: Dominik Wosiński >Priority: Major > > There is still an inconsistency between _JsonSchemaConverter_ and > _FlinkTypeFactory_ when using a JsonSchema with an _integer_ type field. > _JsonSchemaConverter_ will return BigInteger Type Information for _integer_, > but _FlinkTypeFactory_ currently does not support BigInteger Type Info and > thus an exception will be thrown. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10424) Inconsistency between JsonSchemaConveerter and FlinkTypeFactory
Dominik Wosiński created FLINK-10424: Summary: Inconsistency between JsonSchemaConveerter and FlinkTypeFactory Key: FLINK-10424 URL: https://issues.apache.org/jira/browse/FLINK-10424 Project: Flink Issue Type: Bug Affects Versions: 1.6.0 Reporter: Dominik Wosiński Assignee: Dominik Wosiński There is still an inconsistency between _JsonSchemaConverter_ and _FlinkTypeFactory_ when using a JsonSchema with an _integer_ type field. _JsonSchemaConverter_ will return BigInteger Type Information for _integer_, but _FlinkTypeFactory_ currently does not support BigInteger Type Info and thus an exception will be thrown. There are two possible ways to solve this issue: - allow using _BigInteger_ Type Info in _FlinkTypeFactory_ - change _JsonSchemaConverter_ so that it returns Integer Type Info instead. IMHO, the changes should be made in _FlinkTypeFactory._ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10379) Can not use Table Functions in Java Table API
[ https://issues.apache.org/jira/browse/FLINK-10379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627432#comment-16627432 ] ASF GitHub Bot commented on FLINK-10379: hequn8128 commented on issue #6744: [FLINK-10379][docs,table] Fix Table Function docs URL: https://github.com/apache/flink/pull/6744#issuecomment-424360728 +1 to merge :-) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Can not use Table Functions in Java Table API > - > > Key: FLINK-10379 > URL: https://issues.apache.org/jira/browse/FLINK-10379 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.1 >Reporter: Piotr Nowojski >Assignee: Hequn Cheng >Priority: Critical > Labels: pull-request-available > > As stated in the > [documentation|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#table-functions], > this is how table functions should be used in the Java Table API: > {code:java} > // Register the function. > tableEnv.registerFunction("split", new Split("#")); > myTable.join("split(a) as (word, length)"); > {code} > However {{Table.join(String)}} was removed some time ago, and now it is > impossible to use Table Functions in the Java Table API. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
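Since `Table.join(String)` no longer exists, the interim usage the docs fix is understood to adopt wraps the table-function call in a `Table` instance rather than a `String`. A sketch under that assumption, reusing `tableEnv`, `Split`, and `myTable` from the quoted example:

```java
// Register the function as in the quoted docs.
tableEnv.registerFunction("split", new Split("#"));

// Workaround sketch: Table.join(Table) still exists, so wrap the
// table-function expression in a Table instead of passing a String.
Table result = myTable
    .join(new Table(tableEnv, "split(a) as (word, length)"))
    .select("a, word, length");
```

This keeps the join semantics of the original snippet while staying within the API that the Java Table API currently exposes.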
[GitHub] hequn8128 commented on issue #6744: [FLINK-10379][docs, table] Fix Table Function docs
hequn8128 commented on issue #6744: [FLINK-10379][docs,table] Fix Table Function docs URL: https://github.com/apache/flink/pull/6744#issuecomment-424360728 +1 to merge :-)
[jira] [Commented] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped
[ https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627429#comment-16627429 ] ASF GitHub Bot commented on FLINK-9891: --- GJL edited a comment on issue #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode. URL: https://github.com/apache/flink/pull/6540#issuecomment-424360084 Thank you for your contribution to Apache Flink, @packet23. LGTM, merging. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink cluster is not shutdown in YARN mode when Flink client is stopped > --- > > Key: FLINK-9891 > URL: https://issues.apache.org/jira/browse/FLINK-9891 > Project: Flink > Issue Type: Bug > Components: Client, YARN >Affects Versions: 1.5.0, 1.5.1 >Reporter: Sergey Krasovskiy >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > We are not using session mode and detached mode. The command to run Flink job > on YARN is: > {code:java} > /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm > 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount > {code} > Flink CLI logs: > {code:java} > Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set. > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. 
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > 2018-07-18 12:47:03,747 INFO > org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service > address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/ > 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - > No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - > No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-07-18 12:47:04,248 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the > HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink > YARN Client needs one of these to be set to properly load the Hadoop > configuration for accessing YARN. > 2018-07-18 12:47:04,409 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: > ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, > numberTaskManagers=1, slotsPerTaskManager=1} > 2018-07-18 12:47:04,783 WARN > org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit > local reads feature cannot be used because libhadoop cannot be loaded. > 2018-07-18 12:47:04,788 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration > directory > ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf') > contains both LOG4J and Logback configuration files. Please delete or rename > one of them. 
> 2018-07-18 12:47:07,846 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application > master application_1531474158783_10814 > 2018-07-18 12:47:08,073 INFO > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application > application_1531474158783_10814 > 2018-07-18 12:47:08,074 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster > to be allocated > 2018-07-18 12:47:08,076 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, > current state ACCEPTED > 2018-07-18 12:47:12,864 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has > been deployed successfully. > {code} > Job Manager logs: > {code:java} > 2018-07-18 12:47:09,913 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > > 2018-07-18 12:47:09,915 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting > YarnSessionClusterEntrypoint (Version:
[jira] [Commented] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped
[ https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627431#comment-16627431 ] ASF GitHub Bot commented on FLINK-9891: --- GJL commented on issue #6718: [FLINK-9891] Add optional hook to shutdown cluster if a session was created in per-job mode in attached mode URL: https://github.com/apache/flink/pull/6718#issuecomment-424360423 Thank you for your contribution to Apache Flink, @azagrebin. LGTM, merging. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink cluster is not shutdown in YARN mode when Flink client is stopped > --- > > Key: FLINK-9891 > URL: https://issues.apache.org/jira/browse/FLINK-9891 > Project: Flink > Issue Type: Bug > Components: Client, YARN >Affects Versions: 1.5.0, 1.5.1 >Reporter: Sergey Krasovskiy >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > We are not using session mode and detached mode. The command to run Flink job > on YARN is: > {code:java} > /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm > 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount > {code} > Flink CLI logs: > {code:java} > Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set. > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. 
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > 2018-07-18 12:47:03,747 INFO > org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service > address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/ > 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - > No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - > No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-07-18 12:47:04,248 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the > HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink > YARN Client needs one of these to be set to properly load the Hadoop > configuration for accessing YARN. > 2018-07-18 12:47:04,409 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: > ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, > numberTaskManagers=1, slotsPerTaskManager=1} > 2018-07-18 12:47:04,783 WARN > org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit > local reads feature cannot be used because libhadoop cannot be loaded. > 2018-07-18 12:47:04,788 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration > directory > ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf') > contains both LOG4J and Logback configuration files. Please delete or rename > one of them. 
> 2018-07-18 12:47:07,846 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application > master application_1531474158783_10814 > 2018-07-18 12:47:08,073 INFO > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application > application_1531474158783_10814 > 2018-07-18 12:47:08,074 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster > to be allocated > 2018-07-18 12:47:08,076 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, > current state ACCEPTED > 2018-07-18 12:47:12,864 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has > been deployed successfully. > {code} > Job Manager logs: > {code:java} > 2018-07-18 12:47:09,913 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > > 2018-07-18 12:47:09,915 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting >
[GitHub] GJL commented on issue #6718: [FLINK-9891] Add optional hook to shutdown cluster if a session was created in per-job mode in attached mode
GJL commented on issue #6718: [FLINK-9891] Add optional hook to shutdown cluster if a session was created in per-job mode in attached mode URL: https://github.com/apache/flink/pull/6718#issuecomment-424360423 Thank you for your contribution to Apache Flink, @azagrebin. LGTM, merging.
[GitHub] GJL edited a comment on issue #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.
GJL edited a comment on issue #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode. URL: https://github.com/apache/flink/pull/6540#issuecomment-424360084 Thank you for your contribution to Apache Flink, @packet23. LGTM, merging.
[jira] [Commented] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped
[ https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627428#comment-16627428 ] ASF GitHub Bot commented on FLINK-9891: --- GJL commented on issue #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode. URL: https://github.com/apache/flink/pull/6540#issuecomment-424360084 LGTM, merging. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink cluster is not shutdown in YARN mode when Flink client is stopped > --- > > Key: FLINK-9891 > URL: https://issues.apache.org/jira/browse/FLINK-9891 > Project: Flink > Issue Type: Bug > Components: Client, YARN >Affects Versions: 1.5.0, 1.5.1 >Reporter: Sergey Krasovskiy >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > We are not using session mode and detached mode. The command to run Flink job > on YARN is: > {code:java} > /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm > 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount > {code} > Flink CLI logs: > {code:java} > Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set. > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. 
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > 2018-07-18 12:47:03,747 INFO > org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service > address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/ > 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - > No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - > No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-07-18 12:47:04,248 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the > HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink > YARN Client needs one of these to be set to properly load the Hadoop > configuration for accessing YARN. > 2018-07-18 12:47:04,409 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: > ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, > numberTaskManagers=1, slotsPerTaskManager=1} > 2018-07-18 12:47:04,783 WARN > org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit > local reads feature cannot be used because libhadoop cannot be loaded. > 2018-07-18 12:47:04,788 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration > directory > ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf') > contains both LOG4J and Logback configuration files. Please delete or rename > one of them. 
> 2018-07-18 12:47:07,846 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application > master application_1531474158783_10814 > 2018-07-18 12:47:08,073 INFO > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application > application_1531474158783_10814 > 2018-07-18 12:47:08,074 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster > to be allocated > 2018-07-18 12:47:08,076 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, > current state ACCEPTED > 2018-07-18 12:47:12,864 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has > been deployed successfully. > {code} > Job Manager logs: > {code:java} > 2018-07-18 12:47:09,913 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > > 2018-07-18 12:47:09,915 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting > YarnSessionClusterEntrypoint (Version: 1.5.1, Rev:3488f8b, Date:10.07.2018 @ > 11:51:27 GMT) > ... >
[GitHub] GJL commented on issue #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.
GJL commented on issue #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode. URL: https://github.com/apache/flink/pull/6540#issuecomment-424360084 LGTM, merging.
[jira] [Created] (FLINK-10423) Forward RocksDB memory metrics to Flink metrics reporter
Seth Wiesman created FLINK-10423: Summary: Forward RocksDB memory metrics to Flink metrics reporter Key: FLINK-10423 URL: https://issues.apache.org/jira/browse/FLINK-10423 Project: Flink Issue Type: New Feature Components: Metrics, State Backends, Checkpointing Reporter: Seth Wiesman Assignee: Seth Wiesman RocksDB exposes a number of metrics at the column-family level about current memory usage, open memtables, etc. that would be useful to users wanting greater insight into what RocksDB is doing. This work is inspired heavily by the comments on this RocksDB issue thread (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
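One possible shape for the forwarding, sketched as an assumption rather than the actual implementation: read a column-family property via RocksJava's `getLongProperty` and publish it through a Flink `Gauge`. The class name, metric name, and error handling here are illustrative.

```java
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDBMemoryMetricsSketch {
    // Registers one column-family-level memory metric; db and metricGroup are
    // assumed to be supplied by the RocksDB state backend's initialization.
    public static void register(final RocksDB db, MetricGroup metricGroup) {
        metricGroup.gauge("rocksdb.cur-size-all-mem-tables", new Gauge<Long>() {
            @Override
            public Long getValue() {
                try {
                    // Current size of all memtables, as reported by RocksDB itself.
                    return db.getLongProperty("rocksdb.cur-size-all-mem-tables");
                } catch (RocksDBException e) {
                    return -1L; // property temporarily unavailable
                }
            }
        });
    }
}
```

The same pattern generalizes to the other properties discussed in the linked RocksDB thread, one gauge per property per column family.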
[jira] [Commented] (FLINK-10379) Can not use Table Functions in Java Table API
[ https://issues.apache.org/jira/browse/FLINK-10379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627403#comment-16627403 ] ASF GitHub Bot commented on FLINK-10379: pnowojski commented on issue #6744: [FLINK-10379][docs,table] Fix Table Function docs URL: https://github.com/apache/flink/pull/6744#issuecomment-424352562 Thanks for the review @hequn8128, and for spotting the place that I've missed :) I've fixed it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Can not use Table Functions in Java Table API > - > > Key: FLINK-10379 > URL: https://issues.apache.org/jira/browse/FLINK-10379 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.1 >Reporter: Piotr Nowojski >Assignee: Hequn Cheng >Priority: Critical > Labels: pull-request-available > > As stated in the > [documentation|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#table-functions], > this is how table functions should be used in the Java Table API: > {code:java} > // Register the function. > tableEnv.registerFunction("split", new Split("#")); > myTable.join("split(a) as (word, length)"); > {code} > However {{Table.join(String)}} was removed some time ago, and now it is > impossible to use Table Functions in the Java Table API. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski commented on issue #6744: [FLINK-10379][docs, table] Fix Table Function docs
pnowojski commented on issue #6744: [FLINK-10379][docs,table] Fix Table Function docs URL: https://github.com/apache/flink/pull/6744#issuecomment-424352562 Thanks for the review @hequn8128 and spotting the place that I've missed :) I've fixed it.
[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular
[ https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627399#comment-16627399 ] ASF GitHub Bot commented on FLINK-10411: StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional URL: https://github.com/apache/flink/pull/6743#discussion_r220200492 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -141,11 +144,11 @@ protected ClusterEntrypoint(Configuration configuration) { shutDownHook = ShutdownHookUtil.addShutdownHook(this::cleanupDirectories, getClass().getSimpleName(), LOG); } - public CompletableFuture getTerminationFuture() { + public CompletableFuture getTerminationFuture() { return terminationFuture; } - protected void startCluster() { + protected void startCluster() throws ClusterEntrypointException { Review comment: Could currently be `private` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make ClusterEntrypoint more modular > --- > > Key: FLINK-10411 > URL: https://issues.apache.org/jira/browse/FLINK-10411 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently, the {{ClusterEntrypoint}} is not very modular in the sense that it > cannot be really used for testing purposes (e.g. starting a {{Dispatcher}} > with a {{WebMonitorRestEndpoint}}). The problem is that the > {{ClusterEntrypoint}} combines too many responsibilities (creating the > cluster services, starting the cluster components and deciding on when to > terminate the JVM process). 
> I suggest to make the structure more compositional, meaning to split up the > service generation from the cluster component start up. That way we could > also remove code duplication between the different {{ClusterEntrypoint}} > implementations. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional
StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional URL: https://github.com/apache/flink/pull/6743#discussion_r220200492 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -141,11 +144,11 @@ protected ClusterEntrypoint(Configuration configuration) { shutDownHook = ShutdownHookUtil.addShutdownHook(this::cleanupDirectories, getClass().getSimpleName(), LOG); } - public CompletableFuture getTerminationFuture() { + public CompletableFuture getTerminationFuture() { return terminationFuture; } - protected void startCluster() { + protected void startCluster() throws ClusterEntrypointException { Review comment: Could currently be `private`
[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular
[ https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627394#comment-16627394 ] ASF GitHub Bot commented on FLINK-10411: StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional URL: https://github.com/apache/flink/pull/6743#discussion_r220124106 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -163,9 +135,6 @@ @GuardedBy("lock") private ClusterInformation clusterInformation; Review comment: I think this field could become just a local variable. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make ClusterEntrypoint more modular > --- > > Key: FLINK-10411 > URL: https://issues.apache.org/jira/browse/FLINK-10411 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently, the {{ClusterEntrypoint}} is not very modular in the sense that it > cannot be really used for testing purposes (e.g. starting a {{Dispatcher}} > with a {{WebMonitorRestEndpoint}}). The problem is that the > {{ClusterEntrypoint}} combines too many responsibilities (creating the > cluster services, starting the cluster components and deciding on when to > terminate the JVM process). > I suggest to make the structure more compositional, meaning to split up the > service generation from the cluster component start up. That way we could > also remove code duplication between the different {{ClusterEntrypoint}} > implementations. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional
StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional URL: https://github.com/apache/flink/pull/6743#discussion_r220124106 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -163,9 +135,6 @@ @GuardedBy("lock") private ClusterInformation clusterInformation; Review comment: I think this field could become just a local variable.
[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular
[ https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627390#comment-16627390 ] ASF GitHub Bot commented on FLINK-10411: StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional URL: https://github.com/apache/flink/pull/6743#discussion_r220197869 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -518,6 +498,34 @@ protected static Configuration loadConfiguration(EntrypointClusterConfiguration return configuration; } + // -- + // Helper methods + // -- + + public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) { + + final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName(); + try { + clusterEntrypoint.startCluster(); + } catch (ClusterEntrypointException e) { + LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName, e)); Review comment: `LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make ClusterEntrypoint more modular > --- > > Key: FLINK-10411 > URL: https://issues.apache.org/jira/browse/FLINK-10411 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently, the {{ClusterEntrypoint}} is not very modular in the sense that it > cannot be really used for testing purposes (e.g. starting a {{Dispatcher}} > with a {{WebMonitorRestEndpoint}}). 
The problem is that the > {{ClusterEntrypoint}} combines too many responsibilities (creating the > cluster services, starting the cluster components and deciding on when to > terminate the JVM process). > I suggest to make the structure more compositional, meaning to split up the > service generation from the cluster component start up. That way we could > also remove code duplication between the different {{ClusterEntrypoint}} > implementations. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
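The fix suggested in this review comment is worth spelling out, because the broken variant compiles and logs a plausible-looking message. `String.format` silently ignores surplus arguments, so passing the exception inside the format call discards it — and its stack trace — entirely; the fix moves the `Throwable` out of `String.format` and passes it to the logger as a separate argument. A self-contained sketch (a hypothetical demo class, not the Flink source):

```java
public class LogFormatDemo {
    // Buggy shape from the diff: the Throwable is passed as a surplus
    // String.format argument. %s consumes only the name; String.format
    // silently ignores extra arguments, so the exception and its stack
    // trace never reach the log.
    static String buggy(String name, Throwable t) {
        return String.format("Could not start cluster entrypoint %s.", name, t);
    }

    // Suggested fix: format only the message, and hand the Throwable to
    // the logger as its own argument, e.g. LOG.error(fixed(name), t);
    static String fixed(String name) {
        return String.format("Could not start cluster entrypoint %s.", name);
    }

    public static void main(String[] args) {
        Throwable t = new IllegalStateException("boom");
        // Both variants render identical message text, which is exactly
        // why the lost stack trace is easy to miss in testing.
        System.out.println(buggy("DemoEntrypoint", t).equals(fixed("DemoEntrypoint")));
    }
}
```

In the corrected call, SLF4J's `error(String, Throwable)` overload prints the full stack trace after the message.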
[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular
[ https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627385#comment-16627385 ] ASF GitHub Bot commented on FLINK-10411: StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional URL: https://github.com/apache/flink/pull/6743#discussion_r220128418 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java ## @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherFactory; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.dispatcher.HistoryServerArchivist; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.MetricUtils; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; +import org.apache.flink.runtime.rest.RestEndpointFactory; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; +import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; +import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; +import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import akka.actor.ActorSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint} + * in the same process. + */ +public class ClusterComponent implements AutoCloseableAsync { + + private static final Logger LOG = LoggerFactory.getLogger(ClusterComponent.class); + + private final Object lock = new Object(); + + private final DispatcherFactory dispatcherFactory; + + private final ResourceManagerFactory resourceManagerFactory; + + private final RestEndpointFactory restEndpointFactory; + + private final CompletableFuture terminationFuture; + + private final CompletableFuture shutDownFuture; + + @GuardedBy("lock") + private State state; + + @GuardedBy("lock") + private ResourceManager resourceManager; + + @GuardedBy("lock") + private T dispatcher; + + @GuardedBy("lock") + private LeaderRetrievalService dispatcherLeaderRetrievalService; + + @GuardedBy("lock") + private LeaderRetrievalService resourceManagerRetrievalService; + +
[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular
[ https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627389#comment-16627389 ] ASF GitHub Bot commented on FLINK-10411: StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional URL: https://github.com/apache/flink/pull/6743#discussion_r220194199 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java ## @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherFactory; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.dispatcher.HistoryServerArchivist; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.MetricUtils; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; +import org.apache.flink.runtime.rest.RestEndpointFactory; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; +import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; +import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; +import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import akka.actor.ActorSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint} + * in the same process. + */ +public class ClusterComponent implements AutoCloseableAsync { Review comment: As somebody looking into this code for the first time, I found that the name `ClusterComponent` for this class was not really helpful to understand what it does or what it is good for. Maybe there is a name that makes this a bit clearer?
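The compositional structure under review — an entrypoint that hands factories to a single component whose only job is wiring the `Dispatcher`, `ResourceManager` and REST endpoint together — can be sketched with stripped-down hypothetical interfaces. None of the types or signatures below match Flink's real API; they only illustrate why construction (factories) and wiring (the component) separate cleanly, and why a more descriptive class name helps:

```java
import java.util.Arrays;
import java.util.List;

public class ComponentSketch {
    // Hypothetical one-method stand-ins for the real factory interfaces.
    interface DispatcherFactory { String createDispatcher(); }
    interface ResourceManagerFactory { String createResourceManager(); }
    interface RestEndpointFactory { String createRestEndpoint(); }

    // A more self-describing name than "ClusterComponent": it says which
    // components are started together in one process.
    static class DispatcherResourceManagerComponent {
        private final DispatcherFactory dispatcherFactory;
        private final ResourceManagerFactory resourceManagerFactory;
        private final RestEndpointFactory restEndpointFactory;

        DispatcherResourceManagerComponent(
                DispatcherFactory dispatcherFactory,
                ResourceManagerFactory resourceManagerFactory,
                RestEndpointFactory restEndpointFactory) {
            this.dispatcherFactory = dispatcherFactory;
            this.resourceManagerFactory = resourceManagerFactory;
            this.restEndpointFactory = restEndpointFactory;
        }

        List<String> start() {
            // Wiring only; construction details live in the factories,
            // so tests can swap in lightweight fakes.
            return Arrays.asList(
                    restEndpointFactory.createRestEndpoint(),
                    resourceManagerFactory.createResourceManager(),
                    dispatcherFactory.createDispatcher());
        }
    }

    public static void main(String[] args) {
        DispatcherResourceManagerComponent component =
                new DispatcherResourceManagerComponent(
                        () -> "dispatcher",
                        () -> "resourceManager",
                        () -> "restEndpoint");
        System.out.println(component.start());
    }
}
```

This split is what lets a test start, say, a `Dispatcher` with a REST endpoint but a mocked resource manager — the gap the JIRA description calls out.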
[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular
[ https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627384#comment-16627384 ] ASF GitHub Bot commented on FLINK-10411: StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional URL: https://github.com/apache/flink/pull/6743#discussion_r220156056 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.yarn.YarnResourceManager; +import org.apache.flink.yarn.YarnWorkerNode; + +import javax.annotation.Nullable; + +/** + * {@link ResourceManagerFactory} implementation which creates a {@link YarnResourceManager}. + */ +public enum YarnResourceManagerFactory implements ResourceManagerFactory { + INSTANCE; + + @Override + public ResourceManager createResourceManager(Configuration configuration, ResourceID resourceId, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl) throws Exception { Review comment: newlines. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
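The terse "newlines" review comment refers to wrapping a long signature one parameter per line. Combined with the enum-singleton factory shape used in the `YarnResourceManagerFactory` diff, a runnable sketch looks like the following — the parameter types and names here are simplified stand-ins, not Flink's real signature:

```java
public class FactoryStyleDemo {
    interface ResourceManager { String describe(); }

    // Enum singleton, as in the YarnResourceManagerFactory diff: exactly
    // one stateless factory instance, with no constructor to misuse.
    enum DemoResourceManagerFactory {
        INSTANCE;

        // Once a signature is this long, each parameter goes on its own
        // line, which is what the review asks for.
        ResourceManager createResourceManager(
                String configuration,
                String resourceId,
                String rpcService,
                String webInterfaceUrl) {
            return () -> "rm@" + resourceId;
        }
    }

    public static void main(String[] args) {
        ResourceManager rm = DemoResourceManagerFactory.INSTANCE
                .createResourceManager("conf", "rid-1", "rpc", null);
        System.out.println(rm.describe());
    }
}
```

The one-parameter-per-line layout also keeps diffs small when a parameter is later added or removed from the middle of the list.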
[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular
[ https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627386#comment-16627386 ] ASF GitHub Bot commented on FLINK-10411: StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional URL: https://github.com/apache/flink/pull/6743#discussion_r220127856 ## File path: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java ## @@ -179,7 +111,15 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc } @Override - protected void registerShutdownActions(CompletableFuture terminationFuture) {} + protected ClusterComponent createClusterComponent(Configuration configuration) { + return new JobClusterComponent( + new MesosResourceManagerFactory( + mesosServices, + schedulerConfiguration, + taskManagerParameters, + taskManagerContainerSpec), + new FileJobGraphRetriever(configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph"))); Review comment: We could use a helper method that constructs `FileJobGraphRetriever` from `configuration` because this is duplicated. At least `"job.graph"` could become a string constant somewhere.
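A sketch of the deduplication the reviewer suggests: a shared helper that resolves the job-graph file from the `Configuration`, with the `"job.graph"` literal hoisted into a constant, so the two call sites stop repeating the construction. The `Configuration` stand-in, the key name, and the method names below are invented for illustration and are not the real Flink classes:

```java
import java.util.HashMap;
import java.util.Map;

public class JobGraphRetrieverHelperDemo {
    // Minimal stand-in for Flink's Configuration.
    static class Configuration {
        private final Map<String, String> values = new HashMap<>();
        Configuration set(String key, String value) { values.put(key, value); return this; }
        String getString(String key, String defaultValue) {
            return values.getOrDefault(key, defaultValue);
        }
    }

    // Hypothetical key name; the point is hoisting "job.graph" into a
    // named constant instead of repeating the literal at each call site.
    static final String JOB_GRAPH_FILE_PATH_KEY = "jobmanager.job-graph-path";
    static final String DEFAULT_JOB_GRAPH_FILE = "job.graph";

    // The shared helper both duplicated call sites would invoke instead
    // of constructing the retriever inline.
    static String resolveJobGraphFile(Configuration configuration) {
        return configuration.getString(JOB_GRAPH_FILE_PATH_KEY, DEFAULT_JOB_GRAPH_FILE);
    }

    public static void main(String[] args) {
        // Falls back to the default when the key is unset...
        System.out.println(resolveJobGraphFile(new Configuration()));
        // ...and honors an explicit setting otherwise.
        System.out.println(resolveJobGraphFile(
                new Configuration().set(JOB_GRAPH_FILE_PATH_KEY, "/tmp/my.graph")));
    }
}
```

With the helper in place, each entrypoint's `createClusterComponent` shrinks to a one-line call, and a future change to the default file name touches exactly one constant.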
[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular
[ https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627382#comment-16627382 ] ASF GitHub Bot commented on FLINK-10411: StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional URL: https://github.com/apache/flink/pull/6743#discussion_r220124106 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -163,9 +135,6 @@ @GuardedBy("lock") private ClusterInformation clusterInformation; Review comment: I think this could be a local variable.