[jira] [Commented] (FLINK-10392) Remove legacy mode

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628234#comment-16628234
 ] 

tison commented on FLINK-10392:
---

[~till.rohrmann] I am afraid that if we add a sub-task for every test porting 
job, we will end up with a long list even before we start removing the legacy 
production files. Is that acceptable, or should we do some squashing?

> Remove legacy mode
> --
>
> Key: FLINK-10392
> URL: https://issues.apache.org/jira/browse/FLINK-10392
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.0
>
>
> This issue is the umbrella issue to remove the legacy mode code from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628197#comment-16628197
 ] 

tison edited comment on FLINK-10406 at 9/26/18 3:41 AM:


- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

- {{testCancelWithSavepointNoDirectoriesConfigured}} is largely covered by 
{{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}. Currently 
we do not provide a detailed error message to dig out why a savepoint failed. 
{{testDoNotCancelJobIfSavepointFails}} tests the case where permission to the 
savepoint path is denied, but changing it to a /not/exist/path exercises the 
same code path (a sketch follows below).

The exception is stringified as "java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to 
trigger savepoint. Decline reason: An Exception occurred while triggering the 
checkpoint."

- {{testCancelJobWithSavepointFailurePeriodicCheckpoints}} is covered by 
{{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}.
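
A minimal sketch of the /not/exist/path scenario mentioned above, assuming 
JUnit 4 and hypothetical {{triggerSavepoint}} / {{getJobStatus}} helpers 
(illustrative only, not the actual {{JobMasterTriggerSavepointIT}} code):

{code:java}
// Hedged sketch: a savepoint to a nonexistent path must fail,
// but the job itself must keep running rather than get cancelled.
@Test
public void testSavepointToMissingPathDoesNotCancelJob() throws Exception {
    try {
        // triggerSavepoint(...) is an assumed helper returning a CompletableFuture<String>
        triggerSavepoint(jobId, "/not/exist/path").get();
        fail("Savepoint to a nonexistent path should have failed");
    } catch (ExecutionException expected) {
        // the cause chain ends in a CheckpointTriggerException, stringified as above
    }
    // getJobStatus(...) is an assumed helper; the job must not have been cancelled
    assertEquals(JobStatus.RUNNING, getJobStatus(jobId));
}
{code}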


was (Author: tison):
- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

- {{testCancelWithSavepointNoDirectoriesConfigured}} is largely covered by 
{{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}. Currently 
we do not provide a detailed error message to dig out why a savepoint failed. 
{{testDoNotCancelJobIfSavepointFails}} tests the case where permission to the 
savepoint path is denied, but changing it to a /not/exist/path exercises the 
same code path.

The exception is stringified as "java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to 
trigger savepoint. Decline reason: An Exception occurred while triggering the 
checkpoint."

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628197#comment-16628197
 ] 

tison edited comment on FLINK-10406 at 9/26/18 3:27 AM:


- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

- {{testCancelWithSavepointNoDirectoriesConfigured}} is largely covered by 
{{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}. Currently 
we do not provide a detailed error message to dig out why a savepoint failed. 
{{testDoNotCancelJobIfSavepointFails}} tests the case where permission to the 
savepoint path is denied, but changing it to a /not/exist/path exercises the 
same code path.

The exception is stringified as "java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to 
trigger savepoint. Decline reason: An Exception occurred while triggering the 
checkpoint."


was (Author: tison):
- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

- {{testCancelWithSavepointNoDirectoriesConfigured}} is largely covered by 
{{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}. Currently 
we do not provide a detailed error message to dig out why a savepoint failed. 
{{testDoNotCancelJobIfSavepointFails}} tests the case where permission to the 
savepoint path is denied, but changing it to a /not/exist/path exercises the 
same code path.

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628197#comment-16628197
 ] 

tison edited comment on FLINK-10406 at 9/26/18 3:26 AM:


- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

- {{testCancelWithSavepointNoDirectoriesConfigured}} is largely covered by 
{{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}. Currently 
we do not provide a detailed error message to dig out why a savepoint failed. 
{{testDoNotCancelJobIfSavepointFails}} tests the case where permission to the 
savepoint path is denied, but changing it to a /not/exist/path exercises the 
same code path.


was (Author: tison):
- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10420) Create and drop view in sql client should check the view created based on the configuration.

2018-09-25 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628208#comment-16628208
 ] 

vinoyang commented on FLINK-10420:
--

[~hequn8128] What do you think about this issue?

> Create and drop view in sql client should check the view created based on the 
> configuration.
> 
>
> Key: FLINK-10420
> URL: https://issues.apache.org/jira/browse/FLINK-10420
> Project: Flink
>  Issue Type: Bug
>  Components: SQL Client
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>
> Currently, only the current session is checked:
> {code:java}
> private void callCreateView(SqlCommandCall cmdCall) {
>    final String name = cmdCall.operands[0];
>    final String query = cmdCall.operands[1];
>    // here: only the session-local view map is consulted
>    final String previousQuery = context.getViews().get(name);
>    if (previousQuery != null) {
>       printExecutionError(CliStrings.MESSAGE_VIEW_ALREADY_EXISTS);
>       return;
>    }
> {code}
>  
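
A fix would presumably consult the views defined in the environment 
configuration as well. A minimal sketch, where 
{{context.getEnvironment().getViews()}} is a hypothetical accessor rather than 
the actual SQL Client API:

{code:java}
// Hypothetical sketch: reject CREATE VIEW if the name clashes with a
// session-defined view or with a view predefined in the environment file.
final String previousQuery = context.getViews().get(name);
final String configuredQuery =
   context.getEnvironment().getViews().get(name); // hypothetical accessor
if (previousQuery != null || configuredQuery != null) {
   printExecutionError(CliStrings.MESSAGE_VIEW_ALREADY_EXISTS);
   return;
}
{code}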



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628197#comment-16628197
 ] 

tison commented on FLINK-10406:
---

- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10427) Port JobSubmitTest to new code base

2018-09-25 Thread tison (JIRA)
tison created FLINK-10427:
-

 Summary: Port JobSubmitTest to new code base
 Key: FLINK-10427
 URL: https://issues.apache.org/jira/browse/FLINK-10427
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Affects Versions: 1.7.0
Reporter: tison
Assignee: tison
 Fix For: 1.7.0


Port {{JobSubmitTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628148#comment-16628148
 ] 

tison edited comment on FLINK-10406 at 9/26/18 2:57 AM:


- {{testSavepointRestoreSettings}} is covered by 
{{JobMaster#testRestoringFromSavepoint}}

The {{triggerSavepoint}} part is covered by {{JobMasterTriggerSavepointIT}}, 
and the submit-failure part should be taken care of when porting 
{{JobSubmitTest}}, which has a test {{testAnswerFailureWhenSavepointReadFails}}.


was (Author: tison):
- {{testSavepointRestoreSettings}} is covered by 
{{JobMaster#testRestoringFromSavepoint}}

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628183#comment-16628183
 ] 

ASF GitHub Bot commented on FLINK-10134:


XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for 
TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-424565714
 
 
   @StephanEwen Hello, regarding the two questions you raised yesterday, I have 
some thoughts of my own, though I am not sure whether they are right.
   1. Where should the BOM be read? I think we still need logic that processes 
the BOM when the file is first opened, at the very beginning of the file, and 
an attribute in the BOM-reading logic that records the file's BOM encoding. 
For example, it could live in the function `createInputSplits`.
   2. Regarding the second, performance-related question: we can use the 
previously detected BOM to distinguish UTF-8 with BOM, UTF-16 with BOM, and 
UTF-32 with BOM, and use the byte step size when processing the end of each 
line. The garbling in the previous bug is really an encoding problem; part of 
it is caused by improper handling of the bytes at the end of each line. I have 
done the following for this problem:
   `String utf8 = "UTF-8";`
   `String utf16 = "UTF-16";`
   `String utf32 = "UTF-32";`
   `int stepSize = 0;`
   `String charsetName = this.getCharsetName();`
   `if (charsetName.contains(utf8)) {`
   ​`stepSize = 1;`
   `} else if (charsetName.contains(utf16)) {`
   ​`stepSize = 2;`
   `} else if (charsetName.contains(utf32)) {`
   ​`stepSize = 4;`
   `}`
   `//Check if \n is used as delimiter and the end of this line is a \r, then 
remove \r from the line`
   `if (this.getDelimiter() != null && this.getDelimiter().length == 1`
   ​`&& this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 
stepSize`
   ​`&& bytes[offset + numBytes - stepSize] == CARRIAGE_RETURN) {`
   ​`numBytes -= stepSize;`
   `}`
   `numBytes = numBytes - stepSize + 1;`
   `return new String(bytes, offset, numBytes, this.getCharsetName());`
   These are some of my own ideas. I hope you can give some better 
suggestions so that this JIRA can be handled better. Thank you.
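
   For what it's worth, detecting the BOM itself is mechanical. Below is a 
hedged, self-contained sketch in plain Java (not tied to Flink's 
`createInputSplits`, and not the author's actual patch) that maps a file's 
leading bytes to a charset name:

```java
// Hedged sketch: map the leading bytes of a file (its BOM) to a charset name.
// Order matters: the UTF-32LE BOM (FF FE 00 00) begins with the UTF-16LE BOM (FF FE).
static String detectBomCharset(byte[] head) {
    if (head.length >= 4 && head[0] == 0x00 && head[1] == 0x00
            && (head[2] & 0xFF) == 0xFE && (head[3] & 0xFF) == 0xFF) {
        return "UTF-32BE";
    }
    if (head.length >= 4 && (head[0] & 0xFF) == 0xFF && (head[1] & 0xFF) == 0xFE
            && head[2] == 0x00 && head[3] == 0x00) {
        return "UTF-32LE";
    }
    if (head.length >= 3 && (head[0] & 0xFF) == 0xEF
            && (head[1] & 0xFF) == 0xBB && (head[2] & 0xFF) == 0xBF) {
        return "UTF-8";
    }
    if (head.length >= 2 && (head[0] & 0xFF) == 0xFE && (head[1] & 0xFF) == 0xFF) {
        return "UTF-16BE";
    }
    if (head.length >= 2 && (head[0] & 0xFF) == 0xFF && (head[1] & 0xFF) == 0xFE) {
        return "UTF-16LE";
    }
    return null; // no BOM; fall back to the configured charset
}
```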


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> UTF-16 support for TextInputFormat
> --
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.2
>Reporter: David Dreyfus
>Priority: Blocker
>  Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In 
> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) 
> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>  
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), 
> which sets TextInputFormat.charsetName and then modifies the previously set 
> delimiterString to construct the proper byte string encoding of the 
> delimiter. This same charsetName is also used in TextInputFormat.readRecord() 
> to interpret the bytes read from the file.
>  
> There are two problems that this implementation would seem to have when using 
> UTF-16.
>  # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will 
> return a Big Endian byte sequence including the Byte Order Mark (BOM). The 
> actual text file will not contain a BOM at each line ending, so the delimiter 
> will never be read. Moreover, if the actual byte encoding of the file is 
> Little Endian, the bytes will be interpreted incorrectly.
>  # TextInputFormat.readRecord() will not see a BOM each time it decodes a 
> byte sequence with the String(bytes, offset, numBytes, charset) call. 
> Therefore, it will assume Big Endian, which may not always be correct. [1] 
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>  
> While there are likely many solutions, I would think that all of them would 
> have to start by reading the BOM from the file when a Split is opened and 
> then using that BOM to modify the specified encoding to a BOM specific one 
> when the caller doesn't specify one, and to overwrite the caller's 
> specification if the BOM is in conflict with the caller's specification. That 
> is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, 
> Flink should rewrite the charsetName as UTF-16LE.
>  I hope this makes sense and that I haven't been testing incorrectly or 
> 

[GitHub] XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed

2018-09-25 Thread GitBox
XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for 
TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-424565714
 
 
   @StephanEwen Hello, regarding the two questions you raised yesterday, I have 
some thoughts of my own, though I am not sure whether they are right.
   1. Where should the BOM be read? I think we still need logic that processes 
the BOM when the file is first opened, at the very beginning of the file, and 
an attribute in the BOM-reading logic that records the file's BOM encoding. 
For example, it could live in the function `createInputSplits`.
   2. Regarding the second, performance-related question: we can use the 
previously detected BOM to distinguish UTF-8 with BOM, UTF-16 with BOM, and 
UTF-32 with BOM, and use the byte step size when processing the end of each 
line. The garbling in the previous bug is really an encoding problem; part of 
it is caused by improper handling of the bytes at the end of each line. I have 
done the following for this problem:
   `String utf8 = "UTF-8";`
   `String utf16 = "UTF-16";`
   `String utf32 = "UTF-32";`
   `int stepSize = 0;`
   `String charsetName = this.getCharsetName();`
   `if (charsetName.contains(utf8)) {`
   ​`stepSize = 1;`
   `} else if (charsetName.contains(utf16)) {`
   ​`stepSize = 2;`
   `} else if (charsetName.contains(utf32)) {`
   ​`stepSize = 4;`
   `}`
   `//Check if \n is used as delimiter and the end of this line is a \r, then 
remove \r from the line`
   `if (this.getDelimiter() != null && this.getDelimiter().length == 1`
   ​`&& this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 
stepSize`
   ​`&& bytes[offset + numBytes - stepSize] == CARRIAGE_RETURN) {`
   ​`numBytes -= stepSize;`
   `}`
   `numBytes = numBytes - stepSize + 1;`
   `return new String(bytes, offset, numBytes, this.getCharsetName());`
   These are some of my own ideas. I hope you can give some better 
suggestions so that this JIRA can be handled better. Thank you.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9328) RocksDBStateBackend might use PlaceholderStreamStateHandle to restor due to StateBackendTestBase class not register snapshots in some UTs

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628169#comment-16628169
 ] 

ASF GitHub Bot commented on FLINK-9328:
---

Myasuka commented on issue #5984: [FLINK-9328][state] Fix RocksDBStateBackend 
restore problem due to StateBackendTestBase not registering snapshots
URL: https://github.com/apache/flink/pull/5984#issuecomment-424561812
 
 
   @azagrebin Sure, since the test code is already fixed, let's just close this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RocksDBStateBackend might use PlaceholderStreamStateHandle to restor due to 
> StateBackendTestBase class not register snapshots in some UTs
> -
>
> Key: FLINK-9328
> URL: https://issues.apache.org/jira/browse/FLINK-9328
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Yun Tang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.5.5
>
>
> Currently, StateBackendTestBase class does not register snapshots to 
> SharedStateRegistry in testValueState, testListState, testReducingState, 
> testFoldingState and testMapState UTs, which may cause RocksDBStateBackend to 
> restore from PlaceholderStreamStateHandle during the 2nd restore procedure if 
> a specific sst file exists in both the 1st snapshot handle and the 2nd 
> snapshot handle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Myasuka commented on issue #5984: [FLINK-9328][state] Fix RocksDBStateBackend restore problem due to StateBackendTestBase not registering snapshots

2018-09-25 Thread GitBox
Myasuka commented on issue #5984: [FLINK-9328][state] Fix RocksDBStateBackend 
restore problem due to StateBackendTestBase not registering snapshots
URL: https://github.com/apache/flink/pull/5984#issuecomment-424561812
 
 
   @azagrebin Sure, since the test code is already fixed, let's just close this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628148#comment-16628148
 ] 

tison commented on FLINK-10406:
---

- {{testSavepointRestoreSettings}} is covered by 
{{JobMaster#testRestoringFromSavepoint}}

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-09-25 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628139#comment-16628139
 ] 

vinoyang commented on FLINK-10292:
--

Hi [~uce], thank you for your clarification on "image"; that doesn't have much 
impact. My general idea is that Flink running in a virtualized environment is 
not much different in nature from a standalone environment, and the two can be 
treated equally.

[~till.rohrmann], what do you think about our discussion? If you have time, 
please help review the PR of FLINK-10291; it should be uncontroversial.

> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0, 1.6.2
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts/is restarted. This can be 
> problematic if the {{JobGraph}} generation has side effects. Therefore, it 
> would be better to generate the {{JobGraph}} only once, store it in HA 
> storage, and retrieve it from there afterwards.
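
A hedged sketch of the recover-or-generate pattern described above. The 
{{SubmittedJobGraphStore}}/{{SubmittedJobGraph}} signatures are quoted from 
memory of the 1.6 code base and should be double-checked; 
{{createJobGraphFromUserCode}} is an assumed helper:

{code:java}
// Hedged sketch: reuse a previously persisted JobGraph if one exists;
// otherwise generate it once from user code and persist it for later restarts.
SubmittedJobGraph recovered = submittedJobGraphStore.recoverJobGraph(fixedJobId);
if (recovered != null) {
    return recovered.getJobGraph();
}
JobGraph generated = createJobGraphFromUserCode(); // assumed helper
submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(generated, jobInfo));
return generated;
{code}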



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628084#comment-16628084
 ] 

ASF GitHub Bot commented on FLINK-10205:


chunhui-shi commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-424544286
 
 
   Nice work. Just note that the stored history of splits, 'inputSplits', does 
not have to be a full history. If we know the list is short enough, then it is 
fine to ship it as is. +1.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSource task pulls InputSplits from the JobManager to achieve 
> better performance. However, when a DataSourceTask fails and is rerun, it 
> will not get the same splits as its previous attempt. This can introduce 
> inconsistent results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch scenario), 
> these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits into the ExecutionVertex, 
> and the DataSourceTask will pull splits from there.
>  
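
A hedged sketch of the proposed bookkeeping. The class and field names below 
are illustrative, not the actual {{ExecutionVertex}} members; 
{{InputSplitAssigner#getNextInputSplit(String, int)}} is the existing assigner 
interface:

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

// Hedged sketch: per-vertex split history so that a restarted attempt
// (or a concurrent execution) sees exactly the same splits.
class SplitHistory {
    private final List<InputSplit> assignedSplits = new ArrayList<>();
    private int replayIndex; // reset when the task is restarted

    synchronized InputSplit getNextSplit(InputSplitAssigner assigner, String host, int taskId) {
        if (replayIndex < assignedSplits.size()) {
            return assignedSplits.get(replayIndex++); // replay the recorded assignment
        }
        InputSplit next = assigner.getNextInputSplit(host, taskId);
        if (next != null) {
            assignedSplits.add(next);
            replayIndex++;
        }
        return next;
    }

    synchronized void resetForRetry() {
        replayIndex = 0;
    }
}
{code}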



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] chunhui-shi commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…

2018-09-25 Thread GitBox
chunhui-shi commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-424544286
 
 
   Nice work. Just note that the stored history of splits, 'inputSplits', does 
not have to be a full history. If we know the list is short enough, then it is 
fine to ship it as is. +1.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-9363) Bump up the Jackson version

2018-09-25 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9363:
--
Description: 
CVEs for Jackson:

CVE-2017-17485
CVE-2018-5968
CVE-2018-7489


We can upgrade to 2.9.5

  was:
CVEs for Jackson:

CVE-2017-17485
CVE-2018-5968
CVE-2018-7489

We can upgrade to 2.9.5


> Bump up the Jackson version
> ---
>
> Key: FLINK-9363
> URL: https://issues.apache.org/jira/browse/FLINK-9363
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: security
>
> CVEs for Jackson:
> CVE-2017-17485
> CVE-2018-5968
> CVE-2018-7489
> We can upgrade to 2.9.5



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10423) Forward RocksDB memory metrics to Flink metrics reporter

2018-09-25 Thread Monal Daxini (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627866#comment-16627866
 ] 

Monal Daxini commented on FLINK-10423:
--

It will be great to have this ported to the 1.6 release as well. 

Spoke with [~srichter] about this offline.

> Forward RocksDB memory metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables, etc. that would be useful to users wanting 
> greater insight into what RocksDB is doing. This work is inspired heavily by 
> the comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)
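
A hedged sketch of the forwarding: RocksDB exposes these numbers as 
string-valued properties, and Flink's {{MetricGroup}} can wrap each one in a 
gauge. The property name is from the RocksDB documentation; {{db}}, 
{{columnFamilyHandle}} and {{metricGroup}} are assumed to be in scope:

{code:java}
// Hedged sketch: expose one RocksDB memory property as a Flink gauge.
final String property = "rocksdb.cur-size-all-mem-tables"; // memtable size, per column family
metricGroup.gauge("curSizeAllMemTables", (Gauge<Long>) () -> {
    try {
        return Long.parseLong(db.getProperty(columnFamilyHandle, property));
    } catch (RocksDBException e) {
        return -1L; // property not available
    }
});
{code}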



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10423) Forward RocksDB memory metrics to Flink metrics reporter

2018-09-25 Thread Monal Daxini (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627866#comment-16627866
 ] 

Monal Daxini edited comment on FLINK-10423 at 9/25/18 8:15 PM:
---

It will be great to have this ported to the 1.6 release.

Spoke with [~srichter] about this offline.


was (Author: mdaxini):
It will be great to have this ported to the 1.6 release as well. 

Spoke with [~srichter] about this offline.

> Forward RocksDB memory metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables, etc. that would be useful to users wanting 
> greater insight into what RocksDB is doing. This work is inspired heavily by 
> the comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627806#comment-16627806
 ] 

tison edited comment on FLINK-10406 at 9/25/18 7:28 PM:


* {{testStopSignal}} and {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High-level invocations at the {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 * {{testNullHostnameGoesToLocalhost}} is ported to {{AkkaUtilsTest#"null 
hostname should go to localhost"}}
 * {{testRequestPartitionState*}}: I would propose to skip all of these since 
we have FLINK-10319, which proposes to disable 
{{JobMaster#requestPartitionState}} and has one approval and no objections so 
far. Also cc [~trohrm...@apache.org], could you take a look at FLINK-10319 so 
that we can decide on this removal? (UPDATE: even without FLINK-10319 
accepted, these tests should be covered by 
{{JobMasterTest#testRequestPartitionState}} and {{TaskTest#...}})


was (Author: tison):
* {{testStopSignal}} and {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High-level invocations at the {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 * {{testNullHostnameGoesToLocalhost}} is ported to {{AkkaUtilsTest#"null 
hostname should go to localhost"}}
 * {{testRequestPartitionState*}}: I would propose to skip all of these since 
we have FLINK-10319, which proposes to disable 
{{JobMaster#requestPartitionState}} and has one approval and no objections so 
far. Also cc [~trohrm...@apache.org], could you take a look at FLINK-10319 so 
that we can decide on this removal?

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627806#comment-16627806
 ] 

tison edited comment on FLINK-10406 at 9/25/18 7:22 PM:


* {{testStopSignal}} and {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High-level invocations at the {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 * {{testNullHostnameGoesToLocalhost}} is ported to {{AkkaUtilsTest#"null 
hostname should go to localhost"}}
 * {{testRequestPartitionState*}}: I would propose to skip all of these since 
we have FLINK-10319, which proposes to disable 
{{JobMaster#requestPartitionState}} and has one approval and no objections so 
far. Also cc [~trohrm...@apache.org], could you take a look at FLINK-10319 so 
that we can decide on this removal?


was (Author: tison):
* {{testStopSignal}} and {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High-level invocations at the {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 * {{testNullHostnameGoesToLocalhost}} is ported to {{AkkaUtilsTest#"null 
hostname should go to localhost"}}

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627806#comment-16627806
 ] 

tison edited comment on FLINK-10406 at 9/25/18 7:13 PM:


* {{testStopSignal}} and {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High-level invocations at the {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 * {{testNullHostnameGoesToLocalhost}} is ported to {{AkkaUtilsTest#"null 
hostname should go to localhost"}}


was (Author: tison):
* {{testStopSignal}} and {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High-level invocations at the {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 *

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627806#comment-16627806
 ] 

tison commented on FLINK-10406:
---

* {{testStopSignal}} and {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High-level invocations at the {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 *

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10225) Cannot access state from a empty taskmanager

2018-09-25 Thread Pierre Zemb (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16620743#comment-16620743
 ] 

Pierre Zemb edited comment on FLINK-10225 at 9/25/18 6:40 PM:
--

Hi! I've been reading the code a bit, and I have a question.

From what I can see, only the TaskExecutor has enough knowledge to make a call 
to the ResourceManager. It is also the only one that updates the 
ConcurrentHashMap used by the RPC handler. This means that from inside the RPC 
handler, I cannot find a way to nicely trigger a method of the TaskExecutor.

I like the way the interface for KvStateClientProxy is implemented and I don't 
want to change it; do you have an idea of how I could implement this?

cc [~till.rohrmann]


was (Author: pierrez):
Hi! I've been reading the code a bit, and I have a question.

From what I can see, only the TaskExecutor has enough knowledge to make a call 
to the ResourceManager. It is also the only one that updates the 
ConcurrentHashMap used by the RPC handler. This means that from inside the RPC 
handler, I cannot find a way to nicely trigger a method of the TaskExecutor.

I like the way the interface for KvStateClientProxy is implemented and I don't 
want to change it; do you have an idea of how I could implement this?

> Cannot access state from a empty taskmanager
> 
>
> Key: FLINK-10225
> URL: https://issues.apache.org/jira/browse/FLINK-10225
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.5.3, 1.6.0
> Environment: 4tm and 1jm for now on 1.6.0
>Reporter: Pierre Zemb
>Priority: Critical
>
> Hi!
> I've started to deploy a small Flink cluster (4tm and 1jm for now on 1.6.0), 
> and deployed a small job on it. Because of the current load, the job is 
> completely handled by a single tm. I've created a small proxy that is using 
> [QueryableStateClient|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/queryablestate/client/QueryableStateClient.html]
>  to access the current state. It is working nicely, except under certain 
> circumstances. It seems to me that I can only access the state through a node 
> that is holding a part of the job. Here's an example:
>  * job on tm1. Pointing QueryableStateClient to tm1. State accessible
>  * job still on tm1. Pointing QueryableStateClient to tm2 (for example). 
> State inaccessible
>  * killing tm1, job is now on tm2. State accessible
>  * job still on tm2. Pointing QueryableStateClient to tm3. State inaccessible
>  * adding some parallelism to spread job on tm1 and tm2. Pointing 
> QueryableStateClient to either tm1 and tm2 is working
>  * job still on tm1 and tm2. Pointing QueryableStateClient to tm3. State 
> inaccessible
> When the state is inaccessible, I can see this (generated 
> [here|https://github.com/apache/flink/blob/release-1.6/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java#L228]):
> {code:java}
> java.lang.RuntimeException: Failed request 0. Caused by: 
> org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could 
> not retrieve location of state=repo-status of 
> job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state is 
> not ready, or ii) the job does not exist. at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> {code}
> Went a bit through the (master branch) code. Class KvStateClientProxy is 
> holding kvStateLocationOracle, the key-value state location oracle for the 
> given JobID. Here's the usage:
>  * updateKvStateLocationOracle() in 
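
For reference, a lookup against a specific proxy looks roughly like the 
following; a hedged sketch against the 1.6 {{QueryableStateClient}} API, 
assuming the queryable state holds String values and the proxy listens on the 
default port 9069:

{code:java}
// Hedged sketch: query the "repo-status" state through the proxy on tm1.
QueryableStateClient client = new QueryableStateClient("tm1", 9069);
String key = "some-key"; // hypothetical key
CompletableFuture<ValueState<String>> future = client.getKvState(
    JobID.fromHexString("3ac3bc00b2d5bc0752917186a288d40a"),
    "repo-status",
    key,
    Types.STRING, // key type information
    new ValueStateDescriptor<>("repo-status", Types.STRING));
// Succeeds when the proxy's TM holds part of the job; otherwise the future
// completes exceptionally with the UnknownLocationException quoted above.
String value = future.get().value();
{code}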

[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-09-25 Thread Ufuk Celebi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627766#comment-16627766
 ] 

Ufuk Celebi commented on FLINK-10292:
-

[~yanghua] I didn't mean to confuse you with the image and Kubernetes example. 
I just took it as *one* example for when the {{StandaloneJobClusterEntryPoint}} 
is used. My main question was independent of this as you also pointed out.

Thanks for your input. To summarize your answer, you think that in such a 
scenario, it should be the **responsibility of the user** to clean up the HA 
data.

> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0, 1.6.2
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts/is restarted. This can be 
> problematic if the {{JobGraph}} generation has side effects. Therefore, it 
> would be better to generate the {{JobGraph}} only once, store it in HA 
> storage, and retrieve it from there afterwards.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-09-25 Thread Ufuk Celebi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626190#comment-16626190
 ] 

Ufuk Celebi edited comment on FLINK-10292 at 9/25/18 6:24 PM:
--

I understand that non-determinism may be an issue when generating the 
{{JobGraph}}, but do we have some data about how common that is for 
applications? Would it be possible to keep a fixed JobGraph in the image 
instead of persisting one in the {{SubmittedJobGraphStore}}?

I like our current approach, because it keeps the source of truth with 
image-based deployments such as Kubernetes *in* the image instead of the 
{{SubmittedJobGraphStore}}. I'm wondering about the following scenario in 
particular (this is independent of the question whether it runs on Kubernetes 
or not and can be reproduced in another way as well):
 * A user creates a job cluster with high availability enabled (cluster ID for 
the logical application, e.g. myapp)
 ** This will persist the job with a fixed ID (after FLINK-10291) on first 
submission
 * The user kills the application *without* cancelling
 ** This will leave all data in the high availability store(s) such as job 
graphs or checkpoints
 * The user updates the image with a modified application and keeps the high 
availability configuration (e.g. cluster ID stays myapp)
 ** This will result in the job in the image to be ignored since we already 
have a job graph with the same (fixed) ID

I think in such a scenario it can be desirable to still have the checkpoints 
available, but it might be problematic if the job graph is recovered from the 
{{SubmittedJobGraphStore}} instead of using the job that is part of the image. 
What do you think about this scenario? Is it the responsibility of the user to 
handle this? If so, I think that the approach outlined in this ticket makes 
sense. If not, we may want to consider alternatives or ignore potential 
non-determinism.


was (Author: uce):
I understand that non-determinism may be an issue when generating the 
{{JobGraph}}, but do we have some data about how common that is for 
applications? Would it be possible to keep a fixed JobGraph in the image 
instead of persisting one in the {{SubmittedJobGraphStore}}?

I like our current approach, because it keeps the source of truth for the job 
in the image instead of the {{SubmittedJobGraphStore}}. I'm wondering about the 
following scenario:
 * A user creates a job cluster with high availability enabled (cluster ID for 
the logical application, e.g. myapp)
 ** This will persist the job with a fixed ID (after FLINK-10291) on first 
submission
 * The user kills the application *without* cancelling
 ** This will leave all data in the high availability store(s) such as job 
graphs or checkpoints
 * The user updates the image with a modified application and keeps the high 
availability configuration (e.g. cluster ID stays myapp)
 ** This will result in the job in the image to be ignored since we already 
have a job graph with the same (fixed) ID

I think in such a scenario it can be desirable to still have the checkpoints 
available, but it might be problematic if the job graph is recovered from the 
{{SubmittedJobGraphStore}} instead of using the job that is part of the image. 
What do you think about this scenario? Is it the responsibility of the user to 
handle this? If so, I think that the approach outlined in this ticket makes 
sense. If not, we may want to consider alternatives or ignore potential 
non-determinism.

> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0, 1.6.2
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts/is restarted. This can be 
> problematic if the {{JobGraph}} generation has side effects. Therefore, it 
> would be better to generate the {{JobGraph}} only once, store it in HA 
> storage, and retrieve it from there afterwards.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10279) Make jython limitations more obvious in documentation

2018-09-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10279:
---
Labels: pull-request-available  (was: )

> Make jython limitations more obvious in documentation
> -
>
> Key: FLINK-10279
> URL: https://issues.apache.org/jira/browse/FLINK-10279
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Python API
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> The "Python Programming Guide (Streaming) Beta" at 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html]
>  does not sufficiently highlight limitations of the API. It should probably 
> have a prominent disclaimer right at the top stating that this actually isn't 
> a "Python" API but Jython, which likely means that the user looking for a 
> solution to run native Python code won't be able to use many important 
> libraries, which is often the reason to look for Python support in the first 
> place.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10279) Make jython limitations more obvious in documentation

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627725#comment-16627725
 ] 

ASF GitHub Bot commented on FLINK-10279:


tweise opened a new pull request #6761: [FLINK-10279] [documentation] Make 
jython limitations more obvious in documentation.
URL: https://github.com/apache/flink/pull/6761
 
 
   Purpose of this change is to better highlight the potential restrictions of 
the Jython-based streaming API to avoid user confusion.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make jython limitations more obvious in documentation
> -
>
> Key: FLINK-10279
> URL: https://issues.apache.org/jira/browse/FLINK-10279
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Python API
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> The "Python Programming Guide (Streaming) Beta" at 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html]
>  does not sufficiently highlight limitations of the API. It should probably 
> have a prominent disclaimer right at the top stating that this actually isn't 
> a "Python" API but Jython, which likely means that the user looking for a 
> solution to run native Python code won't be able to use many important 
> libraries, which is often the reason to look for Python support in the first 
> place.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tweise commented on issue #6761: [FLINK-10279] [documentation] Make jython limitations more obvious in documentation.

2018-09-25 Thread GitBox
tweise commented on issue #6761: [FLINK-10279] [documentation] Make jython 
limitations more obvious in documentation.
URL: https://github.com/apache/flink/pull/6761#issuecomment-424442918
 
 
   CC: @StephanEwen @aljoscha @zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10279) Make jython limitations more obvious in documentation

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627726#comment-16627726
 ] 

ASF GitHub Bot commented on FLINK-10279:


tweise commented on issue #6761: [FLINK-10279] [documentation] Make jython 
limitations more obvious in documentation.
URL: https://github.com/apache/flink/pull/6761#issuecomment-424442918
 
 
   CC: @StephanEwen @aljoscha @zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make jython limitations more obvious in documentation
> -
>
> Key: FLINK-10279
> URL: https://issues.apache.org/jira/browse/FLINK-10279
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Python API
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> The "Python Programming Guide (Streaming) Beta" at 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html]
>  does not sufficiently highlight limitations of the API. It should probably 
> have a prominent disclaimer right at the top stating that this actually isn't 
> a "Python" API but Jython, which likely means that the user looking for a 
> solution to run native Python code won't be able to use many important 
> libraries, which is often the reason to look for Python support in the first 
> place.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tweise opened a new pull request #6761: [FLINK-10279] [documentation] Make jython limitations more obvious in documentation.

2018-09-25 Thread GitBox
tweise opened a new pull request #6761: [FLINK-10279] [documentation] Make 
jython limitations more obvious in documentation.
URL: https://github.com/apache/flink/pull/6761
 
 
   Purpose of this change is to better highlight the potential restrictions of 
the Jython-based streaming API to avoid user confusion.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] asfgit closed pull request #6725: [FLINK-10263] [sql-client] Fix classloader issues in SQL Client

2018-09-25 Thread GitBox
asfgit closed pull request #6725: [FLINK-10263] [sql-client] Fix classloader 
issues in SQL Client
URL: https://github.com/apache/flink/pull/6725
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index b5830725db1..ca0251365e6 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -212,6 +212,8 @@ tables:
 type: VARCHAR
   - name: duplicate_count
 type: BIGINT
+  - name: constant
+type: VARCHAR
 connector:
   type: filesystem
   path: $RESULT
@@ -226,6 +228,8 @@ tables:
   type: VARCHAR
 - name: duplicate_count
   type: BIGINT
+- name: constant
+  type: VARCHAR
 
 functions:
   - name: RegReplace
@@ -261,7 +265,7 @@ $FLINK_DIR/bin/sql-client.sh embedded \
 
 read -r -d '' SQL_STATEMENT_2 << EOF
 INSERT INTO CsvSinkTable
-  SELECT *
+  SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 
'Success') AS constant
   FROM AvroBothTable
 EOF
 
@@ -285,4 +289,4 @@ for i in {1..10}; do
   sleep 5
 done
 
-check_result_hash "SQLClient" $RESULT "dca08a82cc09f6b19950291dbbef16bb"
+check_result_hash "SQLClient" $RESULT "0a1bf8bf716069b7269f575f87a802c0"
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 85b3e9265a8..552d0b37dca 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -75,6 +75,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /**
  * Context for executing table programs. This class caches everything that can 
be cached across
@@ -183,6 +184,19 @@ public EnvironmentInstance createEnvironmentInstance() {
return tableSinks;
}
 
+   /**
+* Executes the given supplier using the execution context's 
classloader as thread classloader.
+*/
+   public <R> R wrapClassLoader(Supplier<R> supplier) {
+   final ClassLoader previousClassloader = 
Thread.currentThread().getContextClassLoader();
+   Thread.currentThread().setContextClassLoader(classLoader);
+   try {
+   return supplier.get();
+   } finally {
+   
Thread.currentThread().setContextClassLoader(previousClassloader);
+   }
+   }
+
// 

 
private static CommandLine createCommandLine(Deployment deployment, 
Options commandLineOptions) {
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 3b9e8e99b82..1318043faf1 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -219,14 +219,16 @@ public TableSchema getTableSchema(SessionContext session, 
String name) throws Sq
 
@Override
public String explainStatement(SessionContext session, String 
statement) throws SqlExecutionException {
-   final TableEnvironment tableEnv = 
getOrCreateExecutionContext(session)
+   final ExecutionContext context = 
getOrCreateExecutionContext(session);
+   final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
 
// translate
try {
final Table table = createTable(tableEnv, statement);
-   return tableEnv.explain(table);
+   // explanation requires an optimization step that might 
reference UDFs during code compilation
+   return context.wrapClassLoader(() -> 
tableEnv.explain(table));
} catch (Throwable t) {
// catch everything such that the query does not crash 
the executor
throw new SqlExecutionException("Invalid SQL 
statement.", t);
@@ -242,7 +244,7 @@ public 

[jira] [Commented] (FLINK-10263) User-defined function with LITERAL paramters yields CompileException

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627711#comment-16627711
 ] 

ASF GitHub Bot commented on FLINK-10263:


asfgit closed pull request #6725: [FLINK-10263] [sql-client] Fix classloader 
issues in SQL Client
URL: https://github.com/apache/flink/pull/6725
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index b5830725db1..ca0251365e6 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -212,6 +212,8 @@ tables:
 type: VARCHAR
   - name: duplicate_count
 type: BIGINT
+  - name: constant
+type: VARCHAR
 connector:
   type: filesystem
   path: $RESULT
@@ -226,6 +228,8 @@ tables:
   type: VARCHAR
 - name: duplicate_count
   type: BIGINT
+- name: constant
+  type: VARCHAR
 
 functions:
   - name: RegReplace
@@ -261,7 +265,7 @@ $FLINK_DIR/bin/sql-client.sh embedded \
 
 read -r -d '' SQL_STATEMENT_2 << EOF
 INSERT INTO CsvSinkTable
-  SELECT *
+  SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant
   FROM AvroBothTable
 EOF
 
@@ -285,4 +289,4 @@ for i in {1..10}; do
   sleep 5
 done
 
-check_result_hash "SQLClient" $RESULT "dca08a82cc09f6b19950291dbbef16bb"
+check_result_hash "SQLClient" $RESULT "0a1bf8bf716069b7269f575f87a802c0"
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 85b3e9265a8..552d0b37dca 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -75,6 +75,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /**
  * Context for executing table programs. This class caches everything that can 
be cached across
@@ -183,6 +184,19 @@ public EnvironmentInstance createEnvironmentInstance() {
return tableSinks;
}
 
+   /**
+* Executes the given supplier using the execution context's 
classloader as thread classloader.
+*/
+   public <R> R wrapClassLoader(Supplier<R> supplier) {
+   final ClassLoader previousClassloader = 
Thread.currentThread().getContextClassLoader();
+   Thread.currentThread().setContextClassLoader(classLoader);
+   try {
+   return supplier.get();
+   } finally {
+   
Thread.currentThread().setContextClassLoader(previousClassloader);
+   }
+   }
+
// --------------------------------------------------------------------------------------------
 
private static CommandLine createCommandLine(Deployment deployment, 
Options commandLineOptions) {
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 3b9e8e99b82..1318043faf1 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -219,14 +219,16 @@ public TableSchema getTableSchema(SessionContext session, 
String name) throws Sq
 
@Override
public String explainStatement(SessionContext session, String 
statement) throws SqlExecutionException {
-   final TableEnvironment tableEnv = 
getOrCreateExecutionContext(session)
+   final ExecutionContext context = 
getOrCreateExecutionContext(session);
+   final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
 
// translate
try {
final Table table = createTable(tableEnv, statement);
-   return tableEnv.explain(table);
+   // explanation requires an optimization step that might 
reference UDFs during code compilation
+   return context.wrapClassLoader(() -> 
tableEnv.explain(table));
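
The fix above is an instance of a general pattern: temporarily install the session's user classloader as the thread context classloader while running any step that may compile or reflectively load user code (here, the optimizer touching UDFs during explain). Below is a standalone sketch of that pattern for reference; the class and method names are made up, only the try/finally idiom mirrors the change:

{code}
import java.util.function.Supplier;

final class ClassLoaderUtil {

	private ClassLoaderUtil() {}

	// Runs the supplier with the given classloader installed as the thread's
	// context classloader and restores the previous one afterwards, even if
	// the supplier throws.
	static <R> R withContextClassLoader(ClassLoader cl, Supplier<R> supplier) {
		final Thread current = Thread.currentThread();
		final ClassLoader previous = current.getContextClassLoader();
		current.setContextClassLoader(cl);
		try {
			return supplier.get();
		} finally {
			current.setContextClassLoader(previous);
		}
	}
}
{code}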

[jira] [Resolved] (FLINK-6813) Add TIMESTAMPDIFF supported in SQL

2018-09-25 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-6813.
-
   Resolution: Fixed
Fix Version/s: 1.7.0

Fixed as part of FLINK-6847.

> Add TIMESTAMPDIFF supported in SQL
> --
>
> Key: FLINK-6813
> URL: https://issues.apache.org/jira/browse/FLINK-6813
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
> Fix For: 1.7.0
>
>
> TIMESTAMPDIFF(unit,datetime_expr1,datetime_expr2) Returns datetime_expr2 − 
> datetime_expr1, where datetime_expr1 and datetime_expr2 are date or datetime 
> expressions. One expression may be a date and the other a datetime; a date 
> value is treated as a datetime having the time part '00:00:00' where 
> necessary. The unit for the result (an integer) is given by the unit 
> argument. The legal values for unit are the same as those listed in the 
> description of the TIMESTAMPADD() function.
> * Syntax
> TIMESTAMPDIFF(unit,datetime_expr1,datetime_expr2) 
> -unit
> Is the part of datetime_expr1 and datetime_expr2 that specifies the type of 
> boundary crossed.
> -datetime_expr1
> Is an expression that can be resolved to a time or a date.
> -datetime_expr2
> Same as datetime_expr1.
> * Example
> SELECT TIMESTAMPDIFF(year, '2015-12-31 23:59:59.999', '2017-01-01 
> 00:00:00.000')  from tab; --> 2
> * See more:
>   
> [MySQL|https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_timestampdiff]
> CALCITE:
> {code}
>  SELECT timestampdiff(YEAR, timestamp '2019-06-01 07:01:11', timestamp 
> '2020-06-01 07:01:11'),timestampdiff(QUARTER, timestamp '2019-06-01 
> 07:01:11', timestamp '2020-06-01 07:01:11'),timestampdiff(MONTH, timestamp 
> '2019-06-01 07:01:11',timestamp '2020-06-01 07:01:11'),timestampdiff(WEEK, 
> timestamp '2019-06-01 07:01:11',timestamp '2020-06-01 
> 07:01:11'),timestampdiff(DAY, timestamp '2019-06-01 07:01:11',timestamp 
> '2020-06-01 07:01:11'),timestampdiff(HOUR, timestamp '2019-06-01 
> 07:01:11',timestamp '2020-06-01 07:01:11'),timestampdiff(MINUTE, timestamp 
> '2019-06-01 07:01:11',timestamp '2020-06-01 07:01:11'),timestampdiff(SECOND, 
> timestamp '2019-06-01 07:01:11',timestamp '2020-06-01 07:01:11') FROM depts;
> | 1 | 4 | 12 | **52** | 366| 8784| 527040 | 
> 31622400  
> {code}
> MSSQL:
> {code}
> SELECT
>   datediff(YEAR, '2019-06-01 07:01:11','2020-06-01 07:01:11'),
>   datediff(QUARTER, '2019-06-01 07:01:11', '2020-06-01 07:01:11'),
>   datediff(MONTH, '2019-06-01 07:01:11','2020-06-01 07:01:11'),
>   datediff(WEEK, '2019-06-01 07:01:11', '2020-06-01 07:01:11'),
>   datediff(DAY, '2019-06-01 07:01:11','2020-06-01 07:01:11'),
>   datediff(HOUR, '2019-06-01 07:01:11','2020-06-01 07:01:11'),
>   datediff(MINUTE, '2019-06-01 07:01:11','2020-06-01 07:01:11'),
>   datediff(SECOND,  '2019-06-01 07:01:11', '2020-06-01 07:01:11')
> FROM stu;
> |1|4  |12 |**53** |366|8784   |527040 |31622400
> {code}
> I have discussed the differences with the Calcite community and found the 
> reason: 
> https://stackoverflow.com/questions/26138167/is-timestampdiff-in-mysql-equivalent-to-datediff-in-sql-server.
> So, in this JIRA, we will keep consistency with Calcite.
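
These complete-interval semantics are easy to check against java.time, whose ChronoUnit.between also counts whole units. The snippet below is a standalone illustration (plain Java, not Flink code) that reproduces the Calcite-style numbers from the example above:

{code}
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class TimestampDiffDemo {
	public static void main(String[] args) {
		LocalDateTime t1 = LocalDateTime.parse("2019-06-01T07:01:11");
		LocalDateTime t2 = LocalDateTime.parse("2020-06-01T07:01:11");
		// ChronoUnit.between counts *complete* intervals, matching the
		// MySQL/Calcite TIMESTAMPDIFF semantics: the range spans 52 full
		// weeks plus 2 days, hence WEEK yields 52, while MSSQL's
		// boundary-crossing DATEDIFF yields 53.
		System.out.println(ChronoUnit.YEARS.between(t1, t2));  // 1
		System.out.println(ChronoUnit.MONTHS.between(t1, t2)); // 12
		System.out.println(ChronoUnit.WEEKS.between(t1, t2));  // 52
		System.out.println(ChronoUnit.DAYS.between(t1, t2));   // 366
	}
}
{code}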



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI

2018-09-25 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-6847.
-
   Resolution: Fixed
Fix Version/s: 1.7.0

Fixed in 1.7.0: c5ce970e781df60eb27b62446853eaa0579c8706

> Add TIMESTAMPDIFF supported in TableAPI
> ---
>
> Key: FLINK-6847
> URL: https://issues.apache.org/jira/browse/FLINK-6847
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: 1.7.0
>
>
> see FLINK-6813



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627664#comment-16627664
 ] 

ASF GitHub Bot commented on FLINK-6847:
---

asfgit closed pull request #6282: [FLINK-6847][FLINK-6813] [table] 
TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index eac6d16eaf8..7d9aee22956 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -3176,6 +3176,18 @@ TIMESTAMPADD(unit, interval, timevalue)
 E.g., TIMESTAMPADD(WEEK, 1, DATE '2003-01-02') returns 
2003-01-09.
   
 
+
+
+  
+{% highlight text %}
+TIMESTAMPDIFF(unit, timestamp1, timestamp2)
+{% endhighlight %}
+  
+  
+Returns the (signed) number of timeUnit intervals between 
timestamp1 and timestamp2. The unit for the interval is given by the unit 
argument, which should be one of the following values: SECOND, 
MINUTE, HOUR, DAY, WEEK, 
MONTH, QUARTER, or YEAR. The unit for 
the interval could refer Time Interval and Point Unit 
Specifiers table. E.g. TIMESTAMPDIFF(DAY, TIMESTAMP '2003-01-02 
10:00:00', TIMESTAMP '2003-01-03 10:00:00') leads to 1.
+  
+
+
   
 
 
@@ -3421,6 +3433,18 @@ dateFormat(TIMESTAMP, STRING)
 E.g., dateFormat(ts, '%Y, %d %M') results in strings 
formatted as "2017, 05 May".
   
 
+
+
+  
+{% highlight java %}
+timestampDiff(TimeIntervalUnit, datetime1, datetime2)
+{% endhighlight %}
+  
+  
+Returns the (signed) number of timeUnit intervals between datetime1 
and datetime2. The unit for the interval is given by the unit argument, which 
should be one of the following values: SECOND, 
MINUTE, HOUR, DAY, MONTH, 
or YEAR. The unit for the interval could refer Time Interval and Point Unit 
Specifiers table. E.g. timestampDiff(TimeIntervalUnit.DAY, 
'2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp) 
leads to 1.
+  
+
+
 
 
 
@@ -3666,6 +3690,18 @@ dateFormat(TIMESTAMP, STRING)
 E.g., dateFormat('ts, "%Y, %d %M") results in strings 
formatted as "2017, 05 May".
   
 
+
+
+  
+{% highlight scala %}
+timestampDiff(TimeIntervalUnit, datetime1, datetime2)
+{% endhighlight %}
+  
+  
+Returns the (signed) number of timeUnit intervals between datetime1 
and datetime2. The unit for the interval is given by the unit argument, which 
should be one of the following values: SECOND, 
MINUTE, HOUR, DAY, MONTH, 
or YEAR. The unit for the interval could refer Time Interval and Point Unit 
Specifiers table. E.g. timestampDiff(TimeIntervalUnit.DAY, 
'2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp) 
leads to 1.
+  
+
+
   
 
 
@@ -5319,4 +5355,65 @@ The following table lists specifiers for date format 
functions.
   
 
 
+Time Interval and Point Unit Specifiers
+---------------------------------------
+
+The following table lists specifiers for time interval and point unit.
+
+| Interval Unit    | Point Unit  |
+| :--------------- | :---------- |
+| YEAR             | YEAR        |
+| QUARTER          | QUARTER     |
+| MONTH            | MONTH       |
+| WEEK             | WEEK        |
+| DAY              | DAY         |
+| HOUR             | HOUR        |
+| MINUTE           | MINUTE      |
+| SECOND           | SECOND      |
+| YEAR_TO_MONTH    | MICROSECOND |
+| DAY_TO_HOUR      | MILLISECOND |
+| DAY_TO_SECOND    |             |
+| HOUR_TO_MINUTE   |             |
+| HOUR_TO_SECOND   |             |
+| MINUTE_TO_SECOND |             |
+
 {% top %}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index dfe69cb0411..eb4aad7e40a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.{TableException, 
CurrentRow, CurrentRange, Unb
 import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, 
toMilliInterval, toMonthInterval, toRowInterval}
 import org.apache.flink.table.api.Table
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.AggregateFunction
 
@@ -1104,6 +1105,34 @@ object dateFormat {
   }
 }
 
+/**
+ * Returns the (signed) number of timeUnit intervals between timestamp1 and 
timestamp2.
+ *
+ * For example 


[jira] [Issue Comment Deleted] (FLINK-10240) Pluggable scheduling strategy for batch jobs

2018-09-25 Thread tison (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tison updated FLINK-10240:
--
Comment: was deleted

(was: Introducing a pluggable scheduling strategy is an excellent idea that 
could greatly expand the cases Flink is able to handle.

I like this idea and can help. However, the document attached above is 
read-only, so I leave my comments as a link to a copy of it below. Most of 
them are layout improvements and minor rewording; the body of the document 
does not differ from the original design.

https://docs.google.com/document/d/15pUYc5_yrY2IwmnADCoNWZwOIYCOcroWmuuZHs-vdlU/edit?usp=sharing

Note that this is an EDITABLE document, and everyone interested in it can 
leave comments or edit it directly. As an open source project we trust our 
contributors, and the document can be frozen and left comment-only once the 
discussion reaches a consensus.)

> Pluggable scheduling strategy for batch jobs
> 
>
> Key: FLINK-10240
> URL: https://issues.apache.org/jira/browse/FLINK-10240
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Reporter: Zhu Zhu
>Priority: Major
>  Labels: scheduling
>
> Currently batch jobs are scheduled with the LAZY_FROM_SOURCES strategy: source 
> tasks are scheduled in the beginning, and other tasks are scheduled once 
> their input data are consumable.
> However, input data being consumable does not always mean the task can work 
> at once. 
>  
> One example is the hash join operation, where the operator first consumes one 
> side (we call it the build side) to set up a table, then consumes the other 
> side (we call it the probe side) to do the real join work. If the probe side 
> is started early, it just gets stuck on back pressure, as the join operator 
> will not consume data from it before the build stage is done, causing a waste 
> of resources.
> If we start the probe side task only after the build stage is done, both 
> the build and the probe side can have more computing resources, as they are 
> staggered.
>  
> That's why we think a flexible scheduling strategy is needed, allowing job 
> owners to customize the vertex schedule order and constraints. Better 
> resource utilization usually means better performance.
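
To make the plug-in point concrete, here is a minimal sketch of what a pluggable strategy interface could look like. All names below are hypothetical and simplified; this is not Flink's actual scheduler API, just an illustration of the kind of hook the issue asks for:

{code}
import java.util.List;

// Hypothetical plug-in point: one implementation per scheduling policy.
interface SchedulingStrategy {
	/** Called once when the job starts; decides which vertices to deploy first. */
	void startScheduling(SchedulerContext context);

	/** Called whenever a produced result partition becomes consumable. */
	void onPartitionConsumable(String producerVertexId, SchedulerContext context);
}

// Hypothetical, reduced view of the job graph offered to strategies.
interface SchedulerContext {
	List<String> sourceVertices();
	List<String> consumersOf(String vertexId);
	void deploy(String vertexId);
}

// The current LAZY_FROM_SOURCES behavior expressed as one such strategy.
// A hash-join-aware strategy could instead hold back the probe side until
// the build side has finished.
class LazyFromSources implements SchedulingStrategy {

	@Override
	public void startScheduling(SchedulerContext ctx) {
		ctx.sourceVertices().forEach(ctx::deploy);
	}

	@Override
	public void onPartitionConsumable(String producer, SchedulerContext ctx) {
		ctx.consumersOf(producer).forEach(ctx::deploy);
	}
}
{code}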



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-09-25 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627538#comment-16627538
 ] 

vinoyang commented on FLINK-10292:
--

[~uce] Thank you for confirming my understanding of "image". Here are some of 
my thoughts on the questions you asked before. In fact, we have to think about 
running in an image (container), but we cannot consider only the image 
scenario. In my opinion, there is no essential difference between standalone 
mode and running in Docker/Kubernetes (virtualization can be seen as a 
micro-standalone model, and it depends on ZooKeeper in the same way).
 * Regarding the scenario of killing rather than canceling: the resulting 
metadata persists, and the same happens in standalone mode. This cannot be 
avoided, because it is not a normal operation.
 * Regarding your concern that "update the image with a modified application" 
may cause the new JobGraph to fail to overwrite the old one: this should not 
happen, because it can be seen as a job upgrade scenario. To achieve it, the 
first step should be to execute "cancel with savepoint"; the job is then 
resubmitted (of course we should recognize this and update the JobGraph 
metadata stored in ZooKeeper) and restored from the savepoint.

My personal opinion is that Flink on Kubernetes does not need to be considered 
separately for this issue.

> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0, 1.6.2
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts/is restarted. This can be 
> problematic if the {{JobGraph}} generation has side effects. Therefore, 
> it would be better to generate the {{JobGraph}} only once, store it in HA 
> storage, and retrieve it from there afterwards.
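
The gist of the proposal can be sketched as a recover-or-generate step. The store interface below is purely illustrative (Flink's real HA services have different signatures); it only shows why persisting the first generated {{JobGraph}} shields restarts from generation side effects:

{code}
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical persistent store, e.g. backed by ZooKeeper plus a blob store.
interface JobGraphStore {
	Optional<byte[]> recover(String jobId) throws Exception;
	void put(String jobId, byte[] serializedJobGraph) throws Exception;
}

final class JobGraphProvider {

	private final JobGraphStore store;

	JobGraphProvider(JobGraphStore store) {
		this.store = store;
	}

	// On a restart the previously persisted JobGraph is recovered, so the
	// (possibly side-effecting) generation from user code runs only once.
	byte[] getOrCreate(String jobId, Supplier<byte[]> generator) throws Exception {
		Optional<byte[]> recovered = store.recover(jobId);
		if (recovered.isPresent()) {
			return recovered.get();
		}
		byte[] generated = generator.get();
		store.put(jobId, generated);
		return generated;
	}
}
{code}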



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627516#comment-16627516
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220213096
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards 
compatibility purposes.
+ *
+ * In older versions of Flink (<= 1.6), we used to write state serializers 
into checkpoints, along
+ * with the serializer's configuration snapshot. Since 1.7.0, we no longer 
wrote the serializers, but
+ * instead used the configuration snapshot as a factory to instantiate 
serializers for restoring state.
+ * However, since some outdated implementations of configuration snapshots did 
not contain sufficient
 
 Review comment:
`did` -> `do`. I would use the present tense here, as you are describing the 
current state, aren't you?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202), 
> even for a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627521#comment-16627521
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220212410
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards 
compatibility purposes.
+ *
+ * In older versions of Flink (<= 1.6), we used to write state serializers 
into checkpoints, along
+ * with the serializer's configuration snapshot. Since 1.7.0, we no longer 
wrote the serializers, but
+ * instead used the configuration snapshot as a factory to instantiate 
serializers for restoring state.
 
 Review comment:
   `used` -> `use`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202), 
> even for a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627518#comment-16627518
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220211873
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards 
compatibility purposes.
+ *
+ * In older versions of Flink (<= 1.6), we used to write state serializers 
into checkpoints, along
+ * with the serializer's configuration snapshot. Since 1.7.0, we no longer 
wrote the serializers, but
 
 Review comment:
   `wrote` -> `write`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202), 
> even for a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)





[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627520#comment-16627520
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220203782
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 ##
 @@ -21,35 +21,131 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+
 /**
  * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link 
TypeSerializer's} configuration.
- * The configuration snapshot of a serializer is persisted along with 
checkpoints of the managed state that the
- * serializer is registered to.
+ * The configuration snapshot of a serializer is persisted within checkpoints
+ * as a single source of meta information about the schema of serialized data 
in the checkpoint.
+ * This serves three purposes:
+ *
+ * 
+ *   Capturing serializer parameters and schema: a 
serializer's configuration snapshot
+ *   represents information about the parameters, state, and schema of a 
serializer.
+ *   This is explained in more detail below.
  *
- * The persisted configuration may later on be used by new serializers to 
ensure serialization compatibility
- * for the same managed state. In order for new serializers to be able to 
ensure this, the configuration snapshot
- * should encode sufficient information about:
+ *   Compatibility checks for new serializers: when new 
serializers are available,
+ *   they need to be checked whether or not they are compatible to read the 
data written by the previous serializer.
+ *   This is performed by providing the new serializer to the correspondibng 
serializer configuration
 
 Review comment:
   `correspondibng` -> `corresponding`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202), 
> even for a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627519#comment-16627519
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220232529
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
 ##
 @@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@code TypeSerializerSchemaCompatibility} represents information about 
whether or not a {@link TypeSerializer}
+ * can be safely used to read data written by a previous type serializer.
+ *
+ * Typically, the compatibility of the new serializer is resolved by 
checking it against the snapshotted
+ * {@link TypeSerializerConfigSnapshot} of the previous serializer. Depending 
on the type of the
+ * resolved compatibility result, migration (i.e., reading bytes with the 
previous serializer and then writing
+ * it again with the new serializer) may be required before the new serializer 
can be used.
+ *
+ * @see TypeSerializer
+ * @see TypeSerializerConfigSnapshot#resolveSchemaCompatibility(TypeSerializer)
+ */
+@PublicEvolving
+public class TypeSerializerSchemaCompatibility<T> {
+
+   /**
+* Enum for the type of the compatibility.
+*/
+   enum Type {
+
+   /** This indicates that the new serializer continued to be used 
as is. */
+   COMPATIBLE_AS_IS,
+
+   /**
+* This indicates that it is required to reconfigure the new 
serializer before
+* it can be used. The reconfigured serializer should be 
provided as part of the
+* resolved {@link TypeSerializerSchemaCompatibility} result.
+*/
+   COMPATIBLE_AFTER_RECONFIGURATION,
+
+   /**
+* This indicates that it is possible to use the new serializer 
after performing a
+* full-scan migration over all state, by reading bytes with 
the previous serializer
+* and then writing it again with the new serializer, 
effectively converting the
+* serialization schema to correspond to the new serializer.
+*/
+   COMPATIBLE_AFTER_MIGRATION,
+
+   /**
+* This indicates that the new serializer is incompatible, even 
with migration.
+* This normally implies that the deserialized Java class can 
not be commonly recognized
+* by the previous and new serializer.
+*/
+   INCOMPATIBLE
+   }
+
+   /**
+* The type of the compatibility.
+*/
+   private final Type resultType;
+
+   /**
+* The reconfigured new serializer to use. This is only relevant
+* in the case that the type of the compatibility is {@link 
Type#COMPATIBLE_AFTER_RECONFIGURATION}.
+*/
+   private final TypeSerializer<T> reconfiguredNewSerializer;
+
+   /**
+* Returns a result that indicates that the new serializer is 
compatible and no migration is required.
+* The new serializer can continued to be used as is.
+*
+* @return a result that indicates migration is not required for the 
new serializer.
+*/
+   public static <T> TypeSerializerSchemaCompatibility<T> compatibleAsIs() {
+   return new 
TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AS_IS, null);
+   }
+
+   /**
+* Returns a result that indicates that no migration is required, but 
the new serializer had to be
+* reconfigured in order for it to be compatible. A reconfigured 
serializer is provided and
+* should be used instead.
+*
+* @param reconfiguredSerializer the reconfigured new 
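
Taken together, the four result types above suggest how a restore path would branch. The following is a hedged sketch with simplified, made-up names rather than Flink's exact API:

{code}
// Simplified mirror of the result types above (illustrative only).
enum CompatibilityType {
	COMPATIBLE_AS_IS,
	COMPATIBLE_AFTER_RECONFIGURATION,
	COMPATIBLE_AFTER_MIGRATION,
	INCOMPATIBLE
}

final class RestorePlanner {

	// Maps each compatibility result to the action a restore path would take.
	static String plan(CompatibilityType type) {
		switch (type) {
			case COMPATIBLE_AS_IS:
				return "use the new serializer directly";
			case COMPATIBLE_AFTER_RECONFIGURATION:
				return "use the reconfigured serializer carried by the result";
			case COMPATIBLE_AFTER_MIGRATION:
				return "read all state with the restore serializer, rewrite it with the new one";
			case INCOMPATIBLE:
				throw new IllegalStateException("state cannot be read by the new serializer");
			default:
				throw new AssertionError("unknown compatibility type: " + type);
		}
	}
}
{code}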

[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627517#comment-16627517
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220227910
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 ##
 @@ -21,35 +21,131 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+
 /**
  * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link TypeSerializer}'s configuration.
 - * The configuration snapshot of a serializer is persisted along with checkpoints of the managed state that the
 - * serializer is registered to.
 + * The configuration snapshot of a serializer is persisted within checkpoints
 + * as a single source of meta information about the schema of serialized data in the checkpoint.
 + * This serves three purposes:
 + *
 + * <ul>
 + *   <li><b>Capturing serializer parameters and schema</b>: a serializer's configuration snapshot
 + *   represents information about the parameters, state, and schema of a serializer.
 + *   This is explained in more detail below.</li>
  *
 - * The persisted configuration may later on be used by new serializers to ensure serialization compatibility
 - * for the same managed state. In order for new serializers to be able to ensure this, the configuration snapshot
 - * should encode sufficient information about:
 + *   <li><b>Compatibility checks for new serializers</b>: when new serializers are available,
 + *   they need to be checked whether or not they are compatible to read the data written by the previous serializer.
 + *   This is performed by providing the new serializer to the corresponding serializer configuration
 + *   snapshots in checkpoints.</li>
 + *
 + *   <li><b>Factory for a read serializer when schema conversion is required</b>: in the case that new
 + *   serializers are not compatible to read previous data, a schema conversion process executed across all data
 + *   is required before the new serializer can continue to be used. This conversion process requires a compatible
 + *   read serializer to restore serialized bytes as objects, which are then written back again using the new serializer.
 + *   In this scenario, the serializer configuration snapshots in checkpoints double as a factory for the read
 + *   serializer of the conversion process.</li>
 + * </ul>
 + *
 + * <h3>Serializer Configuration and Schema</h3>
 + *
 + * <p>Since serializer configuration snapshots need to be used to ensure serialization compatibility
 + * for the same managed state as well as serving as a factory for compatible read serializers, the configuration
 + * snapshot should encode sufficient information about:
  *
  * <ul>
  *   <li><b>Parameter settings of the serializer</b>: parameters of the serializer include settings
  *   required to setup the serializer, or the state of the serializer if it is stateful. If the serializer
  *   has nested serializers, then the configuration snapshot should also contain the parameters of the nested
  *   serializers.</li>
  *
 - *   <li><b>Serialization schema of the serializer</b>: the data format used by the serializer.</li>
 + *   <li><b>Serialization schema of the serializer</b>: the binary format used by the serializer, or
 + *   in other words, the schema of data written by the serializer.</li>
  * </ul>
  *
  * <p>NOTE: Implementations must contain the default empty nullary constructor. This is required to be able to
  * deserialize the configuration snapshot from its binary form.
 + *
 + * @param <T> The data type that the originating serializer of this configuration serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
+public abstract class TypeSerializerConfigSnapshot<T> extends VersionedIOReadableWritable {
 
	/** The user code class loader; only relevant if this configuration instance was deserialized from binary form. */
	private ClassLoader userCodeClassLoader;
 
+	/**
+	 * The originating serializer of this configuration snapshot.
+	 *
+	 * TODO to allow for incrementally adapting the implementation of serializer config snapshot subclasses,
+	 * TODO we currently have a base implementation for the {@link #restoreSerializer()}
+	 * TODO method which simply returns this serializer instance. The serializer is written
+	 * TODO and read using Java serialization as part of reading / writing the config snapshot.
+	 */
+	private TypeSerializer<T> serializer;

[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-25 Thread GitBox
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220212410
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards 
compatibility purposes.
+ *
+ * In older versions of Flink (<= 1.6), we used to write state serializers 
into checkpoints, along
+ * with the serializer's configuration snapshot. Since 1.7.0, we no longer 
wrote the serializers, but
+ * instead used the configuration snapshot as a factory to instantiate 
serializers for restoring state.
 
 Review comment:
   `used` -> `use`
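
   As an aside, the fallback mechanism this class provides can be sketched as
follows; the class shape, names, and signatures below are simplified
illustrations, not the actual PR code:

{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Simplified sketch: a backwards-compatible snapshot wraps the legacy config
// snapshot together with the serializer instance that was Java-serialized into
// the old (<= 1.6) checkpoint, and hands that instance out as the restore
// serializer, since legacy snapshots cannot act as factories themselves.
public class LegacyCompatibleSnapshotSketch<T> {

	private final Object wrappedLegacySnapshot;           // kept for compatibility checks
	private final TypeSerializer<T> checkpointSerializer; // read back from the old checkpoint

	public LegacyCompatibleSnapshotSketch(Object wrappedLegacySnapshot, TypeSerializer<T> checkpointSerializer) {
		this.wrappedLegacySnapshot = wrappedLegacySnapshot;
		this.checkpointSerializer = checkpointSerializer;
	}

	public TypeSerializer<T> restoreSerializer() {
		// fall back to the serializer that was stored in the checkpoint itself
		return checkpointSerializer;
	}
}
{code}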


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-25 Thread GitBox
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220213096
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards 
compatibility purposes.
+ *
+ * In older versions of Flink (<= 1.6), we used to write state serializers 
into checkpoints, along
+ * with the serializer's configuration snapshot. Since 1.7.0, we no longer 
wrote the serializers, but
+ * instead used the configuration snapshot as a factory to instantiate 
serializers for restoring state.
+ * However, since some outdated implementations of configuration snapshots did 
not contain sufficient
 
 Review comment:
   `did` -> `do`. I would use the present tense here, as you are describing the 
current state, aren't you?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-25 Thread GitBox
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220227910
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 ##
 @@ -21,35 +21,131 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+
 /**
  * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link TypeSerializer}'s configuration.
 - * The configuration snapshot of a serializer is persisted along with checkpoints of the managed state that the
 - * serializer is registered to.
 + * The configuration snapshot of a serializer is persisted within checkpoints
 + * as a single source of meta information about the schema of serialized data in the checkpoint.
 + * This serves three purposes:
 + *
 + * <ul>
 + *   <li><b>Capturing serializer parameters and schema</b>: a serializer's configuration snapshot
 + *   represents information about the parameters, state, and schema of a serializer.
 + *   This is explained in more detail below.</li>
  *
 - * The persisted configuration may later on be used by new serializers to ensure serialization compatibility
 - * for the same managed state. In order for new serializers to be able to ensure this, the configuration snapshot
 - * should encode sufficient information about:
 + *   <li><b>Compatibility checks for new serializers</b>: when new serializers are available,
 + *   they need to be checked whether or not they are compatible to read the data written by the previous serializer.
 + *   This is performed by providing the new serializer to the corresponding serializer configuration
 + *   snapshots in checkpoints.</li>
 + *
 + *   <li><b>Factory for a read serializer when schema conversion is required</b>: in the case that new
 + *   serializers are not compatible to read previous data, a schema conversion process executed across all data
 + *   is required before the new serializer can continue to be used. This conversion process requires a compatible
 + *   read serializer to restore serialized bytes as objects, which are then written back again using the new serializer.
 + *   In this scenario, the serializer configuration snapshots in checkpoints double as a factory for the read
 + *   serializer of the conversion process.</li>
 + * </ul>
 + *
 + * <h3>Serializer Configuration and Schema</h3>
 + *
 + * <p>Since serializer configuration snapshots need to be used to ensure serialization compatibility
 + * for the same managed state as well as serving as a factory for compatible read serializers, the configuration
 + * snapshot should encode sufficient information about:
  *
  * <ul>
  *   <li><b>Parameter settings of the serializer</b>: parameters of the serializer include settings
  *   required to setup the serializer, or the state of the serializer if it is stateful. If the serializer
  *   has nested serializers, then the configuration snapshot should also contain the parameters of the nested
  *   serializers.</li>
  *
 - *   <li><b>Serialization schema of the serializer</b>: the data format used by the serializer.</li>
 + *   <li><b>Serialization schema of the serializer</b>: the binary format used by the serializer, or
 + *   in other words, the schema of data written by the serializer.</li>
  * </ul>
  *
  * <p>NOTE: Implementations must contain the default empty nullary constructor. This is required to be able to
  * deserialize the configuration snapshot from its binary form.
 + *
 + * @param <T> The data type that the originating serializer of this configuration serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
+public abstract class TypeSerializerConfigSnapshot<T> extends VersionedIOReadableWritable {
 
	/** The user code class loader; only relevant if this configuration instance was deserialized from binary form. */
	private ClassLoader userCodeClassLoader;
 
+	/**
+	 * The originating serializer of this configuration snapshot.
+	 *
+	 * TODO to allow for incrementally adapting the implementation of serializer config snapshot subclasses,
+	 * TODO we currently have a base implementation for the {@link #restoreSerializer()}
+	 * TODO method which simply returns this serializer instance. The serializer is written
+	 * TODO and read using Java serialization as part of reading / writing the config snapshot.
+	 */
+	private TypeSerializer<T> serializer;
+
+	/**
+	 * Creates a serializer using this configuration, that is capable of reading data
+	 * written by the serializer described by this configuration.
+	 *
+	 * @return the restored serializer.
+ 
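
As a rough illustration of the contract described in this javadoc, a minimal
subclass of the generified class might look like the following sketch; the
class name and the single "delimiter" parameter are invented for illustration:

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

// Hypothetical snapshot for a string serializer parameterized by a delimiter;
// it persists its one parameter and keeps the nullary constructor required
// for deserialization from binary form.
public class DelimiterSerializerConfigSnapshot extends TypeSerializerConfigSnapshot<String> {

	private static final int VERSION = 1;

	private String delimiter;

	/** Nullary constructor, required to deserialize the snapshot. */
	public DelimiterSerializerConfigSnapshot() {
	}

	public DelimiterSerializerConfigSnapshot(String delimiter) {
		this.delimiter = delimiter;
	}

	@Override
	public int getVersion() {
		return VERSION;
	}

	@Override
	public void write(DataOutputView out) throws IOException {
		super.write(out);
		out.writeUTF(delimiter); // capture the serializer parameter
	}

	@Override
	public void read(DataInputView in) throws IOException {
		super.read(in);
		this.delimiter = in.readUTF(); // restore the serializer parameter
	}

	@Override
	public boolean equals(Object obj) {
		return obj instanceof DelimiterSerializerConfigSnapshot
			&& delimiter.equals(((DelimiterSerializerConfigSnapshot) obj).delimiter);
	}

	@Override
	public int hashCode() {
		return delimiter.hashCode();
	}
}
{code}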

[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627509#comment-16627509
 ] 

ASF GitHub Bot commented on FLINK-10386:


TisonKun commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220238017
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 ##
 @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() {
awaitLatch.await();
 
task.failExternally(new Exception("test"));
-   assertTrue(task.getExecutionState() == 
ExecutionState.FAILED);
 
 Review comment:
   FYI [FLINK-10426](https://issues.apache.org/jira/browse/FLINK-10426), you 
can take it over if you have ideas about it any time before I start working on it :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove legacy class TaskExecutionStateListener
> --
>
> Key: FLINK-10386
> URL: https://issues.apache.org/jira/browse/FLINK-10386
> Project: Flink
>  Issue Type: Sub-task
>  Components: TaskManager
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> After a discussion 
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
>  with [~trohrm...@apache.org], I started to analyze the usage of 
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, we abandoned the {{TaskExecutionStateListener}} strategy and no 
> component relies on it. Instead, we introduced {{TaskManagerActions}} to take 
> over the communication of {{Task}} with {{TaskManager}}. No one except 
> {{TaskManager}} should directly communicate with {{Task}}, so the legacy class 
> {{TaskExecutionStateListener}} can be safely removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener

2018-09-25 Thread GitBox
TisonKun commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220238017
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 ##
 @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() {
awaitLatch.await();
 
task.failExternally(new Exception("test"));
-   assertTrue(task.getExecutionState() == 
ExecutionState.FAILED);
 
 Review comment:
   FYI [FLINK-10426](https://issues.apache.org/jira/browse/FLINK-10426), you 
can take it over if you have ideas about it any time before I start working on it :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10386) Remove legacy class TaskExecutionStateListener

2018-09-25 Thread tison (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tison updated FLINK-10386:
--
Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-10392

> Remove legacy class TaskExecutionStateListener
> --
>
> Key: FLINK-10386
> URL: https://issues.apache.org/jira/browse/FLINK-10386
> Project: Flink
>  Issue Type: Sub-task
>  Components: TaskManager
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> After a discussion 
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
>  with [~trohrm...@apache.org], I started to analyze the usage of 
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, we abandoned the {{TaskExecutionStateListener}} strategy and no 
> component relies on it. Instead, we introduced {{TaskManagerActions}} to take 
> over the communication of {{Task}} with {{TaskManager}}. No one except 
> {{TaskManager}} should directly communicate with {{Task}}, so the legacy class 
> {{TaskExecutionStateListener}} can be safely removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10426) Port TaskTest to new code base

2018-09-25 Thread tison (JIRA)
tison created FLINK-10426:
-

 Summary: Port TaskTest to new code base
 Key: FLINK-10426
 URL: https://issues.apache.org/jira/browse/FLINK-10426
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Affects Versions: 1.7.0
Reporter: tison
Assignee: tison
 Fix For: 1.7.0


Port {{TaskTest}} to new code base



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10425) taskmanager.host is not respected

2018-09-25 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10425:


Assignee: vinoyang

> taskmanager.host is not respected
> 
>
> Key: FLINK-10425
> URL: https://issues.apache.org/jira/browse/FLINK-10425
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.6.1
>Reporter: Andrew Kowpak
>Assignee: vinoyang
>Priority: Major
>
> The documentation states that taskmanager.host can be set to override the 
> discovered hostname; however, setting this value has no effect.
> Looking at the code, the value never seems to be used.  Instead, the 
> deprecated taskmanager.hostname is still used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627498#comment-16627498
 ] 

ASF GitHub Bot commented on FLINK-10386:


TisonKun commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220236164
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 ##
 @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() {
awaitLatch.await();
 
task.failExternally(new Exception("test"));
-   assertTrue(task.getExecutionState() == 
ExecutionState.FAILED);
 
 Review comment:
   @Clarkkkkk Thanks for your review! Please also see my first comment.
   `TaskTest` is a test based on legacy code and will be ported to the FLIP-6 
codebase soon as a sub-task of FLINK-10392. All changes under `TaskTest` are 
temporary. Once the porting job is done, if this PR has not been merged by 
then, I will rebase.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove legacy class TaskExecutionStateListener
> --
>
> Key: FLINK-10386
> URL: https://issues.apache.org/jira/browse/FLINK-10386
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> After a discussion 
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
>  with [~trohrm...@apache.org], I started to analyze the usage of 
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, we abandoned the {{TaskExecutionStateListener}} strategy and no 
> component relies on it. Instead, we introduced {{TaskManagerActions}} to take 
> over the communication of {{Task}} with {{TaskManager}}. No one except 
> {{TaskManager}} should directly communicate with {{Task}}, so the legacy class 
> {{TaskExecutionStateListener}} can be safely removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener

2018-09-25 Thread GitBox
TisonKun commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220236164
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 ##
 @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() {
awaitLatch.await();
 
task.failExternally(new Exception("test"));
-   assertTrue(task.getExecutionState() == 
ExecutionState.FAILED);
 
 Review comment:
   @Clarkkkkk Thanks for your review! Please also see my first comment.
   `TaskTest` is a test based on legacy code and will be ported to the FLIP-6 
codebase soon as a sub-task of FLINK-10392. All changes under `TaskTest` are 
temporary. Once the porting job is done, if this PR has not been merged by 
then, I will rebase.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627496#comment-16627496
 ] 

ASF GitHub Bot commented on FLINK-10386:


TisonKun commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220236164
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 ##
 @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() {
awaitLatch.await();
 
task.failExternally(new Exception("test"));
-   assertTrue(task.getExecutionState() == 
ExecutionState.FAILED);
 
 Review comment:
   @Clarkkkkk Thanks for your review! Please also see my first comment.
   `TaskTest` is a test based on legacy code and will be ported to the FLIP-6 
codebase soon as a sub-task of FLINK-10392. All changes under `TaskTest` are 
temporary. Once the porting job is done, if this PR has not been merged by 
then, I will rebase.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove legacy class TaskExecutionStateListener
> --
>
> Key: FLINK-10386
> URL: https://issues.apache.org/jira/browse/FLINK-10386
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> After a discussion 
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
>  with [~trohrm...@apache.org], I started to analyze the usage of 
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, we abandoned the {{TaskExecutionStateListener}} strategy and no 
> component relies on it. Instead, we introduced {{TaskManagerActions}} to take 
> over the communication of {{Task}} with {{TaskManager}}. No one except 
> {{TaskManager}} should directly communicate with {{Task}}, so the legacy class 
> {{TaskExecutionStateListener}} can be safely removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener

2018-09-25 Thread GitBox
TisonKun commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220236164
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 ##
 @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() {
awaitLatch.await();
 
task.failExternally(new Exception("test"));
-   assertTrue(task.getExecutionState() == 
ExecutionState.FAILED);
 
 Review comment:
   @Clarkkkkk Thanks for your review! Please also see my first comment.
   `TaskTest` is a test based on legacy code and will be ported to the FLIP-6 
codebase soon as a sub-task of FLINK-10392. All changes under `TaskTest` are 
temporary. Once the porting job is done, if this PR has not been merged by 
then, I will rebase.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10425) taskmanager.host is not respected

2018-09-25 Thread Andrew Kowpak (JIRA)
Andrew Kowpak created FLINK-10425:
-

 Summary: taskmanager.host is not respected
 Key: FLINK-10425
 URL: https://issues.apache.org/jira/browse/FLINK-10425
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Affects Versions: 1.6.1
Reporter: Andrew Kowpak


The documentation states that taskmanager.host can be set to override the 
discovered hostname; however, setting this value has no effect.

Looking at the code, the value never seems to be used.  Instead, the deprecated 
taskmanager.hostname is still used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627489#comment-16627489
 ] 

ASF GitHub Bot commented on FLINK-10402:


StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220230749
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 ##
 @@ -173,10 +140,6 @@ public void testTaskManagerProcessFailure() throws 
Exception {
taskManagerProcess2 = new 
ProcessBuilder(command).start();
new 
CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), 
processOutput2);
 
-			// we wait for the JobManager to have the two TaskManagers available
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000);
 
 Review comment:
   Can we skip this waiting part now because it is a standalone session 
cluster? Just double-checking.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
> ---
>
> Key: FLINK-10402
> URL: https://issues.apache.org/jira/browse/FLINK-10402
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627488#comment-16627488
 ] 

ASF GitHub Bot commented on FLINK-10402:


StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220226695
 
 

 ##
 File path: flink-tests/src/test/resources/log4j-test.properties
 ##
 @@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 Review comment:
   I assume this change was not intentional and should be reverted?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
> ---
>
> Key: FLINK-10402
> URL: https://issues.apache.org/jira/browse/FLINK-10402
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627487#comment-16627487
 ] 

ASF GitHub Bot commented on FLINK-10402:


StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220230036
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -397,7 +397,7 @@ private Configuration 
generateClusterConfiguration(Configuration configuration)
return resultConfiguration;
}
 
-   private CompletableFuture shutDownAsync(
+   public CompletableFuture shutDownAsync(
 
 Review comment:
   This method should be moved away from the comment section declaring 
"internal methods" in this class. Or even better: instead of changing the 
visibility of methods in this class, couldn't you expose them only through 
`public static` helper methods in a class that lives only in the test package, 
like `ClusterEntryPointTestUtils.startCluster(ClusterEntryPoint ep)`?
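
   A minimal sketch of that suggestion; the helper and method names are taken
from the review comment itself, and the delegation to a startCluster() entry
point is an assumption, not existing API:

{code:java}
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;

// Hypothetical test-package-only utility: expose the needed lifecycle
// operations via static helpers so the production class can keep its
// restricted method visibility.
public final class ClusterEntryPointTestUtils {

	private ClusterEntryPointTestUtils() {
	}

	public static void startCluster(ClusterEntrypoint entrypoint) throws Exception {
		// assumed startup hook; delegate instead of widening visibility
		entrypoint.startCluster();
	}
}
{code}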


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
> ---
>
> Key: FLINK-10402
> URL: https://issues.apache.org/jira/browse/FLINK-10402
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-25 Thread GitBox
StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220230036
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -397,7 +397,7 @@ private Configuration 
generateClusterConfiguration(Configuration configuration)
return resultConfiguration;
}
 
-   private CompletableFuture shutDownAsync(
+   public CompletableFuture shutDownAsync(
 
 Review comment:
   This method should be moved away from the comment section declaring 
"internal methods" in this class. Or even better: instead of changing the 
visibility of methods in this class, couldn't you expose them only through 
`public static` helper methods in a class that lives only in the test package, 
like `ClusterEntryPointTestUtils.startCluster(ClusterEntryPoint ep)`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-25 Thread GitBox
StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220230749
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 ##
 @@ -173,10 +140,6 @@ public void testTaskManagerProcessFailure() throws 
Exception {
taskManagerProcess2 = new 
ProcessBuilder(command).start();
new 
CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), 
processOutput2);
 
-			// we wait for the JobManager to have the two TaskManagers available
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000);
 
 Review comment:
   Can we skip this waiting part now because it is a standalone session 
cluster? Just double-checking.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base

2018-09-25 Thread GitBox
StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port 
AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#discussion_r220226695
 
 

 ##
 File path: flink-tests/src/test/resources/log4j-test.properties
 ##
 @@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 Review comment:
   I assume this change was not intentional and should be reverted?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627477#comment-16627477
 ] 

ASF GitHub Bot commented on FLINK-10386:


Clarkkkkk commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220221550
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 ##
 @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() {
awaitLatch.await();
 
task.failExternally(new Exception("test"));
-   assertTrue(task.getExecutionState() == 
ExecutionState.FAILED);
 
 Review comment:
   Is it necessary to change this here? It seems to work the same for an enum.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove legacy class TaskExecutionStateListener
> --
>
> Key: FLINK-10386
> URL: https://issues.apache.org/jira/browse/FLINK-10386
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> After a discussion 
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
>  with [~trohrm...@apache.org], I started to analyze the usage of 
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, we abandoned the {{TaskExecutionStateListener}} strategy and no 
> component relies on it. Instead, we introduced {{TaskManagerActions}} to take 
> over the communication of {{Task}} with {{TaskManager}}. No one except 
> {{TaskManager}} should directly communicate with {{Task}}, so the legacy class 
> {{TaskExecutionStateListener}} can be safely removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627478#comment-16627478
 ] 

ASF GitHub Bot commented on FLINK-10386:


Clarkkkkk commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220230614
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -846,6 +817,23 @@ public void 
testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
//  Test Utilities
// 

 
+	private static void waitUntilExecutionState(Task task, ExecutionState expectedState, Deadline deadline) {
+		while (deadline.hasTimeLeft()) {
+			if (expectedState == task.getExecutionState()) {
+				return;
+			}
+
+			try {
+				Thread.sleep(Math.min(deadline.timeLeft().toMillis(), 200));
 
 Review comment:
   Just a little reminder: using Thread.sleep() in a JUnit test thread may 
sometimes lead to strange behavior. I am not sure if it is safe here.
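
   One sleep-free alternative, as a sketch (the names here are illustrative):
let the code path under test complete a future, and block on it with a timeout
instead of polling the state:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class AwaitSketch {

	private AwaitSketch() {
	}

	// Block until the signal is completed by the code under test, failing the
	// test after the timeout instead of sleeping in a polling loop.
	public static void awaitSignal(CompletableFuture<Void> signal, long timeoutMillis) throws Exception {
		try {
			signal.get(timeoutMillis, TimeUnit.MILLISECONDS);
		} catch (TimeoutException e) {
			throw new AssertionError("expected state was not reached in time", e);
		}
	}
}
{code}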


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove legacy class TaskExecutionStateListener
> --
>
> Key: FLINK-10386
> URL: https://issues.apache.org/jira/browse/FLINK-10386
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> After a discussion 
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
>  with [~trohrm...@apache.org], I started to analyze the usage of 
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, we abandoned the {{TaskExecutionStateListener}} strategy and no 
> component relies on it. Instead, we introduced {{TaskManagerActions}} to take 
> over the communication of {{Task}} with {{TaskManager}}. No one except 
> {{TaskManager}} should directly communicate with {{Task}}, so the legacy class 
> {{TaskExecutionStateListener}} can be safely removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Clarkkkkk commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener

2018-09-25 Thread GitBox
Clarkkkkk commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220230614
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -846,6 +817,23 @@ public void 
testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
//  Test Utilities
// 

 
+	private static void waitUntilExecutionState(Task task, ExecutionState expectedState, Deadline deadline) {
+		while (deadline.hasTimeLeft()) {
+			if (expectedState == task.getExecutionState()) {
+				return;
+			}
+
+			try {
+				Thread.sleep(Math.min(deadline.timeLeft().toMillis(), 200));
 
 Review comment:
   Just a little reminder: using Thread.sleep() in a JUnit test thread may 
sometimes lead to strange behavior. I am not sure if it is safe here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] Clarkkkkk commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener

2018-09-25 Thread GitBox
Clarkkkkk commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220221550
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 ##
 @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() {
awaitLatch.await();
 
task.failExternally(new Exception("test"));
-   assertTrue(task.getExecutionState() == 
ExecutionState.FAILED);
 
 Review comment:
   Is it necessary to change this here? It seems to work the same for an enum.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10401) Port ProcessFailureCancelingITCase to new code base

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627467#comment-16627467
 ] 

ASF GitHub Bot commented on FLINK-10401:


StefanRRichter commented on a change in pull request #6749: [FLINK-10401] Port 
ProcessFailureCancelingITCase to new code base
URL: https://github.com/apache/flink/pull/6749#discussion_r220205505
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
 ##
 @@ -130,6 +131,13 @@ public T getDispatcher() {
}
}
 
+   @VisibleForTesting
+   public WebMonitorEndpoint getWebMonitorEndpoint() {
 
 Review comment:
   Coming back to my longer comment on #6749, I think this is only required 
because the collaborator is lazily created in a lifecycle method. I wonder if 
we could not have a builder object that will create all collaborators at the 
place where `startComponent` is currently called, since `clusterComponent = 
createClusterComponent(configuration)` and 
`clusterComponent.startComponent(...)` are even called in direct sequence. 
Tests could then simply pass the WebMonitorEndpoint into the component.
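
   As a generic sketch of that idea (plain Java with hypothetical names, not
the actual Flink classes): build the collaborator eagerly and inject it, so a
test can substitute its own, instead of creating it lazily in a lifecycle
method:

{code:java}
interface Endpoint {
	void start();
}

// The component receives its collaborator fully constructed, so a test can
// pass in its own WebMonitorEndpoint-like object instead of reaching into a
// field that is only populated after start().
final class Component {

	private final Endpoint endpoint;

	Component(Endpoint endpoint) {
		this.endpoint = endpoint; // injected, not lazily created
	}

	void start() {
		endpoint.start();
	}
}
{code}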


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port ProcessFailureCancelingITCase to new code base
> ---
>
> Key: FLINK-10401
> URL: https://issues.apache.org/jira/browse/FLINK-10401
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{ProcessFailureCancelingITCase}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on a change in pull request #6749: [FLINK-10401] Port ProcessFailureCancelingITCase to new code base

2018-09-25 Thread GitBox
StefanRRichter commented on a change in pull request #6749: [FLINK-10401] Port 
ProcessFailureCancelingITCase to new code base
URL: https://github.com/apache/flink/pull/6749#discussion_r220205505
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
 ##
 @@ -130,6 +131,13 @@ public T getDispatcher() {
}
}
 
+   @VisibleForTesting
+   public WebMonitorEndpoint getWebMonitorEndpoint() {
 
 Review comment:
   Coming back to my longer comment on #6749, I think this is only required 
because the collaborator is lazily created in a lifecycle method. I wonder if 
we could not have a builder object that will create all collaborators at the 
place where `startComponent` is currently called, since `clusterComponent = 
createClusterComponent(configuration)` and 
`clusterComponent.startComponent(...)` are even called in direct sequence. 
Tests could then simply pass the WebMonitorEndpoint into the component.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10422) Follow AWS specs in Kinesis Consumer

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627458#comment-16627458
 ] 

ASF GitHub Bot commented on FLINK-10422:


EugeneYushin opened a new pull request #6760: [FLINK-10422] Follow AWS specs in 
Kinesis Consumer
URL: https://github.com/apache/flink/pull/6760
 
 
   ## What is the purpose of the change
   
   Bring the Flink Kinesis connector in line with the AWS specs for shard id 
conventions.
   Related Jira story: https://issues.apache.org/jira/browse/FLINK-10422
   Mailing list conversation: 
https://lists.apache.org/thread.html/96de3bac9761564767cf283b58d664f5ae1b076e0c4431620552af5b@%3Cdev.flink.apache.org%3E
   
   ## Brief change log
 - Remove the custom ShardId comparator logic as it's redundant; rely on the AWS 
client libs to get the shard list with an exclusive start shard id
 - Remove test-related cleanup from production code
 - Add tests to check that the correct shards are returned on a second run of the 
`listShards` method for 2 cases: new shards are present, no new shards
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - Run 
`testGetShardListWithNewShardsOnSecondRun/testGetShardWithNoNewShards` unit 
tests in `KinesisProxyTest.class`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Follow AWS specs in Kinesis Consumer 
> -
>
> Key: FLINK-10422
> URL: https://issues.apache.org/jira/browse/FLINK-10422
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.6.1
>Reporter: eugen yushin
>Priority: Major
>  Labels: pull-request-available
>
> *Related conversation in mailing list:*
> [https://lists.apache.org/thread.html/96de3bac9761564767cf283b58d664f5ae1b076e0c4431620552af5b@%3Cdev.flink.apache.org%3E]
> *Summary:*
> Flink Kinesis consumer checks shards id for a particular pattern:
> {noformat}
> "^shardId-\\d{12}"
> {noformat}
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java#L132]
> While this is in line with the current Kinesis Streams server implementation (all 
> streams follow this pattern), it conflicts with the AWS docs:
>  
> {code:java}
> ShardId
>  The unique identifier of the shard within the stream.
>  Type: String
>  Length Constraints: Minimum length of 1. Maximum length of 128.
> Pattern: [a-zA-Z0-9_.-]+
>  Required: Yes
> {code}
>  
> [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html]
> *Intention:*
>  We have no guarantees and can't rely on patterns other than those provided in the 
> AWS manifest.
>  Any custom implementation of a Kinesis mock should rely on the AWS manifest, which 
> only requires a ShardId to be alphanumeric. This prevents anyone from using Flink 
> with such kinds of mocks.
> The reason behind the scenes for using the particular pattern "^shardId-\\d{12}" is to 
> create Flink's custom Shard comparator, filter already-seen shards, and pass only the 
> latest shard to client.listShards to limit the scope of the RPC call to AWS.
> In the meantime, I think we can get rid of this logic altogether. The current 
> usage in the project is:
>  - fix a Kinesalite bug (I've already opened an issue to cover this:
>  [https://github.com/mhart/kinesalite/issues/76] and opened a PR: 
> [https://github.com/mhart/kinesalite/pull/77]). We can move this logic to the 
> test code base to keep the production code clean for now
>  
> [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L464]
>  - adjust the last seen shard id. We can simply omit this because the AWS client won't 
> return already-seen shards, so we will get new ids only, or nothing.
> 
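
For illustration, a sketch of the listShards usage the description proposes 
(AWS SDK for Java v1; the stream name, class, and method here are made up):

{code:java}
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.Shard;

import java.util.List;

public class ListShardsSketch {

	// Let the AWS client filter already-seen shards via ExclusiveStartShardId
	// instead of comparing shard ids against a custom pattern.
	public static List<Shard> newShardsSince(AmazonKinesis kinesis, String lastSeenShardId) {
		ListShardsRequest request = new ListShardsRequest()
			.withStreamName("example-stream")
			.withExclusiveStartShardId(lastSeenShardId); // only shards after this id
		ListShardsResult result = kinesis.listShards(request);
		// empty if no new shards were created; pagination via getNextToken() omitted
		return result.getShards();
	}
}
{code}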

[GitHub] EugeneYushin opened a new pull request #6760: [FLINK-10422] Follow AWS specs in Kinesis Consumer

2018-09-25 Thread GitBox
EugeneYushin opened a new pull request #6760: [FLINK-10422] Follow AWS specs in 
Kinesis Consumer
URL: https://github.com/apache/flink/pull/6760
 
 
   ## What is the purpose of the change
   
   Bring the Flink Kinesis connector in line with the AWS specs for shard id 
conventions.
   Related Jira story: https://issues.apache.org/jira/browse/FLINK-10422
   Mailing list conversation: 
https://lists.apache.org/thread.html/96de3bac9761564767cf283b58d664f5ae1b076e0c4431620552af5b@%3Cdev.flink.apache.org%3E
   
   ## Brief change log
 - Remove the custom ShardId comparator logic as it's redundant; rely on the AWS 
client libs to get the shard list with an exclusive start shard id
 - Remove test-related cleanup from production code
 - Add tests to check that the correct shards are returned on a second run of the 
`listShards` method for 2 cases: new shards are present, no new shards
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - Run 
`testGetShardListWithNewShardsOnSecondRun/testGetShardWithNoNewShards` unit 
tests in `KinesisProxyTest.class`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10424) Inconsistency between JsonSchemaConverter and FlinkTypeFactory

2018-09-25 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627440#comment-16627440
 ] 

Dominik Wosiński commented on FLINK-10424:
--

There are two possible ways of solving this issue:
  - allow using _BigInteger_ Type Info in _FlinkTypeFactory_
  - change _JsonSchemaConverter_ so it returns Integer Type Info instead.

My opinion is that _FlinkTypeFactory_ should be changed to stay consistent, 
since _JsonSchemaConverter_ does return BigDecimal for the _number_ type. So 
returning BigInteger for _integer_ is more consistent with the current behavior 
and more natural.
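
A tiny sketch of the mismatch being discussed; the surrounding class is 
illustrative, while the type constants are the ones from 
org.apache.flink.api.common.typeinfo.Types:

{code:java}
import java.math.BigDecimal;
import java.math.BigInteger;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class JsonTypeMappingSketch {

	public static void main(String[] args) {
		// What the schema converter produces for JSON "integer" and "number":
		TypeInformation<BigInteger> fromJsonInteger = Types.BIG_INT;
		TypeInformation<BigDecimal> fromJsonNumber = Types.BIG_DEC;

		// FlinkTypeFactory maps BIG_DEC to a SQL type but rejects BIG_INT,
		// which is the inconsistency discussed in this issue.
		System.out.println(fromJsonInteger + " vs " + fromJsonNumber);
	}
}
{code}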

> Inconsistency between JsonSchemaConverter and FlinkTypeFactory
> ---
>
> Key: FLINK-10424
> URL: https://issues.apache.org/jira/browse/FLINK-10424
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.0
>Reporter: Dominik Wosiński
>Assignee: Dominik Wosiński
>Priority: Major
>
> There is still an inconsistency between _JsonSchemaConverter_ and 
> _FlinkTypeFactory_ when using a JSON schema with an _integer_ type field. 
> _JsonSchemaConverter_ will return BigInteger Type Information for _integer_, 
> but _FlinkTypeFactory_ currently does not support BigInteger Type Info, and 
> thus an exception will be thrown. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10424) Inconsistency between JsonSchemaConverter and FlinkTypeFactory

2018-09-25 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/FLINK-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dominik Wosiński updated FLINK-10424:
-
Description: 
There is still an inconsistency between _JsonSchemaConverter_ and 
_FlinkTypeFactory_ when using a JSON schema with an _integer_ type field. 
_JsonSchemaConverter_ will return BigInteger Type Information for _integer_, 
but _FlinkTypeFactory_ currently does not support BigInteger Type Info, and 
thus an exception will be thrown. 

 

  was:
There is still an inconsistency between _JsonSchemaConverter_ and 
_FlinkTypeFactory_ when using a JSON schema with an _integer_ type field. 
_JsonSchemaConverter_ will return BigInteger Type Information for _integer_, 
but _FlinkTypeFactory_ currently does not support BigInteger Type Info, and 
thus an exception will be thrown. 

There are two possible ways of solving this issue:
  - allow using _BigInteger_ Type Info in _FlinkTypeFactory_
  - change _JsonSchemaConverter_ so it returns Integer Type Info instead.

IMHO, the changes should be made in _FlinkTypeFactory_.


> Inconsistency between JsonSchemaConverter and FlinkTypeFactory
> ---
>
> Key: FLINK-10424
> URL: https://issues.apache.org/jira/browse/FLINK-10424
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.0
>Reporter: Dominik Wosiński
>Assignee: Dominik Wosiński
>Priority: Major
>
> There is still an inconsistency between _JsonSchemaConverter_ and 
> _FlinkTypeFactory_ when using a JSON schema with an _integer_ type field. 
> _JsonSchemaConverter_ will return BigInteger Type Information for _integer_, 
> but _FlinkTypeFactory_ currently does not support BigInteger Type Info, and 
> thus an exception will be thrown. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10424) Inconsistency between JsonSchemaConverter and FlinkTypeFactory

2018-09-25 Thread JIRA
Dominik Wosiński created FLINK-10424:


 Summary: Inconsistency between JsonSchemaConverter and FlinkTypeFactory
 Key: FLINK-10424
 URL: https://issues.apache.org/jira/browse/FLINK-10424
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.6.0
Reporter: Dominik Wosiński
Assignee: Dominik Wosiński


There is still an inconsistency between _JsonSchemaConverter_ and 
_FlinkTypeFactory_ in case of using JsonSchema with _integer_ type field. 
_JsonSchemaConverter_ will return BigInteger Type Information for _integer_, 
but _FlinkTypeFactory_ currently does not support BigInteger Type Info and thus 
an exception will be thrown. 

There are two possible ways to solve this issue:
  - allow _BigInteger_ Type Info in _FlinkTypeFactory_
  - change _JsonSchemaConverter_ so that it returns Integer Type Info instead.

IMHO, the changes should be made in _FlinkTypeFactory_.
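
For illustration, a minimal sketch of the reported mismatch (only the type 
constant below is from Flink's public API; how the converter is actually 
invoked is left out):

{code:java}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class IntegerTypeMismatchSketch {
    public static void main(String[] args) {
        // A JSON schema field declared as {"type": "integer"} is mapped by
        // JsonSchemaConverter to BigInteger type information:
        TypeInformation<?> converted = BasicTypeInfo.BIG_INT_TYPE_INFO;

        // FlinkTypeFactory has no mapping for BIG_INT_TYPE_INFO, so deriving
        // a table schema from this type information fails with an exception,
        // which is the inconsistency reported above.
        System.out.println(converted);
    }
}
{code}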



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10379) Can not use Table Functions in Java Table API

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627432#comment-16627432
 ] 

ASF GitHub Bot commented on FLINK-10379:


hequn8128 commented on issue #6744: [FLINK-10379][docs,table] Fix Table 
Function docs
URL: https://github.com/apache/flink/pull/6744#issuecomment-424360728
 
 
   +1 to merge :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Can not use Table Functions in Java Table API
> -
>
> Key: FLINK-10379
> URL: https://issues.apache.org/jira/browse/FLINK-10379
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.1
>Reporter: Piotr Nowojski
>Assignee: Hequn Cheng
>Priority: Critical
>  Labels: pull-request-available
>
> As stated in the 
> [documentation|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#table-functions],
>  this is how table functions should be used in the Java Table API:
> {code:java}
> // Register the function.
> tableEnv.registerFunction("split", new Split("#"));
> myTable.join("split(a) as (word, length)");
> {code}
> However, {{Table.join(String)}} was removed some time ago, and now it is 
> impossible to use Table Functions in the Java Table API.
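
For reference, a possible shape of the {{Split}} function used in the snippet 
above (a sketch only; the issue does not show its body):

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableFunction;

// Emits one (word, length) row per separator-delimited token.
public class Split extends TableFunction<Tuple2<String, Integer>> {
    private final String separator;

    public Split(String separator) {
        this.separator = separator;
    }

    public void eval(String str) {
        for (String s : str.split(separator)) {
            // collect(...) emits one row of the declared result type
            collect(new Tuple2<>(s, s.length()));
        }
    }
}
{code}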



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 commented on issue #6744: [FLINK-10379][docs, table] Fix Table Function docs

2018-09-25 Thread GitBox
hequn8128 commented on issue #6744: [FLINK-10379][docs,table] Fix Table 
Function docs
URL: https://github.com/apache/flink/pull/6744#issuecomment-424360728
 
 
   +1 to merge :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627429#comment-16627429
 ] 

ASF GitHub Bot commented on FLINK-9891:
---

GJL edited a comment on issue #6540: [FLINK-9891] Added hook to shutdown 
cluster if a session was created in per-job mode.
URL: https://github.com/apache/flink/pull/6540#issuecomment-424360084
 
 
   Thank you for your contribution to Apache Flink, @packet23. LGTM, merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
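
For context, the hook discussed in this thread ties cluster teardown to the 
client JVM's exit. A minimal sketch of the underlying JVM pattern (the 
{{ClusterClient}} interface and {{shutDownCluster()}} below are stand-in names, 
not the actual Flink API):

{code:java}
public class ClusterShutdownHookSketch {

    interface ClusterClient {
        void shutDownCluster() throws Exception; // hypothetical
    }

    // Runs cluster teardown when the client JVM exits.
    static void registerShutdownHook(ClusterClient client) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                client.shutDownCluster();
            } catch (Exception e) {
                System.err.println("Could not shut down cluster: " + e.getMessage());
            }
        }));
    }
}
{code}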


> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>  Components: Client, YARN
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> We are using neither session mode nor detached mode. The command to run a 
> Flink job on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-07-18 12:47:04,783 WARN 
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 2018-07-18 12:47:04,788 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration 
> directory 
> ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf')
>  contains both LOG4J and Logback configuration files. Please delete or rename 
> one of them.
> 2018-07-18 12:47:07,846 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application 
> master application_1531474158783_10814
> 2018-07-18 12:47:08,073 INFO 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application 
> application_1531474158783_10814
> 2018-07-18 12:47:08,074 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster 
> to be allocated
> 2018-07-18 12:47:08,076 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, 
> current state ACCEPTED
> 2018-07-18 12:47:12,864 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has 
> been deployed successfully.
> {code}
> Job Manager logs:
> {code:java}
> 2018-07-18 12:47:09,913 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> 
> 2018-07-18 12:47:09,915 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting 
> YarnSessionClusterEntrypoint (Version: 

[jira] [Commented] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627431#comment-16627431
 ] 

ASF GitHub Bot commented on FLINK-9891:
---

GJL commented on issue #6718: [FLINK-9891] Add optional hook to shutdown 
cluster if a session was created in per-job mode in attached mode
URL: https://github.com/apache/flink/pull/6718#issuecomment-424360423
 
 
   Thank you for your contribution to Apache Flink, @azagrebin. LGTM, merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>  Components: Client, YARN
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> We are using neither session mode nor detached mode. The command to run a 
> Flink job on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-07-18 12:47:04,783 WARN 
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 2018-07-18 12:47:04,788 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration 
> directory 
> ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf')
>  contains both LOG4J and Logback configuration files. Please delete or rename 
> one of them.
> 2018-07-18 12:47:07,846 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application 
> master application_1531474158783_10814
> 2018-07-18 12:47:08,073 INFO 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application 
> application_1531474158783_10814
> 2018-07-18 12:47:08,074 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster 
> to be allocated
> 2018-07-18 12:47:08,076 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, 
> current state ACCEPTED
> 2018-07-18 12:47:12,864 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has 
> been deployed successfully.
> {code}
> Job Manager logs:
> {code:java}
> 2018-07-18 12:47:09,913 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> 
> 2018-07-18 12:47:09,915 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting 
> 

[GitHub] GJL commented on issue #6718: [FLINK-9891] Add optional hook to shutdown cluster if a session was created in per-job mode in attached mode

2018-09-25 Thread GitBox
GJL commented on issue #6718: [FLINK-9891] Add optional hook to shutdown 
cluster if a session was created in per-job mode in attached mode
URL: https://github.com/apache/flink/pull/6718#issuecomment-424360423
 
 
   Thank you for your contribution to Apache Flink, @azagrebin. LGTM, merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL edited a comment on issue #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.

2018-09-25 Thread GitBox
GJL edited a comment on issue #6540: [FLINK-9891] Added hook to shutdown 
cluster if a session was created in per-job mode.
URL: https://github.com/apache/flink/pull/6540#issuecomment-424360084
 
 
   Thank you for your contribution to Apache Flink, @packet23. LGTM, merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627428#comment-16627428
 ] 

ASF GitHub Bot commented on FLINK-9891:
---

GJL commented on issue #6540: [FLINK-9891] Added hook to shutdown cluster if a 
session was created in per-job mode.
URL: https://github.com/apache/flink/pull/6540#issuecomment-424360084
 
 
   LGTM, merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>  Components: Client, YARN
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> We are using neither session mode nor detached mode. The command to run a 
> Flink job on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-07-18 12:47:04,783 WARN 
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 2018-07-18 12:47:04,788 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration 
> directory 
> ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf')
>  contains both LOG4J and Logback configuration files. Please delete or rename 
> one of them.
> 2018-07-18 12:47:07,846 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application 
> master application_1531474158783_10814
> 2018-07-18 12:47:08,073 INFO 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application 
> application_1531474158783_10814
> 2018-07-18 12:47:08,074 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster 
> to be allocated
> 2018-07-18 12:47:08,076 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, 
> current state ACCEPTED
> 2018-07-18 12:47:12,864 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has 
> been deployed successfully.
> {code}
> Job Manager logs:
> {code:java}
> 2018-07-18 12:47:09,913 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> 
> 2018-07-18 12:47:09,915 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting 
> YarnSessionClusterEntrypoint (Version: 1.5.1, Rev:3488f8b, Date:10.07.2018 @ 
> 11:51:27 GMT)
> ...
> 

[GitHub] GJL commented on issue #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.

2018-09-25 Thread GitBox
GJL commented on issue #6540: [FLINK-9891] Added hook to shutdown cluster if a 
session was created in per-job mode.
URL: https://github.com/apache/flink/pull/6540#issuecomment-424360084
 
 
   LGTM, merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10423) Forward RocksDB memory metrics to Flink metrics reporter

2018-09-25 Thread Seth Wiesman (JIRA)
Seth Wiesman created FLINK-10423:


 Summary: Forward RocksDB memory metrics to Flink metrics reporter 
 Key: FLINK-10423
 URL: https://issues.apache.org/jira/browse/FLINK-10423
 Project: Flink
  Issue Type: New Feature
  Components: Metrics, State Backends, Checkpointing
Reporter: Seth Wiesman
Assignee: Seth Wiesman


RocksDB exposes a number of metrics at the column family level about current 
memory usage, open memtables, etc. that would be useful to users who want 
greater insight into what RocksDB is doing. This work is heavily inspired by 
the comments on this RocksDB issue thread: 
https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233
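
A rough sketch of what forwarding one such property could look like (the 
property name is a documented RocksDB property; the wiring into the state 
backend is exactly what this ticket would add):

{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.rocksdb.RocksDB;

public class RocksDBMemoryGaugeSketch {

    // Registers a gauge that reads the current total memtable size.
    public static void register(MetricGroup group, RocksDB db) {
        group.gauge("rocksdb.cur-size-all-mem-tables", (Gauge<Long>) () -> {
            try {
                return Long.parseLong(db.getProperty("rocksdb.cur-size-all-mem-tables"));
            } catch (Exception e) {
                return -1L; // property not available
            }
        });
    }
}
{code}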



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10379) Can not use Table Functions in Java Table API

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627403#comment-16627403
 ] 

ASF GitHub Bot commented on FLINK-10379:


pnowojski commented on issue #6744: [FLINK-10379][docs,table] Fix Table 
Function docs
URL: https://github.com/apache/flink/pull/6744#issuecomment-424352562
 
 
   Thanks for the review @hequn8128 and spotting the place that I've missed :) 
I've fixed it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Can not use Table Functions in Java Table API
> -
>
> Key: FLINK-10379
> URL: https://issues.apache.org/jira/browse/FLINK-10379
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.1
>Reporter: Piotr Nowojski
>Assignee: Hequn Cheng
>Priority: Critical
>  Labels: pull-request-available
>
> As stated in the 
> [documentation|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#table-functions],
>  this is how table functions should be used in the Java Table API:
> {code:java}
> // Register the function.
> tableEnv.registerFunction("split", new Split("#"));
> myTable.join("split(a) as (word, length)");
> {code}
> However, {{Table.join(String)}} was removed some time ago, and now it is 
> impossible to use Table Functions in the Java Table API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #6744: [FLINK-10379][docs, table] Fix Table Function docs

2018-09-25 Thread GitBox
pnowojski commented on issue #6744: [FLINK-10379][docs,table] Fix Table 
Function docs
URL: https://github.com/apache/flink/pull/6744#issuecomment-424352562
 
 
   Thanks for the review @hequn8128 and spotting the place that I've missed :) 
I've fixed it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627399#comment-16627399
 ] 

ASF GitHub Bot commented on FLINK-10411:


StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make 
ClusterEntrypoint more compositional
URL: https://github.com/apache/flink/pull/6743#discussion_r220200492
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -141,11 +144,11 @@ protected ClusterEntrypoint(Configuration configuration) 
{
shutDownHook = 
ShutdownHookUtil.addShutdownHook(this::cleanupDirectories, 
getClass().getSimpleName(), LOG);
}
 
-   public CompletableFuture getTerminationFuture() {
+   public CompletableFuture getTerminationFuture() {
return terminationFuture;
}
 
-   protected void startCluster() {
+   protected void startCluster() throws ClusterEntrypointException {
 
 Review comment:
   Could currently be `private`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make ClusterEntrypoint more modular
> ---
>
> Key: FLINK-10411
> URL: https://issues.apache.org/jira/browse/FLINK-10411
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, the {{ClusterEntrypoint}} is not very modular in the sense that it 
> cannot be really used for testing purposes (e.g. starting a {{Dispatcher}} 
> with a {{WebMonitorRestEndpoint}}). The problem is that the 
> {{ClusterEntrypoint}} combines too many responsibilities (creating the 
> cluster services, starting the cluster components and deciding on when to 
> terminate the JVM process).
> I suggest to make the structure more compositional, meaning to split up the 
> service generation from the cluster component start up. That way we could 
> also remove code duplication between the different {{ClusterEntrypoint}} 
> implementations.
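
To make the proposed split concrete, a hypothetical sketch (all names below 
are illustrative, not the actual Flink classes):

{code:java}
// Services are created in one place; the components that use them are
// started through a pluggable factory, so tests can supply lightweight
// services and still exercise component startup.
interface ClusterServices extends AutoCloseable {
    // rpc service, HA services, blob server, metric registry, ...
}

interface ClusterComponentFactory<C> {
    C startComponents(ClusterServices services) throws Exception;
}

final class EntrypointSketch<C> {
    private final ClusterComponentFactory<C> componentFactory;

    EntrypointSketch(ClusterComponentFactory<C> componentFactory) {
        this.componentFactory = componentFactory;
    }

    C run(ClusterServices services) throws Exception {
        // The entrypoint no longer owns service creation; it only decides
        // when to start components and when to terminate the process.
        return componentFactory.startComponents(services);
    }
}
{code}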



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional

2018-09-25 Thread GitBox
StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make 
ClusterEntrypoint more compositional
URL: https://github.com/apache/flink/pull/6743#discussion_r220200492
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -141,11 +144,11 @@ protected ClusterEntrypoint(Configuration configuration) 
{
shutDownHook = 
ShutdownHookUtil.addShutdownHook(this::cleanupDirectories, 
getClass().getSimpleName(), LOG);
}
 
-   public CompletableFuture getTerminationFuture() {
+   public CompletableFuture getTerminationFuture() {
return terminationFuture;
}
 
-   protected void startCluster() {
+   protected void startCluster() throws ClusterEntrypointException {
 
 Review comment:
   Could currently be `private`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627394#comment-16627394
 ] 

ASF GitHub Bot commented on FLINK-10411:


StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make 
ClusterEntrypoint more compositional
URL: https://github.com/apache/flink/pull/6743#discussion_r220124106
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -163,9 +135,6 @@
@GuardedBy("lock")
private ClusterInformation clusterInformation;
 
 Review comment:
   I think this field could become just a local variable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make ClusterEntrypoint more modular
> ---
>
> Key: FLINK-10411
> URL: https://issues.apache.org/jira/browse/FLINK-10411
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, the {{ClusterEntrypoint}} is not very modular in the sense that it 
> cannot be really used for testing purposes (e.g. starting a {{Dispatcher}} 
> with a {{WebMonitorRestEndpoint}}). The problem is that the 
> {{ClusterEntrypoint}} combines too many responsibilities (creating the 
> cluster services, starting the cluster components and deciding on when to 
> terminate the JVM process).
> I suggest to make the structure more compositional, meaning to split up the 
> service generation from the cluster component start up. That way we could 
> also remove code duplication between the different {{ClusterEntrypoint}} 
> implementations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional

2018-09-25 Thread GitBox
StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make 
ClusterEntrypoint more compositional
URL: https://github.com/apache/flink/pull/6743#discussion_r220124106
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -163,9 +135,6 @@
@GuardedBy("lock")
private ClusterInformation clusterInformation;
 
 Review comment:
   I think this field could become just a local variable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627390#comment-16627390
 ] 

ASF GitHub Bot commented on FLINK-10411:


StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make 
ClusterEntrypoint more compositional
URL: https://github.com/apache/flink/pull/6743#discussion_r220197869
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -518,6 +498,34 @@ protected static Configuration 
loadConfiguration(EntrypointClusterConfiguration
return configuration;
}
 
+   // --
+   // Helper methods
+   // --
+
+   public static void runClusterEntrypoint(ClusterEntrypoint 
clusterEntrypoint) {
+
+   final String clusterEntrypointName = 
clusterEntrypoint.getClass().getSimpleName();
+   try {
+   clusterEntrypoint.startCluster();
+   } catch (ClusterEntrypointException e) {
+   LOG.error(String.format("Could not start cluster 
entrypoint %s.", clusterEntrypointName, e));
 
 Review comment:
   `LOG.error(String.format("Could not start cluster entrypoint %s.", 
clusterEntrypointName), e);`
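
For readers skimming the diff: in the original line the exception {{e}} is 
passed to {{String.format}} as an unused format argument, so the stack trace 
is silently dropped; the suggested version hands it to the logger instead:

{code:java}
// buggy: 'e' is swallowed by String.format and never logged
LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName, e));

// fixed: 'e' reaches the logger, so the stack trace is printed
LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
{code}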


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make ClusterEntrypoint more modular
> ---
>
> Key: FLINK-10411
> URL: https://issues.apache.org/jira/browse/FLINK-10411
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, the {{ClusterEntrypoint}} is not very modular in the sense that it 
> cannot be really used for testing purposes (e.g. starting a {{Dispatcher}} 
> with a {{WebMonitorRestEndpoint}}). The problem is that the 
> {{ClusterEntrypoint}} combines too many responsibilities (creating the 
> cluster services, starting the cluster components and deciding on when to 
> terminate the JVM process).
> I suggest to make the structure more compositional, meaning to split up the 
> service generation from the cluster component start up. That way we could 
> also remove code duplication between the different {{ClusterEntrypoint}} 
> implementations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional

2018-09-25 Thread GitBox
StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make 
ClusterEntrypoint more compositional
URL: https://github.com/apache/flink/pull/6743#discussion_r220197869
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -518,6 +498,34 @@ protected static Configuration 
loadConfiguration(EntrypointClusterConfiguration
return configuration;
}
 
+   // --
+   // Helper methods
+   // --
+
+   public static void runClusterEntrypoint(ClusterEntrypoint 
clusterEntrypoint) {
+
+   final String clusterEntrypointName = 
clusterEntrypoint.getClass().getSimpleName();
+   try {
+   clusterEntrypoint.startCluster();
+   } catch (ClusterEntrypointException e) {
+   LOG.error(String.format("Could not start cluster 
entrypoint %s.", clusterEntrypointName, e));
 
 Review comment:
   `LOG.error(String.format("Could not start cluster entrypoint %s.", 
clusterEntrypointName), e);`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627385#comment-16627385
 ] 

ASF GitHub Bot commented on FLINK-10411:


StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make 
ClusterEntrypoint more compositional
URL: https://github.com/apache/flink/pull/6743#discussion_r220128418
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
 ##
 @@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.RestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Component which starts a {@link Dispatcher}, {@link ResourceManager} and 
{@link WebMonitorEndpoint}
+ * in the same process.
+ */
+public class ClusterComponent implements 
AutoCloseableAsync {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ClusterComponent.class);
+
+   private final Object lock = new Object();
+
+   private final DispatcherFactory dispatcherFactory;
+
+   private final ResourceManagerFactory resourceManagerFactory;
+
+   private final RestEndpointFactory restEndpointFactory;
+
+   private final CompletableFuture terminationFuture;
+
+   private final CompletableFuture shutDownFuture;
+
+   @GuardedBy("lock")
+   private State state;
+
+   @GuardedBy("lock")
+   private ResourceManager resourceManager;
+
+   @GuardedBy("lock")
+   private T dispatcher;
+
+   @GuardedBy("lock")
+   private LeaderRetrievalService dispatcherLeaderRetrievalService;
+
+   @GuardedBy("lock")
+   private LeaderRetrievalService resourceManagerRetrievalService;
+
+   

[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627389#comment-16627389
 ] 

ASF GitHub Bot commented on FLINK-10411:


StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make 
ClusterEntrypoint more compositional
URL: https://github.com/apache/flink/pull/6743#discussion_r220194199
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
 ##
 @@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.RestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Component which starts a {@link Dispatcher}, {@link ResourceManager} and 
{@link WebMonitorEndpoint}
+ * in the same process.
+ */
+public class ClusterComponent implements 
AutoCloseableAsync {
 
 Review comment:
   As somebody looking into this code for the first time, I found that the name 
`ClusterComponent` for this class was not really helpful for understanding what 
it does or what it is good for. Maybe there is a name that makes this a bit 
clearer?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make ClusterEntrypoint more modular
> ---
>
> Key: FLINK-10411
> URL: https://issues.apache.org/jira/browse/FLINK-10411
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed 

[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627384#comment-16627384
 ] 

ASF GitHub Bot commented on FLINK-10411:


StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make 
ClusterEntrypoint more compositional
URL: https://github.com/apache/flink/pull/6743#discussion_r220156056
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
 ##
 @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.yarn.YarnResourceManager;
+import org.apache.flink.yarn.YarnWorkerNode;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link ResourceManagerFactory} implementation which creates a {@link 
YarnResourceManager}.
+ */
+public enum YarnResourceManagerFactory implements 
ResourceManagerFactory {
+   INSTANCE;
+
+   @Override
+   public ResourceManager 
createResourceManager(Configuration configuration, ResourceID resourceId, 
RpcService rpcService, HighAvailabilityServices highAvailabilityServices, 
HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, 
FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, 
@Nullable String webInterfaceUrl) throws Exception {
 
 Review comment:
   newlines.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make ClusterEntrypoint more modular
> ---
>
> Key: FLINK-10411
> URL: https://issues.apache.org/jira/browse/FLINK-10411
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, the {{ClusterEntrypoint}} is not very modular in the sense that it 
> cannot be really used for testing purposes (e.g. starting a {{Dispatcher}} 
> with a {{WebMonitorRestEndpoint}}). The problem is that the 
> {{ClusterEntrypoint}} combines too many responsibilities (creating the 
> cluster services, starting the cluster components and deciding on when to 
> terminate the JVM process).
> I suggest to make the structure more compositional, meaning to split up the 
> service generation from the cluster component start up. That way we could 
> also remove code duplication between the different {{ClusterEntrypoint}} 
> implementations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627386#comment-16627386
 ] 

ASF GitHub Bot commented on FLINK-10411:


StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make 
ClusterEntrypoint more compositional
URL: https://github.com/apache/flink/pull/6743#discussion_r220127856
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 ##
 @@ -179,7 +111,15 @@ protected JobGraph retrieveJobGraph(Configuration 
configuration) throws FlinkExc
}
 
@Override
-   protected void 
registerShutdownActions(CompletableFuture terminationFuture) 
{}
+   protected ClusterComponent createClusterComponent(Configuration 
configuration) {
+   return new JobClusterComponent(
+   new MesosResourceManagerFactory(
+   mesosServices,
+   schedulerConfiguration,
+   taskManagerParameters,
+   taskManagerContainerSpec),
+   new 
FileJobGraphRetriever(configuration.getString(JOB_GRAPH_FILE_PATH, 
"job.graph")));
 
 Review comment:
   We could use a helper method that constructs `FileJobGraphRetriever` 
from `configuration`, because this is duplicated. At least `"job.graph"` could 
become a string constant somewhere.
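
One possible shape of the suggested helper (a sketch; the constant and method 
names are made up, and it reuses the identifiers from the quoted code):

{code:java}
// De-duplicates FileJobGraphRetriever construction across entrypoints.
private static final String DEFAULT_JOB_GRAPH_FILE = "job.graph";

private static FileJobGraphRetriever createFileJobGraphRetriever(Configuration configuration) {
    return new FileJobGraphRetriever(
        configuration.getString(JOB_GRAPH_FILE_PATH, DEFAULT_JOB_GRAPH_FILE));
}
{code}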


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make ClusterEntrypoint more modular
> ---
>
> Key: FLINK-10411
> URL: https://issues.apache.org/jira/browse/FLINK-10411
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, the {{ClusterEntrypoint}} is not very modular in the sense that it 
> cannot be really used for testing purposes (e.g. starting a {{Dispatcher}} 
> with a {{WebMonitorRestEndpoint}}). The problem is that the 
> {{ClusterEntrypoint}} combines too many responsibilities (creating the 
> cluster services, starting the cluster components and deciding on when to 
> terminate the JVM process).
> I suggest to make the structure more compositional, meaning to split up the 
> service generation from the cluster component start up. That way we could 
> also remove code duplication between the different {{ClusterEntrypoint}} 
> implementations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10411) Make ClusterEntrypoint more modular

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627382#comment-16627382
 ] 

ASF GitHub Bot commented on FLINK-10411:


StefanRRichter commented on a change in pull request #6743: [FLINK-10411] Make 
ClusterEntrypoint more compositional
URL: https://github.com/apache/flink/pull/6743#discussion_r220124106
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -163,9 +135,6 @@
@GuardedBy("lock")
private ClusterInformation clusterInformation;
 
 Review comment:
   I think this could be a local variable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make ClusterEntrypoint more modular
> ---
>
> Key: FLINK-10411
> URL: https://issues.apache.org/jira/browse/FLINK-10411
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, the {{ClusterEntrypoint}} is not very modular in the sense that it 
> cannot be really used for testing purposes (e.g. starting a {{Dispatcher}} 
> with a {{WebMonitorRestEndpoint}}). The problem is that the 
> {{ClusterEntrypoint}} combines too many responsibilities (creating the 
> cluster services, starting the cluster components and deciding on when to 
> terminate the JVM process).
> I suggest to make the structure more compositional, meaning to split up the 
> service generation from the cluster component start up. That way we could 
> also remove code duplication between the different {{ClusterEntrypoint}} 
> implementations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

