[jira] [Commented] (FLINK-7934) Upgrade Calcite dependency to 1.15

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340668#comment-16340668
 ] 

ASF GitHub Bot commented on FLINK-7934:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/5355
  
@bowenli86 The following two features are related to this PR. 
[CALCITE-2016] Make item + dot operators work for array (e.g. SELECT 
orders[5].color FROM t) (Shuyi Chen)
[CALCITE-1867] Allow user-defined grouped window functions (Timo Walther)

Also, Calcite 1.15 added built-in support for simple DDL statements like 
CREATE and DROP.


> Upgrade Calcite dependency to 1.15
> --
>
> Key: FLINK-7934
> URL: https://issues.apache.org/jira/browse/FLINK-7934
> Project: Flink
>  Issue Type: Task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
>
> Umbrella issue for all related issues for Apache Calcite 1.15 release.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-8514) move flink-connector-wikiedits to Apache Bahir

2018-01-25 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340667#comment-16340667
 ] 

Ufuk Celebi commented on FLINK-8514:


+1

 

> move flink-connector-wikiedits to Apache Bahir
> --
>
> Key: FLINK-8514
> URL: https://issues.apache.org/jira/browse/FLINK-8514
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>
> I propose moving flink-connector-wikiedits to Apache Bahir given its low 
> popularity. We can probably email the community to see if anyone actually 
> still uses it.





[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340658#comment-16340658
 ] 

ASF GitHub Bot commented on FLINK-7923:
---

GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/5367

[FLINK-7923][Table API & SQL] Support field access of composite array 
element in SQL

Note: This is based on FLINK-7934, will rebase once FLINK-7934 is resolved.

## What is the purpose of the change

Support field access of composite array element in SQL. 


## Brief change log

  - add handling to calcite dot operator to support field access of 
composite array element in SQL
  - add unittests to verify that it works for tuple array, row array, pojo 
array and case class array



## Verifying this change

This change added tests and can be verified as follows:

  - *Added unittests to verify the query plan*
  - *Added integration tests for batch/streaming for pojo/case 
class/tuple/row type*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink flink-7923

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5367.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5367


commit 9e915e4144703582843b0f31bffc1481648d0119
Author: Shuyi Chen 
Date:   2018-01-10T00:52:56Z

Upgrade to Calcite 1.15

commit 7a8328e4750ae95196f0b8ba20c6dff22c59ec08
Author: Shuyi Chen 
Date:   2018-01-25T23:36:36Z

Support access of subfields of Array element if the element is a composite 
type (e.g. case class, pojo, tuple or row).




> SQL parser exception when accessing subfields of a Composite element in an 
> Object Array type column
> ---
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause a problem. 
> See the following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a







[jira] [Commented] (FLINK-6623) BlobCacheSuccessTest fails on Windows

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340637#comment-16340637
 ] 

ASF GitHub Bot commented on FLINK-6623:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5351
  
Cool, do you want to commit both changes (this PR and the referenced 
branch) together?


> BlobCacheSuccessTest fails on Windows
> -
>
> Key: FLINK-6623
> URL: https://issues.apache.org/jira/browse/FLINK-6623
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0, 1.5.0
> Environment: windows 10, java 1.8
>Reporter: constantine stanley
>Assignee: Chesnay Schepler
>Priority: Major
>
> All tests in {{BlobCacheSuccessTest}} fail on Windows.
> {code}
> java.nio.file.FileSystemException: 
> C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\incoming\temp-
>  -> 
> C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\job_a8fef824a8e43a546dfa05d0c8b73e57\blob_p-0ae4f711ef5d6e9d26c611fd2c8c8ac45ecbf9e7-cd525d0173571dc24f4c0723130892af:
>  The process cannot access the file because it is being used by another 
> process.
>   at 
> sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
>   at 
> sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
>   at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
>   at 
> sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
>   at java.nio.file.Files.move(Files.java:1395)
>   at 
> org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:464)
>   at 
> org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:708)
>   at 
> org.apache.flink.runtime.blob.BlobServer.putBuffer(BlobServer.java:608)
>   at 
> org.apache.flink.runtime.blob.BlobServer.putPermanent(BlobServer.java:568)
>   at 
> org.apache.flink.runtime.blob.BlobServerPutTest.put(BlobServerPutTest.java:778)
>   at 
> org.apache.flink.runtime.blob.BlobCacheSuccessTest.uploadFileGetTest(BlobCacheSuccessTest.java:173)
>   at 
> org.apache.flink.runtime.blob.BlobCacheSuccessTest.testBlobForJobCacheHa(BlobCacheSuccessTest.java:90)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}









[jira] [Commented] (FLINK-8476) ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT unused

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340631#comment-16340631
 ] 

ASF GitHub Bot commented on FLINK-8476:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5338
  
Thanks, good change, +1


> ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT unused
> --
>
> Key: FLINK-8476
> URL: https://issues.apache.org/jira/browse/FLINK-8476
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> {{ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT}} is unused and should probably 
> be deprecated.
> [~till.rohrmann]





[GitHub] flink pull request #5366: [hotfix] improve javadoc and logging of RocksDBKey...

2018-01-25 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5366

[hotfix] improve javadoc and logging of RocksDBKeyedStateBackend

## What is the purpose of the change

General improvements on javadoc and logging of RocksDBKeyedStateBackend

## Brief change log

- updated and fixed a few javadoc errors
- improved logging

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5366.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5366


commit 649da439c9155f780021f7db3637e10823ec5b21
Author: Bowen Li 
Date:   2018-01-26T04:59:38Z

[hotfix] improve javadoc and logging of RocksDBKeyedStateBackend




---


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-25 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340564#comment-16340564
 ] 

Thomas Weise commented on FLINK-8516:
-

Relevant piece of code:

[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L594]
{code:java}
public static boolean isThisSubtaskShouldSubscribeTo(StreamShardHandle shard,
int totalNumberOfConsumerSubtasks,
int indexOfThisConsumerSubtask) {
return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == 
indexOfThisConsumerSubtask;
}{code}
 

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.
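A quick way to see that skew (purely illustrative: the hash values below are hypothetical, only the modulo assignment mirrors the snippet above):

```java
import java.util.HashMap;
import java.util.Map;

public class ShardSkewDemo {
    // Same assignment rule as in KinesisDataFetcher: hash modulo subtask count.
    public static int assignedSubtask(int shardHashCode, int totalSubtasks) {
        return Math.abs(shardHashCode % totalSubtasks);
    }

    public static void main(String[] args) {
        int subtasks = 4;
        // After a reshard, hash codes may cluster (hypothetical values that
        // all happen to be multiples of the subtask count):
        int[] clusteredHashes = {8, 16, 24, 32, 40, 48, 56, 64};

        Map<Integer, Integer> counts = new HashMap<>();
        for (int h : clusteredHashes) {
            counts.merge(assignedSubtask(h, subtasks), 1, Integer::sum);
        }
        // Every shard lands on subtask 0; subtasks 1-3 sit idle.
        System.out.println(counts);
    }
}
```

Sequential identifiers hash roughly evenly across the modulo classes, which is why the problem only surfaces once resharding breaks that pattern.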





[jira] [Commented] (FLINK-8364) Add iterator() to ListState which returns empty iterator when it has no value

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340559#comment-16340559
 ] 

ASF GitHub Bot commented on FLINK-8364:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5356
  
cc @fhueske  @aljoscha 


> Add iterator() to ListState which returns empty iterator when it has no value
> -
>
> Key: FLINK-8364
> URL: https://issues.apache.org/jira/browse/FLINK-8364
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Add iterator() to ListState which returns empty iterator when it has no value
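Sketched independently of Flink's actual interfaces (all names below are illustrative, not the proposed API), the requested behavior is:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class EmptyIteratorListState<T> {
    private List<T> backing; // null means the state has no value yet

    public void add(T value) {
        if (backing == null) {
            backing = new ArrayList<>();
        }
        backing.add(value);
    }

    // Instead of forcing callers to null-check the stored list, hand back
    // an empty iterator when nothing has been added yet.
    public Iterator<T> iterator() {
        return backing == null ? Collections.<T>emptyIterator() : backing.iterator();
    }
}
```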







[jira] [Commented] (FLINK-8515) update RocksDBMapState to replace deprecated remove() with delete()

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340557#comment-16340557
 ] 

ASF GitHub Bot commented on FLINK-8515:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5365

[FLINK-8515] update RocksDBMapState to replace deprecated remove() with 
delete()

## What is the purpose of the change

RocksDBMapState currently uses `rocksdb#remove()`, which is deprecated. It 
should be replaced with `rocksdb#delete()`.

## Brief change log

update RocksDBMapState to replace deprecated remove() with delete()

## Verifying this change

This change is already covered by existing tests, such as 
`StateBackendTestBase#testMapState()`

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8515

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5365.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5365


commit 8cd47d5e6110e29e4c6c36aea08543048010b9aa
Author: Bowen Li 
Date:   2018-01-26T04:25:00Z

[FLINK-8515] update RocksDBMapState to replace deprecated remove() with 
delete()




> update RocksDBMapState to replace deprecated remove() with delete()
> ---
>
> Key: FLINK-8515
> URL: https://issues.apache.org/jira/browse/FLINK-8515
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.5.0
>
>
> Currently in RocksDBMapState:
> {code:java}
> @Override
>   public void remove(UK userKey) throws IOException, RocksDBException {
>   byte[] rawKeyBytes = 
> serializeUserKeyWithCurrentKeyAndNamespace(userKey);
>   backend.db.remove(columnFamily, writeOptions, rawKeyBytes);
>   }
> {code}
> remove() is deprecated and should be replaced with delete().







[jira] [Comment Edited] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2018-01-25 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16304163#comment-16304163
 ] 

Ted Yu edited comment on FLINK-4534 at 1/26/18 4:27 AM:


Can this get more review?

Thanks


was (Author: yuzhih...@gmail.com):
lgtm

> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState bucketState : state.bucketStates.values()) {
> {code}
> and following in close():
> {code}
> for (Map.Entry entry : 
> state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue 
> starting at line 752:
> {code}
>   Set<Long> pastCheckpointIds = 
> bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}
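A plain-Java sketch of the guarded iteration being suggested (the field and method names are placeholders, not the actual BucketingSink code):

```java
import java.util.HashMap;
import java.util.Map;

public class GuardedIterationDemo {
    // Models state.bucketStates; all access is guarded by its own monitor.
    private final Map<String, String> bucketStates = new HashMap<>();

    public void put(String bucketPath, String bucketState) {
        synchronized (bucketStates) {
            bucketStates.put(bucketPath, bucketState);
        }
    }

    // Equivalent of restoreState()/close(): iterate under the same lock the
    // mutators use, so a concurrent put() cannot cause a
    // ConcurrentModificationException mid-iteration.
    public int handleAll() {
        int handled = 0;
        synchronized (bucketStates) {
            for (Map.Entry<String, String> entry : bucketStates.entrySet()) {
                handled++; // stands in for closeCurrentPartFile(entry.getValue())
            }
        }
        return handled;
    }
}
```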





[jira] [Comment Edited] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase

2018-01-25 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307281#comment-16307281
 ] 

Ted Yu edited comment on FLINK-6105 at 1/26/18 4:26 AM:


In 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
 :
{code}
  try {
Thread.sleep(500);
  } catch (InterruptedException e1) {
// ignore it
  }
{code}
Interrupt status should be restored.
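A minimal, self-contained sketch of that fix, restoring the flag in the catch block (the class and method names are illustrative, not from RollingSink):

```java
public class InterruptRestoreDemo {
    // Sleep without swallowing an interrupt: restore the flag so callers
    // further up the stack can still observe the interruption.
    public static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt(); // simulate a pending interrupt
        sleepQuietly(500);                  // sleep() throws immediately...
        // ...and the flag is visible again afterwards, printing "true":
        System.out.println(Thread.currentThread().isInterrupted());
    }
}
```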


was (Author: yuzhih...@gmail.com):
In 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
 :
{code}
  try {
Thread.sleep(500);
  } catch (InterruptedException e1) {
// ignore it
  }
{code}

> Properly handle InterruptedException in HadoopInputFormatBase
> -
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
>
> When catching InterruptedException, we should throw InterruptedIOException 
> instead of IOException.
> The following example is from HadoopInputFormatBase :
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.
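A self-contained sketch of that suggestion (the blocking call is simulated; nothing below is the actual HadoopInputFormatBase code):

```java
import java.io.IOException;
import java.io.InterruptedIOException;

public class SplitsDemo {
    // Stand-in for the blocking mapreduceInputFormat.getSplits(jobContext) call.
    public static void blockingGetSplits() throws InterruptedException {
        Thread.sleep(500);
    }

    public static void getSplits() throws IOException {
        try {
            blockingGetSplits();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // also restore the flag
            // InterruptedIOException is-an IOException, so existing callers
            // still compile, but the interruption is no longer hidden.
            InterruptedIOException iioe =
                    new InterruptedIOException("Could not get Splits.");
            iioe.initCause(e);
            throw iioe;
        }
    }
}
```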





[jira] [Created] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-25 Thread Thomas Weise (JIRA)
Thomas Weise created FLINK-8516:
---

 Summary: FlinkKinesisConsumer does not balance shards over subtasks
 Key: FLINK-8516
 URL: https://issues.apache.org/jira/browse/FLINK-8516
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Reporter: Thomas Weise


The hash code of the shard is used to distribute discovered shards over 
subtasks round robin. This works as long as shard identifiers are sequential. 
After shards are rebalanced in Kinesis, that may no longer be the case and the 
distribution becomes skewed.





[jira] [Created] (FLINK-8515) update RocksDBMapState to replace deprecated remove() with delete()

2018-01-25 Thread Bowen Li (JIRA)
Bowen Li created FLINK-8515:
---

 Summary: update RocksDBMapState to replace deprecated remove() 
with delete()
 Key: FLINK-8515
 URL: https://issues.apache.org/jira/browse/FLINK-8515
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.5.0


Currently in RocksDBMapState:


{code:java}
@Override
public void remove(UK userKey) throws IOException, RocksDBException {
byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);

backend.db.remove(columnFamily, writeOptions, rawKeyBytes);
}
{code}

remove() is deprecated and should be replaced with delete().






[jira] [Created] (FLINK-8514) move flink-connector-wikiedits to Apache Bahir

2018-01-25 Thread Bowen Li (JIRA)
Bowen Li created FLINK-8514:
---

 Summary: move flink-connector-wikiedits to Apache Bahir
 Key: FLINK-8514
 URL: https://issues.apache.org/jira/browse/FLINK-8514
 Project: Flink
  Issue Type: Sub-task
Reporter: Bowen Li
Assignee: Bowen Li


I propose moving flink-connector-wikiedits to Apache Bahir given its low 
popularity. We can probably email the community to see if anyone actually still 
uses it.





[jira] [Commented] (FLINK-8448) [flink-mesos] The flink-job jars uploaded in directory 'web.upload.dir' are deleted on flink-scheduler restart

2018-01-25 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340524#comment-16340524
 ] 

Chesnay Schepler commented on FLINK-8448:
-

I'm not really into the specifics of flink-mesos, but the directory where jars 
are stored is cleaned up, and there's no mechanism to retain uploaded jars. I 
did think about making this behavior configurable; that would allow people to 
use a directory as a kind of repository for jars.

There may be another open issue for this; let's see if I can find it...

> [flink-mesos] The flink-job jars uploaded in directory 'web.upload.dir' are 
> deleted on flink-scheduler restart
> --
>
> Key: FLINK-8448
> URL: https://issues.apache.org/jira/browse/FLINK-8448
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
> Environment: Flink 1.3 version
>  
>Reporter: Bhumika Bayani
>Priority: Major
>
> Whatever flink-job jars are uploaded from the flink-jobmanager UI, we lose 
> them when a flink-mesos-scheduler restart happens.
> Does Flink have any mechanism to retain them?





[jira] [Commented] (FLINK-8189) move flink-statebackend-rocksdb out of flink-contrib

2018-01-25 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340519#comment-16340519
 ] 

Chesnay Schepler commented on FLINK-8189:
-

I'm tempted to close this as "Won't fix"/"Duplicate" due to the discussion in 
FLINK-4602.

> move flink-statebackend-rocksdb out of flink-contrib
> 
>
> Key: FLINK-8189
> URL: https://issues.apache.org/jira/browse/FLINK-8189
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>
> Move {{flink-statebackend-rocksdb}} into probably its own/state-backend 
> module or {{flink-runtime}} package.
> A few reasons:
> * RocksDB state backend has been evolving into the standard state backends 
> because it can handle much larger state compared to HeapStateBackend. It 
> would be better to put it into /lib package in Flink somehow by default
> * {{flink-contrib}} doesn't show the correct package hierarchy





[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration (State TTL)

2018-01-25 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340516#comment-16340516
 ] 

Bowen Li commented on FLINK-3089:
-

So TtlDB can only support one TTL per db opening. If you close TtlDB and 
reopen it, you can specify another TTL. I don't think frequently opening and 
closing the db is a good idea. Thus, maybe the third important assumption we 
have to make is that all states that have an expiry must share the same TTL.

> State API Should Support Data Expiration (State TTL)
> 
>
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Niels Basjes
>Assignee: Bowen Li
>Priority: Major
>
> In some use cases (web analytics) there is a need to have a state per visitor 
> on a website (i.e. keyBy(sessionid) ).
> At some point the visitor simply leaves and no longer creates new events (so 
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, 
> like "After 30 minutes no events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind 
> of information. But that introduces the buffering effect of the window (which 
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific state which I 
> can update afterwards.
> This makes it possible to create a map function that assigns the right value 
> and that discards the state automatically.
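Independent of Flink's APIs, the semantics being asked for can be sketched in a few lines (purely illustrative, not a proposed implementation; all names are invented):

```java
import java.util.HashMap;
import java.util.Map;

public class TtlStateDemo {
    private final long ttlMillis;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> deadlines = new HashMap<>();

    public TtlStateDemo(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Every update refreshes the timeout, like a visitor producing an event.
    public void update(String key, String value, long nowMillis) {
        values.put(key, value);
        deadlines.put(key, nowMillis + ttlMillis);
    }

    // A read after the timeout discards the state, as if the visitor left.
    public String get(String key, long nowMillis) {
        Long deadline = deadlines.get(key);
        if (deadline == null || nowMillis >= deadline) {
            values.remove(key);
            deadlines.remove(key);
            return null;
        }
        return values.get(key);
    }
}
```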





[jira] [Updated] (FLINK-8189) move flink-statebackend-rocksdb out of flink-contrib

2018-01-25 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8189:

Description: 
Move {{flink-statebackend-rocksdb}} into probably its own/state-backend module 
or {{flink-runtime}} package.

A few reasons:
* RocksDB state backend has been evolving into the standard state backends 
because it can handle much larger state compared to HeapStateBackend. It would 
be better to put it into /lib package in Flink somehow by default
* {{flink-contrib}} doesn't show the correct package hierarchy


  was:
Move {{flink-statebackend-rocksdb}} into probably its own/state-backend module 
or {{flink-runtime}} package.

A few reasons:
* RocksDB state backend has been evolving into the standard state backends 
because it can handle much larger state compared to HeapStateBackend. It would 
be better to put it into /lib package in Flink somehow by default
* {{flink-contrib}} doesn't show the correct package hierachy of 


> move flink-statebackend-rocksdb out of flink-contrib
> 
>
> Key: FLINK-8189
> URL: https://issues.apache.org/jira/browse/FLINK-8189
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>
> Move {{flink-statebackend-rocksdb}} into probably its own/state-backend 
> module or {{flink-runtime}} package.
> A few reasons:
> * RocksDB state backend has been evolving into the standard state backends 
> because it can handle much larger state compared to HeapStateBackend. It 
> would be better to put it into /lib package in Flink somehow by default
> * {{flink-contrib}} doesn't show the correct package hierarchy



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8189) move flink-statebackend-rocksdb out of flink-contrib

2018-01-25 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8189:

Description: 
Move {{flink-statebackend-rocksdb}} into probably its own/state-backend module 
or {{flink-runtime}} package.

A few reasons:
* RocksDB state backend has been evolving into the standard state backends 
because it can handle much larger state compared to HeapStateBackend. It would 
be better to put it into /lib package in Flink somehow by default
* {{flink-contrib}} doesn't show the correct package hierachy of 

  was:Move {{flink-statebackend-rocksdb}} into probably its own/state-backend 
module or {{flink-runtime}} package.


> move flink-statebackend-rocksdb out of flink-contrib
> 
>
> Key: FLINK-8189
> URL: https://issues.apache.org/jira/browse/FLINK-8189
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>
> Move {{flink-statebackend-rocksdb}} into probably its own/state-backend 
> module or {{flink-runtime}} package.
> A few reasons:
> * RocksDB state backend has been evolving into the standard state backends 
> because it can handle much larger state compared to HeapStateBackend. It 
> would be better to put it into /lib package in Flink somehow by default
> * {{flink-contrib}} doesn't show the correct package hierachy of 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8189) move flink-statebackend-rocksdb out of flink-contrib

2018-01-25 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340436#comment-16340436
 ] 

Bowen Li commented on FLINK-8189:
-

[~till.rohrmann] [~aljoscha] [~srichter] Hi guys, I'm thinking of starting this 
task since we've finished removing {{flink-contrib/flink-streaming-contrib}}. 

I'm leaning towards moving {{flink-statebackend-rocksdb}} into 
{{flink-runtime}} so that it can sit with {{HeapStateBackend}}.

What do you think?

> move flink-statebackend-rocksdb out of flink-contrib
> 
>
> Key: FLINK-8189
> URL: https://issues.apache.org/jira/browse/FLINK-8189
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>
> Move {{flink-statebackend-rocksdb}} into probably its own/state-backend 
> module or {{flink-runtime}} package.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8490) Allow custom docker parameters for docker tasks on Mesos

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340240#comment-16340240
 ] 

ASF GitHub Bot commented on FLINK-8490:
---

Github user joerg84 commented on the issue:

https://github.com/apache/flink/pull/5346
  
@tillrohrmann Could you take a final look?


> Allow custom docker parameters for docker tasks on Mesos
> 
>
> Key: FLINK-8490
> URL: https://issues.apache.org/jira/browse/FLINK-8490
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Reporter: Jörg Schad
>Priority: Major
>
> It would be great to pass custom parameters to Mesos when using the Docker 
> Containerizer.
> This could be similar to this spark example: 
> `spark.mesos.executor.docker.parameters privileged=true`
>  
> Originally brought up here: 
> https://stackoverflow.com/questions/48393153/passing-custom-parameters-to-docker-when-running-flink-on-mesos-marathon?noredirect=1#comment83777480_48393153



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5346: [FLINK-8490] [mesos] Allow custom docker parameters for d...

2018-01-25 Thread joerg84
Github user joerg84 commented on the issue:

https://github.com/apache/flink/pull/5346
  
@tillrohrmann Could you take a final look?


---


[jira] [Commented] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340113#comment-16340113
 ] 

ASF GitHub Bot commented on FLINK-7386:
---

Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/4675
  
FYI I rebased it and got working results on a sample of mine.


> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ 
> client
> 
>
> Key: FLINK-7386
> URL: https://issues.apache.org/jira/browse/FLINK-7386
> Project: Flink
>  Issue Type: Improvement
>  Components: ElasticSearch Connector
>Reporter: Dawid Wysakowicz
>Assignee: Fang Yong
>Priority: Critical
> Fix For: 1.5.0
>
>
> In Elasticsearch 5.2.0 client the class {{BulkProcessor}} was refactored and 
> has no longer the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not compati...

2018-01-25 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/4675
  
FYI I rebased it and got working results on a sample of mine.


---


[jira] [Commented] (FLINK-8267) update Kinesis Producer example for setting Region key

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339853#comment-16339853
 ] 

ASF GitHub Bot commented on FLINK-8267:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5301
  
cc @tzulitai 


> update Kinesis Producer example for setting Region key
> --
>
> Key: FLINK-8267
> URL: https://issues.apache.org/jira/browse/FLINK-8267
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Dyana Rose
>Assignee: Bowen Li
>Priority: Minor
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer
> In the example code for the kinesis producer the region key is set like:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> {code}
> However, the AWS Kinesis Producer Library requires that the region key be 
> Region 
> (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
>  so the setting at this point should be:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> producerConfig.put("Region", "us-east-1");
> {code}
> When you run the Kinesis Producer you can see the effect of not setting the 
> Region key by a log line
> {noformat}
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
> Kinesis producer instance for region ''
> {noformat}
> The KPL also then assumes it's running on EC2 and attempts to determine its 
> own region, which fails.
> {noformat}
> (EC2MetadataClient)Http request to Ec2MetadataService failed.
> [error] [main.cc:266] Could not configure the region. It was not given in the 
> config and we were unable to retrieve it from EC2 metadata
> {noformat}
> At the least I'd say the documentation should mention the difference between 
> these two keys and when you are required to also set the Region key.
> On the other hand, is this even the intended behaviour of this connector? Was 
> it intended that the AWSConfigConstants.AWS_REGION key also set the region of 
> the Kinesis stream? The documentation for the example states 
> {noformat}
> The example demonstrates producing a single Kinesis stream in the AWS region 
> “us-east-1”.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5301: [FLINK-8267] [Kinesis Connector] update Kinesis Producer ...

2018-01-25 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5301
  
cc @tzulitai 


---


[GitHub] flink pull request #5149: [FLINK-7858][flp6] Port JobVertexTaskManagersHandl...

2018-01-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5149


---


[jira] [Resolved] (FLINK-8224) Should shudownApplication when job terminated in job mode

2018-01-25 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8224.
--
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed via a4ecc7ffe4ba16a68de06c1053c7916e6082b413

> Should shudownApplication when job terminated in job mode
> -
>
> Key: FLINK-8224
> URL: https://issues.apache.org/jira/browse/FLINK-8224
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For job mode, one job is an application. When the job finishes, it should tell 
> the resource manager to shut down the application, otherwise the resource 
> manager cannot set the application status. For example, if the YARN resource 
> manager doesn't report the application as finished to the YARN master, YARN 
> will consider the application as still running.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7858) Port JobVertexTaskManagersHandler to REST endpoint

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339808#comment-16339808
 ] 

ASF GitHub Bot commented on FLINK-7858:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5149


> Port JobVertexTaskManagersHandler to REST endpoint
> --
>
> Key: FLINK-7858
> URL: https://issues.apache.org/jira/browse/FLINK-7858
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
> Fix For: 1.5.0
>
>
> Port JobVertexTaskManagersHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5170: [FLINK-8266] [runtime] add network memory to Resou...

2018-01-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5170


---


[jira] [Resolved] (FLINK-7858) Port JobVertexTaskManagersHandler to REST endpoint

2018-01-25 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7858.
--
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed via 

056c72af994bc0b7bd838faff6b2991763fc2ac1

37b4e2cef687160f2bc7cedb7d2360825089569e

> Port JobVertexTaskManagersHandler to REST endpoint
> --
>
> Key: FLINK-7858
> URL: https://issues.apache.org/jira/browse/FLINK-7858
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
> Fix For: 1.5.0
>
>
> Port JobVertexTaskManagersHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8224) Should shudownApplication when job terminated in job mode

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339806#comment-16339806
 ] 

ASF GitHub Bot commented on FLINK-8224:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5139


> Should shudownApplication when job terminated in job mode
> -
>
> Key: FLINK-8224
> URL: https://issues.apache.org/jira/browse/FLINK-8224
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For job mode, one job is an application. When the job finishes, it should tell 
> the resource manager to shut down the application, otherwise the resource 
> manager cannot set the application status. For example, if the YARN resource 
> manager doesn't report the application as finished to the YARN master, YARN 
> will consider the application as still running.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8266) Add network memory to ResourceProfile

2018-01-25 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8266.
--
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed via 47e6069d7a299c02a81f062a7acb6a792b71c146

> Add network memory to ResourceProfile
> -
>
> Key: FLINK-8266
> URL: https://issues.apache.org/jira/browse/FLINK-8266
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> On the task manager side, the network buffer pool should be allocated according 
> to the number of input channels and output sub-partitions, but when allocating 
> a worker, the resource profile doesn't contain information about this 
> memory. 
> So I suggest adding a network memory field to ResourceProfile; the job master 
> should calculate it when scheduling a task, and then the resource manager can 
> allocate a container with the resource profile.
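The proposal can be sketched as follows (an illustrative model only; the class name, fields, and the estimation formula below are hypothetical and are not Flink's actual ResourceProfile API):

```java
/**
 * Hypothetical sketch of a resource profile extended with a network-memory
 * field, as proposed in this issue. Not Flink's actual ResourceProfile class.
 */
class SketchResourceProfile {
    final double cpuCores;
    final int heapMemoryMB;
    final int networkMemoryMB; // the proposed new field

    SketchResourceProfile(double cpuCores, int heapMemoryMB, int networkMemoryMB) {
        this.cpuCores = cpuCores;
        this.heapMemoryMB = heapMemoryMB;
        this.networkMemoryMB = networkMemoryMB;
    }

    /**
     * One way the job master could estimate network memory at scheduling time:
     * each input channel and output sub-partition gets a fixed number of
     * network buffers of a fixed size. Rounds up to whole megabytes.
     * (Assumed formula for illustration, not the actual calculation.)
     */
    static int estimateNetworkMemoryMB(int inputChannels, int outputSubpartitions,
                                       int buffersPerChannel, int bufferSizeKB) {
        long kb = (long) (inputChannels + outputSubpartitions)
                * buffersPerChannel * bufferSizeKB;
        return (int) ((kb + 1023) / 1024); // ceil(kb / 1024)
    }
}
```

With such a field populated by the job master, the resource manager could request a container sized to cover network buffers as well as heap.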



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8266) Add network memory to ResourceProfile

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339807#comment-16339807
 ] 

ASF GitHub Bot commented on FLINK-8266:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5170


> Add network memory to ResourceProfile
> -
>
> Key: FLINK-8266
> URL: https://issues.apache.org/jira/browse/FLINK-8266
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> On the task manager side, the network buffer pool should be allocated according 
> to the number of input channels and output sub-partitions, but when allocating 
> a worker, the resource profile doesn't contain information about this 
> memory. 
> So I suggest adding a network memory field to ResourceProfile; the job master 
> should calculate it when scheduling a task, and then the resource manager can 
> allocate a container with the resource profile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5139: [FLINK-8224] [runtime] shutdown application when j...

2018-01-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5139


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163938277
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
 ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+public class RegisteredBroadcastBackendStateMetaInfo {
+
+   /** The name of the state, as registered by the user. */
+   private final String name;
+
+   /** The mode how elements in this state are assigned to tasks during 
restore. */
+   private final OperatorStateHandle.Mode assignmentMode;
+
+   /** The type serializer for the keys in the map state. */
+   private final TypeSerializer keySerializer;
+
+   /** The type serializer for the values in the map state. */
+   private final TypeSerializer valueSerializer;
+
+   public RegisteredBroadcastBackendStateMetaInfo(
+   final String name,
+   final OperatorStateHandle.Mode assignmentMode,
+   final TypeSerializer keySerializer,
+   final TypeSerializer valueSerializer) {
+
+   Preconditions.checkArgument(assignmentMode != null && 
assignmentMode == OperatorStateHandle.Mode.UNIFORM_BROADCAST);
+
+   this.name = Preconditions.checkNotNull(name);
+   this.assignmentMode = assignmentMode;
+   this.keySerializer = Preconditions.checkNotNull(keySerializer);
+   this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer);
+   }
+
+   public String getName() {
+   return name;
+   }
+
+   public TypeSerializer getKeySerializer() {
+   return keySerializer;
+   }
+
+   public TypeSerializer getValueSerializer() {
+   return valueSerializer;
+   }
+
+   public OperatorStateHandle.Mode getAssignmentMode() {
+   return assignmentMode;
+   }
+
+   public RegisteredBroadcastBackendStateMetaInfo.Snapshot 
snapshot() {
+   return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(
+   name,
+   assignmentMode,
+   keySerializer.duplicate(),
+   valueSerializer.duplicate(),
+   keySerializer.snapshotConfiguration(),
+   valueSerializer.snapshotConfiguration());
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (obj == this) {
+   return true;
+   }
+
+   if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
+   return false;
+   }
+
+   final RegisteredBroadcastBackendStateMetaInfo other =
+   (RegisteredBroadcastBackendStateMetaInfo) obj;
+
+   return Objects.equals(name, other.getName())
+   && Objects.equals(assignmentMode, 
other.getAssignmentMode())
+   && Objects.equals(keySerializer, 
other.getKeySerializer())
+   && Objects.equals(valueSerializer, 
other.getValueSerializer());
+   }
+
+   @Override
+   public int hashCode() {
+   int result = name.hashCode();
+   result = 31 * result + assignmentMode.hashCode();
+   result = 31 * result + keySerializer.hashCode();
+   result = 31 * result + valueSerializer.hashCode();
+   return result;
+   }
+
+   @Override
+   public String toString() {
+   return 

[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163933180
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -77,16 +90,29 @@ public void read(DataInputView in) throws IOException {
super.read(in);
 
int numKvStates = in.readShort();
-   stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+   operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
for (int i = 0; i < numKvStates; i++) {
-   stateMetaInfoSnapshots.add(
-   
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
-   .readStateMetaInfo(in));
+   operatorStateMetaInfoSnapshots.add(
+   
OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+   
.readOperatorStateMetaInfo(in));
}
+
+   int numBroadcastStates = in.readShort();
--- End diff --

This here (and onwards) would fail if we're reading older version 
savepoints, because there was nothing written for this before.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163936390
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
 ---
@@ -36,8 +36,9 @@
 * The modes that determine how an {@link OperatorStateHandle} is 
assigned to tasks during restore.
 */
public enum Mode {
-   SPLIT_DISTRIBUTE, // The operator state partitions in the state 
handle are split and distributed to one task each.
-   BROADCAST // The operator state partitions are broadcast to all 
task.
+   SPLIT_DISTRIBUTE,   // The operator state partitions in the 
state handle are split and distributed to one task each.
+   BROADCAST,  // The operator state 
partitions are broadcasted to all tasks.
+   UNIFORM_BROADCAST   // The operator states are identical, 
and they are broadcasted to all tasks.
--- End diff --

nit: can we either keep with spaces here, or at least tab them so that the 
3 comments are aligned?



---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163938102
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
 ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+public class RegisteredBroadcastBackendStateMetaInfo {
+
+   /** The name of the state, as registered by the user. */
+   private final String name;
+
+   /** The mode how elements in this state are assigned to tasks during 
restore. */
+   private final OperatorStateHandle.Mode assignmentMode;
+
+   /** The type serializer for the keys in the map state. */
+   private final TypeSerializer keySerializer;
+
+   /** The type serializer for the values in the map state. */
+   private final TypeSerializer valueSerializer;
+
+   public RegisteredBroadcastBackendStateMetaInfo(
+   final String name,
+   final OperatorStateHandle.Mode assignmentMode,
+   final TypeSerializer keySerializer,
+   final TypeSerializer valueSerializer) {
+
+   Preconditions.checkArgument(assignmentMode != null && 
assignmentMode == OperatorStateHandle.Mode.UNIFORM_BROADCAST);
+
+   this.name = Preconditions.checkNotNull(name);
+   this.assignmentMode = assignmentMode;
+   this.keySerializer = Preconditions.checkNotNull(keySerializer);
+   this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer);
+   }
+
+   public String getName() {
+   return name;
+   }
+
+   public TypeSerializer getKeySerializer() {
+   return keySerializer;
+   }
+
+   public TypeSerializer getValueSerializer() {
+   return valueSerializer;
+   }
+
+   public OperatorStateHandle.Mode getAssignmentMode() {
+   return assignmentMode;
+   }
+
+   public RegisteredBroadcastBackendStateMetaInfo.Snapshot 
snapshot() {
+   return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(
+   name,
+   assignmentMode,
+   keySerializer.duplicate(),
+   valueSerializer.duplicate(),
+   keySerializer.snapshotConfiguration(),
+   valueSerializer.snapshotConfiguration());
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (obj == this) {
+   return true;
+   }
+
+   if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
+   return false;
+   }
+
+   final RegisteredBroadcastBackendStateMetaInfo other =
+   (RegisteredBroadcastBackendStateMetaInfo) obj;
+
+   return Objects.equals(name, other.getName())
+   && Objects.equals(assignmentMode, 
other.getAssignmentMode())
+   && Objects.equals(keySerializer, 
other.getKeySerializer())
+   && Objects.equals(valueSerializer, 
other.getValueSerializer());
+   }
+
+   @Override
+   public int hashCode() {
+   int result = name.hashCode();
+   result = 31 * result + assignmentMode.hashCode();
+   result = 31 * result + keySerializer.hashCode();
+   result = 31 * result + valueSerializer.hashCode();
+   return result;
+   }
+
+   @Override
+   public String toString() {
+   return 

[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163935065
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -77,16 +90,29 @@ public void read(DataInputView in) throws IOException {
super.read(in);
 
int numKvStates = in.readShort();
-   stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+   operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
for (int i = 0; i < numKvStates; i++) {
-   stateMetaInfoSnapshots.add(
-   
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
-   .readStateMetaInfo(in));
+   operatorStateMetaInfoSnapshots.add(
+   
OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+   
.readOperatorStateMetaInfo(in));
}
+
+   int numBroadcastStates = in.readShort();
--- End diff --

One straightforward way to fix this is to bump the current `VERSION` 
to 3, and here you do:

```
if (getReadVersion() >= 3) {
// read broadcast state stuff
}
```

so we only try to read broadcast state stuff if the written version in the 
savepoint is larger or equal to 3.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163935632
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
 ---
@@ -211,4 +297,34 @@ public 
OperatorBackendStateMetaInfoReaderV2(ClassLoader userCodeClassLoader) {
return stateMetaInfo;
}
}
+
+   public static class BroadcastStateMetaInfoReaderV2 extends 
AbstractBroadcastStateMetaInfoReader {
+
+   public BroadcastStateMetaInfoReaderV2(final ClassLoader 
userCodeClassLoader) {
+   super(userCodeClassLoader);
+   }
+
+   @Override
+   public RegisteredBroadcastBackendStateMetaInfo.Snapshot 
readBroadcastStateMetaInfo(final DataInputView in) throws IOException {
+   RegisteredBroadcastBackendStateMetaInfo.Snapshot 
stateMetaInfo =
+   new 
RegisteredBroadcastBackendStateMetaInfo.Snapshot<>();
+
+   stateMetaInfo.setName(in.readUTF());
+   
stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+
+   Tuple2 
keySerializerAndConfig =
+   
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, 
userCodeClassLoader).get(0);
--- End diff --

If the `writeSerializersAndConfigsWithResilience` call was a single one in 
the writer, then here you can also just get all written serializers and configs 
with a single `readSerializersAndConfigsWithResilience`.
The returned list would have length 2 (the order of the key/value serializer + 
config will be the same as how you wrote them).


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163932401
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -64,11 +70,18 @@ public int getVersion() {
public void write(DataOutputView out) throws IOException {
super.write(out);
 
-   out.writeShort(stateMetaInfoSnapshots.size());
-   for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState 
: stateMetaInfoSnapshots) {
+   out.writeShort(operatorStateMetaInfoSnapshots.size());
+   for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState 
: operatorStateMetaInfoSnapshots) {
+   OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateWriterForVersion(VERSION, kvState)
+   .writeOperatorStateMetaInfo(out);
+   }
+
+   out.writeShort(broadcastStateMetaInfoSnapshots.size());
+   for (RegisteredBroadcastBackendStateMetaInfo.Snapshot 
kvState : broadcastStateMetaInfoSnapshots) {
--- End diff --

same here: the naming of the `kvState` variable here is actually a bit odd


---


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339693#comment-16339693
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163932401
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -64,11 +70,18 @@ public int getVersion() {
public void write(DataOutputView out) throws IOException {
super.write(out);
 
-   out.writeShort(stateMetaInfoSnapshots.size());
-   for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState 
: stateMetaInfoSnapshots) {
+   out.writeShort(operatorStateMetaInfoSnapshots.size());
+   for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState 
: operatorStateMetaInfoSnapshots) {
+   OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateWriterForVersion(VERSION, kvState)
+   .writeOperatorStateMetaInfo(out);
+   }
+
+   out.writeShort(broadcastStateMetaInfoSnapshots.size());
+   for (RegisteredBroadcastBackendStateMetaInfo.Snapshot 
kvState : broadcastStateMetaInfoSnapshots) {
--- End diff --

same here: the naming of the `kvState` variable here is actually a bit odd


> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339698#comment-16339698
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163938102
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
 ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+public class RegisteredBroadcastBackendStateMetaInfo {
+
+   /** The name of the state, as registered by the user. */
+   private final String name;
+
+   /** The mode how elements in this state are assigned to tasks during 
restore. */
+   private final OperatorStateHandle.Mode assignmentMode;
+
+   /** The type serializer for the keys in the map state. */
+   private final TypeSerializer keySerializer;
+
+   /** The type serializer for the values in the map state. */
+   private final TypeSerializer valueSerializer;
+
+   public RegisteredBroadcastBackendStateMetaInfo(
+   final String name,
+   final OperatorStateHandle.Mode assignmentMode,
+   final TypeSerializer keySerializer,
+   final TypeSerializer valueSerializer) {
+
+   Preconditions.checkArgument(assignmentMode != null && 
assignmentMode == OperatorStateHandle.Mode.UNIFORM_BROADCAST);
+
+   this.name = Preconditions.checkNotNull(name);
+   this.assignmentMode = assignmentMode;
+   this.keySerializer = Preconditions.checkNotNull(keySerializer);
+   this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer);
+   }
+
+   public String getName() {
+   return name;
+   }
+
+   public TypeSerializer getKeySerializer() {
+   return keySerializer;
+   }
+
+   public TypeSerializer getValueSerializer() {
+   return valueSerializer;
+   }
+
+   public OperatorStateHandle.Mode getAssignmentMode() {
+   return assignmentMode;
+   }
+
+   public RegisteredBroadcastBackendStateMetaInfo.Snapshot 
snapshot() {
+   return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(
+   name,
+   assignmentMode,
+   keySerializer.duplicate(),
+   valueSerializer.duplicate(),
+   keySerializer.snapshotConfiguration(),
+   valueSerializer.snapshotConfiguration());
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (obj == this) {
+   return true;
+   }
+
+   if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
+   return false;
+   }
+
+   final RegisteredBroadcastBackendStateMetaInfo other =
+   (RegisteredBroadcastBackendStateMetaInfo) obj;
+
+   return Objects.equals(name, other.getName())
+   && Objects.equals(assignmentMode, 
other.getAssignmentMode())
+   && Objects.equals(keySerializer, 
other.getKeySerializer())
+   && Objects.equals(valueSerializer, 
other.getValueSerializer());
+   }
+
+   @Override
+   public int hashCode() {
+   int result = name.hashCode();
+   result = 31 * result + assignmentMode.hashCode();
+   result = 31 * result + 

[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163942745
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -77,16 +90,29 @@ public void read(DataInputView in) throws IOException {
super.read(in);
 
int numKvStates = in.readShort();
-   stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+   operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
for (int i = 0; i < numKvStates; i++) {
-   stateMetaInfoSnapshots.add(
-   
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
-   .readStateMetaInfo(in));
+   operatorStateMetaInfoSnapshots.add(
+   
OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+   
.readOperatorStateMetaInfo(in));
}
+
+   int numBroadcastStates = in.readShort();
--- End diff --

I think somehow the migration test cases are not failing here, only because 
`in.readShort()` happens to return 0.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163932956
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -35,18 +35,24 @@
 
public static final int VERSION = 2;
--- End diff --

It seems like the `OperatorBackendSerializationProxy` will have new binary 
formats after this change.
This should then have an uptick in the VERSION.

In general, I think the PR currently does not have any migration paths for 
previous versions (where there is no broadcast state meta info written).


---


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339695#comment-16339695
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163933180
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -77,16 +90,29 @@ public void read(DataInputView in) throws IOException {
super.read(in);
 
int numKvStates = in.readShort();
-   stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+   operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
for (int i = 0; i < numKvStates; i++) {
-   stateMetaInfoSnapshots.add(
-   
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
-   .readStateMetaInfo(in));
+   operatorStateMetaInfoSnapshots.add(
+   
OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+   
.readOperatorStateMetaInfo(in));
}
+
+   int numBroadcastStates = in.readShort();
--- End diff --

This here (and onwards) would fail if we're reading older version 
savepoints, because there was nothing written for this before.








[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339697#comment-16339697
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163935065
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -77,16 +90,29 @@ public void read(DataInputView in) throws IOException {
super.read(in);
 
int numKvStates = in.readShort();
-   stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+   operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
for (int i = 0; i < numKvStates; i++) {
-   stateMetaInfoSnapshots.add(
-   
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
-   .readStateMetaInfo(in));
+   operatorStateMetaInfoSnapshots.add(
+   
OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+   
.readOperatorStateMetaInfo(in));
}
+
+   int numBroadcastStates = in.readShort();
--- End diff --

One straightforward way to fix this is to uptick the current `VERSION` 
to 3, and here you do:

```
if (getReadVersion() >= 3) {
// read broadcast state stuff
}
```

so we only try to read broadcast state stuff if the written version in the 
savepoint is larger or equal to 3.
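
As a hedged illustration of the version-gating idea above — all class, method,
and field names below are invented for the sketch and are not Flink's actual
serialization proxy API — the write side records a version header and the read
side only consumes the broadcast-state section when that header is >= 3:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch: version-gated deserialization so snapshots written before the
// broadcast-state section existed (version 2) still restore cleanly.
public class VersionGatedReadSketch {
    static final int VERSION = 3; // upticked from 2 when the broadcast section was added

    // Old layout: version header + operator-state count only.
    static byte[] writeV2(int numOperatorStates) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(2);                    // no broadcast section follows
        out.writeShort(numOperatorStates);
        return bos.toByteArray();
    }

    // New layout: additionally writes the broadcast-state count.
    static byte[] writeV3(int numOperatorStates, int numBroadcastStates) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(VERSION);
        out.writeShort(numOperatorStates);
        out.writeShort(numBroadcastStates); // only written from version 3 on
        return bos.toByteArray();
    }

    // Returns {numOperatorStates, numBroadcastStates}.
    static int[] read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int readVersion = in.readInt();
        int numOperator = in.readShort();
        int numBroadcast = 0;
        if (readVersion >= 3) {             // the migration gate described above
            numBroadcast = in.readShort();
        }
        return new int[] {numOperator, numBroadcast};
    }

    public static void main(String[] args) throws IOException {
        int[] v2 = read(writeV2(4));
        int[] v3 = read(writeV3(4, 2));
        if (v2[0] != 4 || v2[1] != 0) throw new AssertionError();
        if (v3[0] != 4 || v3[1] != 2) throw new AssertionError();
        System.out.println("ok");
    }
}
```

With this gate, a version-2 snapshot simply yields zero broadcast states
instead of failing mid-read or relying on a short that happens to be 0.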








[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339691#comment-16339691
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163932340
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -64,11 +70,18 @@ public int getVersion() {
public void write(DataOutputView out) throws IOException {
super.write(out);
 
-   out.writeShort(stateMetaInfoSnapshots.size());
-   for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState 
: stateMetaInfoSnapshots) {
+   out.writeShort(operatorStateMetaInfoSnapshots.size());
+   for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState 
: operatorStateMetaInfoSnapshots) {
--- End diff --

the naming of the `kvState` variable here is actually a bit odd








[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163934364
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
 ---
@@ -112,13 +142,40 @@ public void writeStateMetaInfo(DataOutputView out) 
throws IOException {
}
}
 
+   public static class BroadcastStateMetaInfoWriterV2 extends 
AbstractBroadcastStateMetaInfoWriter {
+
+   public BroadcastStateMetaInfoWriterV2(
+   final 
RegisteredBroadcastBackendStateMetaInfo.Snapshot broadcastStateMetaInfo) {
+   super(broadcastStateMetaInfo);
+   }
+
+   @Override
+   public void writeBroadcastStateMetaInfo(final DataOutputView 
out) throws IOException {
+   out.writeUTF(broadcastStateMetaInfo.getName());
+   
out.writeByte(broadcastStateMetaInfo.getAssignmentMode().ordinal());
+
+   // write in a way that allows us to be fault-tolerant 
and skip blocks in the case of java serialization failures
+   
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+   out,
+   Collections.singletonList(new Tuple2<>(
+   
broadcastStateMetaInfo.getKeySerializer(),
+   
broadcastStateMetaInfo.getKeySerializerConfigSnapshot())));
+
+   
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
--- End diff --

Combining these two `writeSerializersAndConfigsWithResilience` calls into 
one call, with a single list containing both the key serializer and value 
serializer, would be more space-efficient in the written data:

```
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
out,
Arrays.asList(
Tuple2.of(keySerializer, keySerializerConfig),
Tuple2.of(valueSerializer, valueSerializerConfig)));
```
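
To make the space saving concrete, here is a toy sketch of the single
length-prefixed list idea: one count header framing both entries instead of
two separately framed singleton lists, read back positionally in write order.
The framing below is deliberately simplified and invented; it is not Flink's
actual `TypeSerializerSerializationUtil` wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class PairedListFraming {
    // Write the key and value descriptors under a single list header.
    static byte[] writeBoth(String keyDesc, String valueDesc) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        List<String> entries = Arrays.asList(keyDesc, valueDesc);
        out.writeShort(entries.size());   // one header covers both entries
        for (String e : entries) {
            out.writeUTF(e);
        }
        return bos.toByteArray();
    }

    // Read both entries back; index 0 is the key entry, index 1 the value
    // entry, in the same order they were written.
    static String[] readBoth(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int n = in.readShort();
        String[] result = new String[n];
        for (int i = 0; i < n; i++) {
            result[i] = in.readUTF();
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        String[] r = readBoth(writeBoth("keySerializer", "valueSerializer"));
        if (!r[0].equals("keySerializer") || !r[1].equals("valueSerializer")) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```

The reader side mirrors this with a single read call, which also keeps the
key/value pairing implicit in list order rather than in duplicated framing.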


---


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339699#comment-16339699
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163934364
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
 ---
@@ -112,13 +142,40 @@ public void writeStateMetaInfo(DataOutputView out) 
throws IOException {
}
}
 
+   public static class BroadcastStateMetaInfoWriterV2 extends 
AbstractBroadcastStateMetaInfoWriter {
+
+   public BroadcastStateMetaInfoWriterV2(
+   final 
RegisteredBroadcastBackendStateMetaInfo.Snapshot broadcastStateMetaInfo) {
+   super(broadcastStateMetaInfo);
+   }
+
+   @Override
+   public void writeBroadcastStateMetaInfo(final DataOutputView 
out) throws IOException {
+   out.writeUTF(broadcastStateMetaInfo.getName());
+   
out.writeByte(broadcastStateMetaInfo.getAssignmentMode().ordinal());
+
+   // write in a way that allows us to be fault-tolerant 
and skip blocks in the case of java serialization failures
+   
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+   out,
+   Collections.singletonList(new Tuple2<>(
+   
broadcastStateMetaInfo.getKeySerializer(),
+   
broadcastStateMetaInfo.getKeySerializerConfigSnapshot())));
+
+   
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
--- End diff --

Combining these two `writeSerializersAndConfigsWithResilience` calls into 
one call, with a single list containing both the key serializer and value 
serializer, would be more space-efficient in the written data:

```
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
out,
Arrays.asList(
Tuple2.of(keySerializer, keySerializerConfig),
Tuple2.of(valueSerializer, valueSerializerConfig)));
```








[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339702#comment-16339702
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163938277
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
 ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+public class RegisteredBroadcastBackendStateMetaInfo {
+
+   /** The name of the state, as registered by the user. */
+   private final String name;
+
+   /** The mode how elements in this state are assigned to tasks during 
restore. */
+   private final OperatorStateHandle.Mode assignmentMode;
+
+   /** The type serializer for the keys in the map state. */
+   private final TypeSerializer keySerializer;
+
+   /** The type serializer for the values in the map state. */
+   private final TypeSerializer valueSerializer;
+
+   public RegisteredBroadcastBackendStateMetaInfo(
+   final String name,
+   final OperatorStateHandle.Mode assignmentMode,
+   final TypeSerializer keySerializer,
+   final TypeSerializer valueSerializer) {
+
+   Preconditions.checkArgument(assignmentMode != null && 
assignmentMode == OperatorStateHandle.Mode.UNIFORM_BROADCAST);
+
+   this.name = Preconditions.checkNotNull(name);
+   this.assignmentMode = assignmentMode;
+   this.keySerializer = Preconditions.checkNotNull(keySerializer);
+   this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer);
+   }
+
+   public String getName() {
+   return name;
+   }
+
+   public TypeSerializer getKeySerializer() {
+   return keySerializer;
+   }
+
+   public TypeSerializer getValueSerializer() {
+   return valueSerializer;
+   }
+
+   public OperatorStateHandle.Mode getAssignmentMode() {
+   return assignmentMode;
+   }
+
+   public RegisteredBroadcastBackendStateMetaInfo.Snapshot 
snapshot() {
+   return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(
+   name,
+   assignmentMode,
+   keySerializer.duplicate(),
+   valueSerializer.duplicate(),
+   keySerializer.snapshotConfiguration(),
+   valueSerializer.snapshotConfiguration());
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (obj == this) {
+   return true;
+   }
+
+   if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
+   return false;
+   }
+
+   final RegisteredBroadcastBackendStateMetaInfo other =
+   (RegisteredBroadcastBackendStateMetaInfo) obj;
+
+   return Objects.equals(name, other.getName())
+   && Objects.equals(assignmentMode, 
other.getAssignmentMode())
+   && Objects.equals(keySerializer, 
other.getKeySerializer())
+   && Objects.equals(valueSerializer, 
other.getValueSerializer());
+   }
+
+   @Override
+   public int hashCode() {
+   int result = name.hashCode();
+   result = 31 * result + assignmentMode.hashCode();
+   result = 31 * result + 

[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339701#comment-16339701
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163942745
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -77,16 +90,29 @@ public void read(DataInputView in) throws IOException {
super.read(in);
 
int numKvStates = in.readShort();
-   stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+   operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
for (int i = 0; i < numKvStates; i++) {
-   stateMetaInfoSnapshots.add(
-   
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
-   .readStateMetaInfo(in));
+   operatorStateMetaInfoSnapshots.add(
+   
OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+   
.readOperatorStateMetaInfo(in));
}
+
+   int numBroadcastStates = in.readShort();
--- End diff --

I think somehow the migration test cases are not failing here, only because 
`in.readShort()` happens to return 0.








[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339690#comment-16339690
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163932956
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -35,18 +35,24 @@
 
public static final int VERSION = 2;
--- End diff --

It seems like the `OperatorBackendSerializationProxy` will have new binary 
formats after this change.
This should then have an uptick in the VERSION.

In general, I think the PR currently does not have any migration paths for 
previous versions (where there is no broadcast state meta info written).








[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339692#comment-16339692
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163930707
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ---
@@ -137,7 +155,12 @@ public ExecutionConfig getExecutionConfig() {
 
@Override
public Set getRegisteredStateNames() {
-   return registeredStates.keySet();
+   Set stateNames = new HashSet<>(
--- End diff --

Might not make sense to have a new `HashSet` every time 
`getRegisteredStateNames` is called.
OTOH, would it make sense to have a separate 
`getRegisteredBroadcastStateNames` on the interface?
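
A minimal sketch of that trade-off — field and method names below are
illustrative only, not the actual `DefaultOperatorStateBackend` internals:
either allocate a fresh combined `HashSet` on every call, or expose the
broadcast names as a separate, allocation-free view.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class StateNameViews {
    private final Map<String, Object> registeredOperatorStates = new HashMap<>();
    private final Map<String, Object> registeredBroadcastStates = new HashMap<>();

    // Approach in the PR: a new HashSet is allocated on every call.
    Set<String> getRegisteredStateNames() {
        Set<String> names = new HashSet<>(registeredOperatorStates.keySet());
        names.addAll(registeredBroadcastStates.keySet());
        return names;
    }

    // Alternative floated in the review: a separate getter returning an
    // unmodifiable view, with no per-call copying.
    Set<String> getRegisteredBroadcastStateNames() {
        return Collections.unmodifiableSet(registeredBroadcastStates.keySet());
    }

    public static void main(String[] args) {
        StateNameViews backend = new StateNameViews();
        backend.registeredOperatorStates.put("op-state", new Object());
        backend.registeredBroadcastStates.put("bc-state", new Object());

        Set<String> all = backend.getRegisteredStateNames();
        if (!all.contains("op-state") || !all.contains("bc-state")) {
            throw new AssertionError();
        }
        if (!backend.getRegisteredBroadcastStateNames().contains("bc-state")) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```

The separate getter keeps callers that only care about broadcast state from
paying for the combined copy, at the cost of widening the interface.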








[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163930707
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ---
@@ -137,7 +155,12 @@ public ExecutionConfig getExecutionConfig() {
 
@Override
public Set getRegisteredStateNames() {
-   return registeredStates.keySet();
+   Set stateNames = new HashSet<>(
--- End diff --

Might not make sense to have a new `HashSet` every time 
`getRegisteredStateNames` is called.
OTOH, would it make sense to have a separate 
`getRegisteredBroadcastStateNames` on the interface?


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163932340
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -64,11 +70,18 @@ public int getVersion() {
public void write(DataOutputView out) throws IOException {
super.write(out);
 
-   out.writeShort(stateMetaInfoSnapshots.size());
-   for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState 
: stateMetaInfoSnapshots) {
+   out.writeShort(operatorStateMetaInfoSnapshots.size());
+   for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState 
: operatorStateMetaInfoSnapshots) {
--- End diff --

the naming of the `kvState` variable here is actually a bit odd


---


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339700#comment-16339700
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163936390
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
 ---
@@ -36,8 +36,9 @@
 * The modes that determine how an {@link OperatorStateHandle} is 
assigned to tasks during restore.
 */
public enum Mode {
-   SPLIT_DISTRIBUTE, // The operator state partitions in the state 
handle are split and distributed to one task each.
-   BROADCAST // The operator state partitions are broadcast to all 
task.
+   SPLIT_DISTRIBUTE,   // The operator state partitions in the 
state handle are split and distributed to one task each.
+   BROADCAST,  // The operator state 
partitions are broadcasted to all tasks.
+   UNIFORM_BROADCAST   // The operator states are identical, 
and they are broadcasted to all tasks.
--- End diff --

nit: can we either keep with spaces here, or at least tab them so that the 
3 comments are aligned?









[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339696#comment-16339696
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163935632
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
 ---
@@ -211,4 +297,34 @@ public 
OperatorBackendStateMetaInfoReaderV2(ClassLoader userCodeClassLoader) {
return stateMetaInfo;
}
}
+
+   public static class BroadcastStateMetaInfoReaderV2 extends 
AbstractBroadcastStateMetaInfoReader {
+
+   public BroadcastStateMetaInfoReaderV2(final ClassLoader 
userCodeClassLoader) {
+   super(userCodeClassLoader);
+   }
+
+   @Override
+   public RegisteredBroadcastBackendStateMetaInfo.Snapshot 
readBroadcastStateMetaInfo(final DataInputView in) throws IOException {
+   RegisteredBroadcastBackendStateMetaInfo.Snapshot 
stateMetaInfo =
+   new 
RegisteredBroadcastBackendStateMetaInfo.Snapshot<>();
+
+   stateMetaInfo.setName(in.readUTF());
+   
stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+
+   Tuple2 
keySerializerAndConfig =
+   
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, 
userCodeClassLoader).get(0);
--- End diff --

If the `writeSerializersAndConfigsWithResilience` call was a single one in 
the writer, then here you can also just get all written serializers and configs 
with a single `readSerializersAndConfigsWithResilience`.
The returned list would be length 2 (order of the key / value serializer + 
config will be the same as how you wrote them).








[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163936903
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
 ---
@@ -36,8 +36,9 @@
 * The modes that determine how an {@link OperatorStateHandle} is assigned to tasks during restore.
 */
public enum Mode {
-   SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each.
-   BROADCAST // The operator state partitions are broadcast to all task.
+   SPLIT_DISTRIBUTE,   // The operator state partitions in the state handle are split and distributed to one task each.
+   BROADCAST,  // The operator state partitions are broadcasted to all tasks.
--- End diff --

Maybe naming this mode `BROADCAST` was not ideal in the first place 
(perhaps `UNION`, to correspond to the API name, would be better). 
Looking at the name / comments alone between `BROADCAST` and 
`UNIFORM_BROADCAST` is actually quite confusing.
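
The rename floated above can be sketched with a self-contained enum. This is illustrative only (a hypothetical `ModeSketch`, not Flink's actual `OperatorStateHandle.Mode`):

```java
// Hypothetical sketch of the proposal: UNION instead of BROADCAST for the
// "send the union of all partitions to every task" mode, matching the API name.
public class ModeSketch {

    /** How the partitions of an operator state handle are reassigned on restore. */
    public enum Mode {
        /** Partitions are split and distributed, one subset per task. */
        SPLIT_DISTRIBUTE,
        /** The union of all partitions is handed to every task (previously BROADCAST). */
        UNION
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode);
        }
    }
}
```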


---


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339694#comment-16339694
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163936903
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
 ---
@@ -36,8 +36,9 @@
 * The modes that determine how an {@link OperatorStateHandle} is assigned to tasks during restore.
 */
public enum Mode {
-   SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each.
-   BROADCAST // The operator state partitions are broadcast to all task.
+   SPLIT_DISTRIBUTE,   // The operator state partitions in the state handle are split and distributed to one task each.
+   BROADCAST,  // The operator state partitions are broadcasted to all tasks.
--- End diff --

Maybe naming this mode `BROADCAST` was not ideal in the first place 
(perhaps `UNION`, to correspond to the API name, would be better). 
Looking at the name / comments alone between `BROADCAST` and 
`UNIFORM_BROADCAST` is actually quite confusing.


> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>






[jira] [Updated] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints

2018-01-25 Thread chris snow (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chris snow updated FLINK-8513:
--
Description: 
It would be useful if the documentation provided information on connecting to 
non-AWS S3 endpoints when using presto.  For example:

 

You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
{{flink-conf.yaml}}:
{code:java}
s3.access-key: your-access-key 
s3.secret-key: your-secret-key{code}
If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
endpoint in Flink's {{flink-conf.yaml}}:
{code:java}
s3.endpoint: your-endpoint-hostname{code}

 

Source: [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md]

 

  was:
It would be useful if the documentation provided information on connecting to 
non-AWS S3 endpoints when using presto.  For example:

 

You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
{{flink-conf.yaml}}:

{{s3.access-key: your-access-key s3.secret-key: your-secret-key }}++

If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
endpoint in Flink's {{flink-conf.yaml}}:

{{s3.endpoint: your-endpoint-hostname }}

 

Source: https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md

 


> Add documentation for connecting to non-AWS S3 endpoints
> 
>
> Key: FLINK-8513
> URL: https://issues.apache.org/jira/browse/FLINK-8513
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: chris snow
>Priority: Trivial
>
> It would be useful if the documentation provided information on connecting to 
> non-AWS S3 endpoints when using presto.  For example:
>  
> 
> You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
> {{flink-conf.yaml}}:
> {code:java}
> s3.access-key: your-access-key 
> s3.secret-key: your-secret-key{code}
> If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
> Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
> endpoint in Flink's {{flink-conf.yaml}}:
> {code:java}
> s3.endpoint: your-endpoint-hostname{code}
> 
>  
> Source: 
> [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md]
>  





[jira] [Updated] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints

2018-01-25 Thread chris snow (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chris snow updated FLINK-8513:
--
Description: 
It would be useful if the documentation provided information on connecting to 
non-AWS S3 endpoints when using presto.  For example:

 

You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
{{flink-conf.yaml}}:

{{s3.access-key: your-access-key s3.secret-key: your-secret-key }}++

If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
endpoint in Flink's {{flink-conf.yaml}}:

{{s3.endpoint: your-endpoint-hostname }}

 

Source: https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md

 

  was:It would be useful if the documentation provided information on 
connecting to non-AWS S3 endpoints when using presto.


> Add documentation for connecting to non-AWS S3 endpoints
> 
>
> Key: FLINK-8513
> URL: https://issues.apache.org/jira/browse/FLINK-8513
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: chris snow
>Priority: Trivial
>
> It would be useful if the documentation provided information on connecting to 
> non-AWS S3 endpoints when using presto.  For example:
>  
> 
> You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
> {{flink-conf.yaml}}:
> {{s3.access-key: your-access-key s3.secret-key: your-secret-key }}++
> If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
> Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
> endpoint in Flink's {{flink-conf.yaml}}:
> {{s3.endpoint: your-endpoint-hostname }}
> 
>  
> Source: https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md
>  





[jira] [Created] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints

2018-01-25 Thread chris snow (JIRA)
chris snow created FLINK-8513:
-

 Summary: Add documentation for connecting to non-AWS S3 endpoints
 Key: FLINK-8513
 URL: https://issues.apache.org/jira/browse/FLINK-8513
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: chris snow


It would be useful if the documentation provided information on connecting to 
non-AWS S3 endpoints when using presto.





[jira] [Comment Edited] (FLINK-8506) fullRestarts Gauge not incremented when jobmanager got killed

2018-01-25 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339560#comment-16339560
 ] 

Steven Zhen Wu edited comment on FLINK-8506 at 1/25/18 6:45 PM:


Till, thanks for the explanation. Looks like we should clarify the doc, which 
says "since job submitted".

[https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html]

fullRestarts The total number of full restarts since this job was submitted (in 
milliseconds). Gauge

So it seems that we don't have any metric to capture jobmanager failover.


was (Author: stevenz3wu):
Till, thanks for the explanation. Looks like we should clarify the doc, which 
says "since job submitted".

[https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html]

fullRestarts The total number of full restarts since this job was submitted (in 
milliseconds). Gauge

So it seems that we don't any metric to capture jobmanager failover.

> fullRestarts Gauge not incremented when jobmanager got killed
> -
>
> Key: FLINK-8506
> URL: https://issues.apache.org/jira/browse/FLINK-8506
> Project: Flink
>  Issue Type: Bug
>Reporter: Steven Zhen Wu
>Priority: Major
>
> [~till.rohrmann] When jobmanager node got killed, it will cause job restart. 
> But in this case, we didn't see _fullRestarts_ guage got incremented. is this 
> expected or a bug?
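
The metric under discussion can be sketched as a counter exposed as a gauge. Because such a counter lives in the JobManager process, its value does not survive a JobManager failover, which matches the behavior reported above. This is a hypothetical sketch, not Flink's restart-strategy or metrics code:

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of a full-restarts gauge: a per-JobManager counter exposed
// as a gauge value; it resets when the JobManager process is replaced.
public class FullRestartsGauge {

    private final AtomicLong fullRestarts = new AtomicLong();

    /** Called once per full restart of the job. */
    public void onFullRestart() {
        fullRestarts.incrementAndGet();
    }

    /** The gauge value: number of full restarts seen by this JobManager instance. */
    public long getValue() {
        return fullRestarts.get();
    }

    public static void main(String[] args) {
        FullRestartsGauge gauge = new FullRestartsGauge();
        gauge.onFullRestart();
        gauge.onFullRestart();
        System.out.println(gauge.getValue()); // prints 2
    }
}
```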





[GitHub] flink issue #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphStore to D...

2018-01-25 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5310
  
Is it acceptable behavior that sometimes graphs don't get deleted from disk?


---


[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339594#comment-16339594
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5310
  
Is it acceptable behavior that sometimes graphs don't get deleted from disk?


> Add SerializableExecutionGraphStore to Dispatcher
> -
>
> Key: FLINK-8453
> URL: https://issues.apache.org/jira/browse/FLINK-8453
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it 
> can use to store completed jobs. This store can then be used to serve 
> historic job requests from the web UI, for example. The default 
> implementation should persist the jobs to disk and evict the in memory 
> instances once they grow too big in order to avoid memory leaks. Additionally, 
> the store should expire elements from disk after a user defined time.
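
The in-memory eviction idea in this description can be sketched with a size-bounded LRU map. This is a minimal sketch only: disk persistence and time-based expiry are omitted, and the class name is hypothetical (Flink's actual store is `FileArchivedExecutionGraphStore`):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Keeps at most maxEntries graphs in memory, evicting in LRU order once the
// bound is exceeded; on a real store the evicted graph would remain on disk.
public class BoundedGraphCache<K, V> extends LinkedHashMap<K, V> {

    private final int maxEntries;

    public BoundedGraphCache(int maxEntries) {
        super(16, 0.75f, true); // access-order iteration gives LRU semantics
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // evict once the in-memory size exceeds the bound
        return size() > maxEntries;
    }

    public static void main(String[] args) {
        BoundedGraphCache<String, String> cache = new BoundedGraphCache<>(2);
        cache.put("job-1", "graph-1");
        cache.put("job-2", "graph-2");
        cache.put("job-3", "graph-3"); // evicts job-1, the least recently used
        System.out.println(cache.containsKey("job-1")); // prints false
        System.out.println(cache.containsKey("job-3")); // prints true
    }
}
```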





[GitHub] flink pull request #5311: [FLINK-8454] [flip6] Remove JobExecutionResultCach...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5311#discussion_r163923585
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -100,8 +97,6 @@
 
private final LeaderElectionService leaderElectionService;
 
-   private final JobExecutionResultCache jobExecutionResultCache = new 
JobExecutionResultCache();
--- End diff --

`JobExecutionResultCache` and `JobExecutionResultCacheTest` should also be removed in this commit


---


[jira] [Commented] (FLINK-8454) Remove JobExecutionResultCache

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339591#comment-16339591
 ] 

ASF GitHub Bot commented on FLINK-8454:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5311#discussion_r163923585
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -100,8 +97,6 @@
 
private final LeaderElectionService leaderElectionService;
 
-   private final JobExecutionResultCache jobExecutionResultCache = new 
JobExecutionResultCache();
--- End diff --

`JobExecutionResultCache` and `JobExecutionResultCacheTest` should also be removed in this commit


> Remove JobExecutionResultCache
> --
>
> Key: FLINK-8454
> URL: https://issues.apache.org/jira/browse/FLINK-8454
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> With the introduction of the {{SerializableExecutionGraphStore}} to the 
> {{Dispatcher}}, it is no longer necessary to store the {{JobResult}} in the 
> {{Dispatcher}}, because all information necessary to derive the {{JobResult}} 
> is contained in the {{SerializableExecutionGraphStore}}. In order to decrease 
> complexity, I propose to remove the {{JobExecutionResultCache}}.





[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339570#comment-16339570
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163918048
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
--- End diff --

It looks like a constant, i.e., it shouldn't be mutable.
```
private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS =
    Collections.unmodifiableList(
        Arrays.stream(JobStatus.values())
            .filter(JobStatus::isGloballyTerminalState)
            .collect(Collectors.toList()));
```
Using `@BeforeClass` is not idiomatic imo.
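
The pattern this review suggests can be shown self-contained. `JobStatus` below is a stand-in enum (not Flink's class), so the example runs without Flink on the classpath:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the immutable-constant pattern: build the list once at
// class-initialization time instead of mutating it in @BeforeClass.
public class ConstantListSketch {

    public enum JobStatus {
        CREATED(false), RUNNING(false), FINISHED(true), CANCELED(true), FAILED(true);

        private final boolean globallyTerminal;

        JobStatus(boolean globallyTerminal) {
            this.globallyTerminal = globallyTerminal;
        }

        public boolean isGloballyTerminalState() {
            return globallyTerminal;
        }
    }

    // Initialized once and immutable afterwards; no test-setup mutation needed.
    public static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS =
        Collections.unmodifiableList(
            Arrays.stream(JobStatus.values())
                .filter(JobStatus::isGloballyTerminalState)
                .collect(Collectors.toList()));

    public static void main(String[] args) {
        System.out.println(GLOBALLY_TERMINAL_JOB_STATUS); // prints [FINISHED, CANCELED, FAILED]
    }
}
```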



> Add SerializableExecutionGraphStore to Dispatcher
> -
>
> Key: FLINK-8453
> URL: https://issues.apache.org/jira/browse/FLINK-8453
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it 
> can use to store completed jobs. This store can then be used to serve 
> historic job requests from the web UI, for example. The default 
> implementation should persist the jobs to disk and evict the in memory 
> instances once they grow too big in order to avoid memory leaks. Additionally, 
> the store should expire elements from disk after a user defined time.





[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163919241
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
+
+   private static final Random RANDOM = new Random();
--- End diff --

With `ThreadLocalRandom.current().nextInt(...)` you already have an 
available random instance which does not suffer from lock contention problems.
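
The suggestion can be demonstrated with a few self-contained lines (a sketch, not the test class under review):

```java
import java.util.concurrent.ThreadLocalRandom;

// Draw bounded random ints via ThreadLocalRandom.current() instead of a
// shared static Random field, avoiding contention on the shared seed.
public class RandomSketch {
    public static void main(String[] args) {
        int n = ThreadLocalRandom.current().nextInt(1, 20); // uniform in [1, 20)
        System.out.println(n >= 1 && n < 20); // prints true
    }
}
```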


---


[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339572#comment-16339572
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163919241
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
+
+   private static final Random RANDOM = new Random();
--- End diff --

With `ThreadLocalRandom.current().nextInt(...)` you already have an 
available random instance which does not suffer from lock contention problems.


> Add SerializableExecutionGraphStore to Dispatcher
> -
>
> Key: FLINK-8453
> URL: https://issues.apache.org/jira/browse/FLINK-8453
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it 
> can use to store completed jobs. This store can then be used to serve 
> historic job requests from the web UI, for example. The default 
> implementation should persist the jobs to disk and evict the in memory 
> instances once they grow to big in order to avoid memory leaks. Additionally, 
> the store should expire elements from disk after a user defined time.





[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163918048
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
--- End diff --

It looks like a constant, i.e., it shouldn't be mutable.
```
private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS =
    Collections.unmodifiableList(
        Arrays.stream(JobStatus.values())
            .filter(JobStatus::isGloballyTerminalState)
            .collect(Collectors.toList()));
```
Using `@BeforeClass` is not idiomatic imo.



---


[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339573#comment-16339573
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163922121
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
+
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED);
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED);
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED);
+   }
+
+   /**
+* Tests that we can put {@link ArchivedExecutionGraph} into the
+* {@link FileArchivedExecutionGraphStore} and that the graph is 
persisted.
+*/
+   @Test
+   public void testPut() throws IOException {
+   final ArchivedExecutionGraph dummyExecutionGraph = new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+   final File rootDir = temporaryFolder.newFolder();
+
+   try (final FileArchivedExecutionGraphStore executionGraphStore 
= createDefaultExecutionGraphStore(rootDir)) {
+
+   final File storageDirectory = 
executionGraphStore.getStorageDir();
+
+   // check that the storage directory is empty
+   assertThat(storageDirectory.listFiles().length, 
Matchers.equalTo(0));
+
+   executionGraphStore.put(dummyExecutionGraph);
+
+   // check that we have persisted the given execution 
graph
+   assertThat(storageDirectory.listFiles().length, 
Matchers.equalTo(1));
+
+   
assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new 

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163915273
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Store for {@link ArchivedExecutionGraph}. The store writes the archived 
execution graph to disk
+ * and keeps the most recently used execution graphs in a memory cache for 
faster serving. Moreover,
+ * the stored execution graphs are periodically cleaned up.
+ */
+public class FileArchivedExecutionGraphStore implements 
ArchivedExecutionGraphStore {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
+
+   private final File storageDir;
+
+   private final Cache<JobID, JobDetails> jobDetailsCache;
+
+   private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;
+
+   private final ScheduledFuture<?> cleanupFuture;
+
+   private final Thread shutdownHook;
+
+   private int numFinishedJobs;
+
+   private int numFailedJobs;
+
+   private int numCanceledJobs;
+
+   public FileArchivedExecutionGraphStore(
+   File rootDir,
+   Time expirationTime,
+   long maximumCacheSizeBytes,
+   ScheduledExecutor scheduledExecutor) throws IOException 
{
+
+   final File storageDirectory = 
initExecutionGraphStorageDirectory(rootDir);
+
+   LOG.info(
+   "Initializing {}: Storage directory {}, expiration time 
{}, maximum cache size {} bytes.",
+   FileArchivedExecutionGraphStore.class.getSimpleName(),
+   storageDirectory,
+   expirationTime.toMilliseconds(),
+   maximumCacheSizeBytes);
+
+   this.storageDir = Preconditions.checkNotNull(storageDirectory);
+   Preconditions.checkArgument(
+   storageDirectory.exists() && 
storageDirectory.isDirectory(),
+   "The storage directory must exist and be a directory.");
+   this.jobDetailsCache = CacheBuilder.newBuilder()
+   
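The cache wiring in the quoted diff is cut off mid-expression, but the design it describes — a bounded in-memory cache in front of durable on-disk archives, with eviction from memory only — can be sketched with plain JDK collections. This is an illustrative analogue, not Flink's actual Guava-based implementation; `TinyGraphStore` and its string-valued "graphs" are invented for the sketch.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal sketch: a small LRU cache over a durable backing map. */
class TinyGraphStore {
    // Stand-in for the on-disk archives under storageDir.
    private final Map<String, String> persisted = new HashMap<>();

    // Access-ordered LRU cache capped at maxEntries, mirroring the
    // size-bounded in-memory cache in front of the persisted graphs.
    private final LinkedHashMap<String, String> cache;

    TinyGraphStore(int maxEntries) {
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                // Evict from memory only; the persisted copy survives.
                return size() > maxEntries;
            }
        };
    }

    void put(String jobId, String archivedGraph) {
        persisted.put(jobId, archivedGraph); // always persist first
        cache.put(jobId, archivedGraph);
    }

    String get(String jobId) {
        String graph = cache.get(jobId);
        if (graph == null) {
            graph = persisted.get(jobId); // cache miss: reload from "disk"
            if (graph != null) {
                cache.put(jobId, graph);
            }
        }
        return graph;
    }
}
```

Even after an entry is evicted from the in-memory map, `get` still serves it from the persisted map — the same contract the store's memory cache relies on.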

[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339568#comment-16339568
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163911570
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -486,11 +510,22 @@ private void onFatalError(Throwable throwable) {
fatalErrorHandler.onFatalError(throwable);
}
 
-   private void jobReachedGloballyTerminalState(AccessExecutionGraph 
accessExecutionGraph) {
-   final JobResult jobResult = 
JobResult.createFrom(accessExecutionGraph);
+   private void jobReachedGloballyTerminalState(ArchivedExecutionGraph 
archivedExecutionGraph) {
+   
Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(),
 "");
--- End diff --

The `errorMessage` is an empty string. Leave it out completely or put 
something meaningful. 


> Add SerializableExecutionGraphStore to Dispatcher
> -
>
> Key: FLINK-8453
> URL: https://issues.apache.org/jira/browse/FLINK-8453
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it 
> can use to store completed jobs. This store can then be used to serve 
> historic job requests from the web UI, for example. The default 
> implementation should persist the jobs to disk and evict the in-memory 
> instances once they grow too big in order to avoid memory leaks. Additionally, 
> the store should expire elements from disk after a user defined time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
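The review above flags a `Preconditions.checkArgument` call whose error message is an empty string. A minimal plain-Java sketch of the requested fix follows; the `Checks` class and `recordTerminalState` method are invented for illustration, though Flink's own `Preconditions.checkArgument(boolean, Object)` has this shape.

```java
/** Sketch: fail fast with a message that actually explains the violation. */
class Checks {
    static void checkArgument(boolean condition, String errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(errorMessage);
        }
    }

    static void recordTerminalState(boolean isGloballyTerminal) {
        // A meaningful message, as requested in the review, instead of "".
        checkArgument(
            isGloballyTerminal,
            "Job must be in a globally terminal state (FINISHED, FAILED or CANCELED).");
    }
}
```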


[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163921620
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Simple {@link ScheduledExecutor} implementation for testing purposes.
+ */
+public class ManuallyTriggeredScheduledExecutor extends ManuallyTriggeredDirectExecutor implements ScheduledExecutor {
+
+   private final ConcurrentLinkedQueue<ScheduledTask<?>> scheduledTasks = new ConcurrentLinkedQueue<>();
+
+   @Override
+   public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+   return insertRunnable(command, false);
+   }
+
+   @Override
+   public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+   final ScheduledTask<V> scheduledTask = new ScheduledTask<>(callable, false);
+
+   scheduledTasks.offer(scheduledTask);
+
+   return scheduledTask;
+   }
+
+   @Override
+   public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+   return insertRunnable(command, true);
+   }
+
+   @Override
+   public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+   return insertRunnable(command, true);
+   }
+
+   /**
+* Triggers all registered tasks.
+*/
+   public void triggerScheduledTasks() {
+   final Iterator<ScheduledTask<?>> iterator = scheduledTasks.iterator();
+
+   while (iterator.hasNext()) {
+   final ScheduledTask<?> scheduledTask = iterator.next();
+
+   scheduledTask.execute();
+
+   if (!scheduledTask.isPeriodic) {
+   iterator.remove();
+   }
+   }
+   }
+
+   private ScheduledFuture<?> insertRunnable(Runnable command, boolean isPeriodic) {
+   final ScheduledTask<Object> scheduledTask = new ScheduledTask<>(
+   () -> {
+   command.run();
+   return null;
+   },
+   isPeriodic);
+
+   scheduledTasks.offer(scheduledTask);
+
+   return scheduledTask;
+   }
+
+   private static final class ScheduledTask<T> implements ScheduledFuture<T> {
+
+   private final Callable<T> callable;
+
+   private final boolean isPeriodic;
+
+   private final CompletableFuture<T> result;
+
+   private ScheduledTask(Callable<T> callable, boolean isPeriodic) {
+   this.callable = Preconditions.checkNotNull(callable);
+   this.isPeriodic = isPeriodic;
+
+   this.result = new CompletableFuture<>();
+   }
+
+   public boolean isPeriodic() {
--- End diff --

nit: method is unused


---
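The executor quoted above makes scheduling deterministic: submitted work is queued, and nothing runs until the test explicitly triggers it. The pattern can be sketched with only the JDK — `ManualExecutor` below is an invented name for illustration, not a Flink class.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Sketch of the manual-trigger pattern: nothing runs until the test says so. */
class ManualExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) {
        tasks.add(command); // defer instead of running immediately
    }

    /** Runs everything queued so far; the test controls the "clock". */
    int triggerAll() {
        int executed = 0;
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
            executed++;
        }
        return executed;
    }
}
```

A test can assert that no side effects happened before `triggerAll()` and that all of them happened after — exactly how `triggerScheduledTasks()` is used in the PR's tests.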


[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163922121
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
+
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED);
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED);
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED);
+   }
+
+   /**
+* Tests that we can put {@link ArchivedExecutionGraph} into the
+* {@link FileArchivedExecutionGraphStore} and that the graph is 
persisted.
+*/
+   @Test
+   public void testPut() throws IOException {
+   final ArchivedExecutionGraph dummyExecutionGraph = new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+   final File rootDir = temporaryFolder.newFolder();
+
+   try (final FileArchivedExecutionGraphStore executionGraphStore 
= createDefaultExecutionGraphStore(rootDir)) {
+
+   final File storageDirectory = 
executionGraphStore.getStorageDir();
+
+   // check that the storage directory is empty
+   assertThat(storageDirectory.listFiles().length, 
Matchers.equalTo(0));
+
+   executionGraphStore.put(dummyExecutionGraph);
+
+   // check that we have persisted the given execution 
graph
+   assertThat(storageDirectory.listFiles().length, 
Matchers.equalTo(1));
+
+   
assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new 
PartialArchivedExecutionGraphMatcher(dummyExecutionGraph));
+   }
+   }
+
+   /**
+* Tests that null is returned if we request an unknown JobID.
+*/
+   @Test
+   public void testUnknownGet() throws IOException {
 

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163919921
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
+
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED);
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED);
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED);
+   }
+
+   /**
+	 * Tests that we can put {@link ArchivedExecutionGraph} into the
+	 * {@link FileArchivedExecutionGraphStore} and that the graph is persisted.
+	 */
+	@Test
+	public void testPut() throws IOException {
+		final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+		final File rootDir = temporaryFolder.newFolder();
+
+		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
+
+			final File storageDirectory = executionGraphStore.getStorageDir();
+
+			// check that the storage directory is empty
+			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
+
+			executionGraphStore.put(dummyExecutionGraph);
+
+			// check that we have persisted the given execution graph
+			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
+
+			assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph));
--- End diff --

It is not obvious what the matcher is doing. How about:
`assertThat(...), isPredicateFulfilled(..))`

```
private static Matcher
```
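A minimal, self-contained sketch of the suggested helper. This is not Flink code: to stay dependency-free it uses `java.util.function.Predicate` directly instead of a Hamcrest `Matcher`, and the method name `assertThatPredicate` is invented for illustration.

```java
import java.util.function.Predicate;

// Dependency-free stand-in for the Hamcrest-style helper suggested in the
// review; fails with a readable description when the predicate does not hold.
public class PredicateMatcherSketch {

	static <T> void assertThatPredicate(T value, Predicate<T> predicate, String description) {
		if (!predicate.test(value)) {
			throw new AssertionError("Expected: " + description + " but was: " + value);
		}
	}

	public static void main(String[] args) {
		// passes silently because the predicate holds
		assertThatPredicate("flink", s -> s.startsWith("f"), "a string starting with 'f'");
	}
}
```

The gain over an opaque matcher class is that the failure message names the expectation, so a reader of the test log sees what property was violated instead of just "matcher did not match".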
[jira] [Commented] (FLINK-8506) fullRestarts Gauge not incremented when jobmanager got killed

2018-01-25 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339560#comment-16339560
 ] 

Steven Zhen Wu commented on FLINK-8506:
---

Till, thanks for the explanation. Looks like we should clarify the doc, which 
says "since job submitted".

[https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html]

fullRestarts | The total number of full restarts since this job was submitted (in milliseconds). | Gauge

So it seems that we don't have any metric to capture JobManager failover.

> fullRestarts Gauge not incremented when jobmanager got killed
> -
>
> Key: FLINK-8506
> URL: https://issues.apache.org/jira/browse/FLINK-8506
> Project: Flink
>  Issue Type: Bug
>Reporter: Steven Zhen Wu
>Priority: Major
>
> [~till.rohrmann] When jobmanager node got killed, it will cause job restart. 
> But in this case, we didn't see _fullRestarts_ guage got incremented. is this 
> expected or a bug?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5364: Flink 8472 1.4

2018-01-25 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5364

Flink 8472 1.4

## What is the purpose of the change

Extend all migration tests, to include verifying restore from Flink 1.4 
savepoints.

This includes extending:
- `WindowOperatorMigrationTest`
- `CEPMigrationTest`
- `StatefulJobSavepointMigrationTestITCase` (Scala API migration)
- `StatefulJobSavepointMigrationTestITCase` (Java API migration)
- `FlinkKinesisConsumerMigrationTest`
- `FlinkKafkaConsumerBaseMigrationTest`
- `ContinuousFileProcessingMigrationTest`
- `BucketingSinkMigrationTest`

This PR should also be forward-ported to the `master` branch to cover Flink 
1.5.

## Brief change log

All commits except 1ce3e6c are simply adding `MigrationVersion.v1_4` to the 
test parameters and adding the corresponding test savepoint files.

1ce3e6c is a refactor of `StatefulJobSavepointMigrationFrom12ITCase` and 
`StatefulJobSavepointMigrationFrom13ITCase` to a single 
`StatefulJobSavepointMigrationITCase` class.

## Verifying this change

This is a test refactor / extension, so all existing tests should cover the 
changes.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8472-1.4

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5364.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5364


commit 28a1c4d403cc978d60af765f1e67bfb4bedd93c4
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-25T15:08:53Z

[FLINK-8472] [DataStream, test] Extend WindowOperatorMigrationTest for Flink 1.4

commit 803c641b6c4e03431a960826f5a531423db3d5ef
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-25T15:13:14Z

[FLINK-8472] [cep, test] Extend CEPMigrationTest for Flink 1.4

commit 6f6505232c7294a0c30e7cef9fbf2f9218e478b1
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-25T15:20:31Z

[FLINK-8472] [scala, test] Extend StatefulJobSavepointMigrationITCase for Flink 1.4

commit 1ce3e6cd331d8791d28474d9838070005d4e37a3
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-25T17:25:45Z

[FLINK-8472] [DataStream, test] Refactor StatefulJobSavepointFrom*MigrationITCase to single ITCase

This commit refactors the StatefulJobSavepointFrom12MigrationITCase and
StatefulJobSavepointFrom13MigrationITCase to a single class,
StatefulJobSavepointMigrationITCase. The new ITCase is parameterized to
ensure that all previous versions and state backend variants are
covered.

commit 696b5b3f3927f63c1c0e7e550db14427dbe7a0cb
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-25T17:37:05Z

[FLINK-8472] [DataStream, test] Extend StatefulJobSavepointMigrationITCase for Flink 1.4

commit 1d308604bf832c856b8b6f3d1a33d189ddab80e8
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-25T15:23:17Z

[FLINK-8472] [kinesis, test] Extend FlinkKinesisConsumerMigrationTest for Flink 1.4

commit 2de2943ffed3f1ab8928e3d9be96c5c6d7dd4f96
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-25T16:23:22Z

[FLINK-8472] [kafka, test] Extend FlinkKafkaConsumerBaseMigrationTest for Flink 1.4

commit bfee6ce9f842e7e2785ffb664001b42e2f7e5a44
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-25T16:28:17Z

[FLINK-8472] [fs, test] Extend ContinuousFileProcessingMigrationTest for Flink 1.4

commit 141236219b0afd4093ae891398564f39539875e0
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-25T16:32:28Z

[FLINK-8472] [fs, test] Extend BucketingSinkMigrationTest for Flink 1.4

commit ceb00c7b210dca4f4c2e2bd5f2f399d5f0880934
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-25T16:45:47Z

[hotfix] [test] Remove stale savepoint files no longer used by migration tests

This includes:
- Removing MigrationVersion.v1_1, since compatibility for 1.1 is no longer supported
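The parameterization described in commit 1ce3e6c (every supported savepoint version crossed with every state backend) can be sketched as a cartesian product, roughly what a JUnit `@Parameterized` factory method would return. The enum values and backend names below are illustrative only, not the actual Flink test code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the version x backend matrix used to drive one parameterized
// migration ITCase instead of one hand-written class per version.
public class MigrationMatrixSketch {

	enum MigrationVersion { v1_2, v1_3, v1_4 }

	/** Builds one test-parameter tuple per (version, backend) combination. */
	static List<Object[]> parameters() {
		List<Object[]> params = new ArrayList<>();
		for (MigrationVersion version : MigrationVersion.values()) {
			for (String backend : Arrays.asList("memory", "rocksdb")) {
				params.add(new Object[] {version, backend});
			}
		}
		return params;
	}

	public static void main(String[] args) {
		// 3 versions x 2 backends = 6 parameterized runs
		System.out.println(parameters().size());
	}
}
```

Adding restore support for a new Flink release then only requires adding one enum value (plus the savepoint files), rather than a new `...From1xITCase` class.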

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339532#comment-16339532
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5240
  
Thanks for the feedback @fhueske. I hope I could address most of it. I 
think we should merge this PR (if you agree) and add more PRs for this issue as 
the next steps. I suggest the following subtasks:

- Add validation for the CSV format
- Add full CsvTableSourceFactory support (incl. proctime, rowtime, and 
schema mapping)
- Add a JSON schema parser to the JSON and logic for creating a table 
source from it
- Add validation for the JSON format
- Add validation for the Rowtime descriptor
- Add validation for StreamTableDescriptor
- Add validation for BatchTableDescriptor
- Add KafkaTableSource factories 

What do you think?


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table source are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.
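As a rough illustration of the unified-configuration idea, the declaration parts above could be flattened into string properties that a table source factory is discovered and instantiated from. The key names below are invented for this sketch and are not Flink's actual property schema:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical flattening of a source declaration (source, encoding, rowtime)
// into string properties; a unified factory could match on these keys.
public class TableSourceDescriptorSketch {

	static Map<String, String> describeKafkaJsonSource() {
		Map<String, String> props = new LinkedHashMap<>();
		props.put("source.type", "kafka");
		props.put("source.topic", "orders");
		props.put("encoding.type", "json");
		props.put("encoding.schema.fields", "id:LONG,color:STRING");
		props.put("rowtime.watermark.strategy", "bounded-out-of-orderness");
		return props;
	}

	public static void main(String[] args) {
		System.out.println(describeKafkaJsonSource());
	}
}
```

A string-property representation like this is also what an external catalog could persist, which is why the issue suggests generalizing the `TableSourceConverters` approach.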



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339509#comment-16339509
 ] 

ASF GitHub Bot commented on FLINK-8450:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5309
  
Thanks for the review @GJL. I've addressed your comments in cf9d24d and 
rebased onto the latest master.


> Make JobMaster/DispatcherGateway#requestJob type safe
> -
>
> Key: FLINK-8450
> URL: https://issues.apache.org/jira/browse/FLINK-8450
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{RestfulGateway#requestJob}} returns a 
> {{CompletableFuture<AccessExecutionGraph>}}. Since {{AccessExecutionGraph}} 
> is non-serializable, it could fail if we execute this RPC from a remote 
> system. In order to make it type safe, we should change its signature to 
> {{SerializableExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339507#comment-16339507
 ] 

ASF GitHub Bot commented on FLINK-8450:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5309#discussion_r163905384
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 ---
@@ -228,10 +246,10 @@ public void testConcurrentAccess() throws Exception {
Collection<AccessExecutionGraph> allExecutionGraphs = allExecutionGraphFutures.get();
--- End diff --

True, thanks for the info. Didn't know `ExecutorCompletionService` before.
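For readers who, like the reviewer, have not used `ExecutorCompletionService` before, here is a minimal standalone example (not Flink code) of submitting tasks and collecting results in completion order:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// java.util.concurrent.ExecutorCompletionService: submit N tasks, then
// take() N results as they finish, without tracking individual Futures.
public class CompletionServiceExample {

	/** Squares all inputs concurrently; results arrive in completion order. */
	static List<Integer> squareAll(List<Integer> inputs) throws Exception {
		ExecutorService executor = Executors.newFixedThreadPool(4);
		try {
			CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
			for (int n : inputs) {
				final int value = n;
				completionService.submit(() -> value * value);
			}
			List<Integer> results = new ArrayList<>();
			for (int i = 0; i < inputs.size(); i++) {
				// take() blocks until the next task finishes, whichever that is
				results.add(completionService.take().get());
			}
			return results;
		} finally {
			executor.shutdown();
		}
	}

	public static void main(String[] args) throws Exception {
		System.out.println(squareAll(Arrays.asList(1, 2, 3)));
	}
}
```

This is handy in concurrency tests like the one above: the test just needs all results, not a hand-maintained list of `Future`s polled in submission order.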


> Make JobMaster/DispatcherGateway#requestJob type safe
> -
>
> Key: FLINK-8450
> URL: https://issues.apache.org/jira/browse/FLINK-8450
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{RestfulGateway#requestJob}} returns a 
> {{CompletableFuture<AccessExecutionGraph>}}. Since {{AccessExecutionGraph}} 
> is non-serializable, it could fail if we execute this RPC from a remote 
> system. In order to make it type safe, we should change its signature to 
> {{SerializableExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339504#comment-16339504
 ] 

ASF GitHub Bot commented on FLINK-8450:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5309#discussion_r163904466
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 ---
@@ -199,12 +212,17 @@ public void testCacheEntryCleanup() throws Exception {
public void testConcurrentAccess() throws Exception {
final Time timeout = Time.milliseconds(100L);
final Time timeToLive = Time.hours(1L);
-   final JobID jobId = new JobID();
-
-		final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class);
 
-		final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
-		when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+   final AtomicInteger requestJobCalls = new AtomicInteger(0);
--- End diff --

True, will change it.


> Make JobMaster/DispatcherGateway#requestJob type safe
> -
>
> Key: FLINK-8450
> URL: https://issues.apache.org/jira/browse/FLINK-8450
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{RestfulGateway#requestJob}} returns a 
> {{CompletableFuture<AccessExecutionGraph>}}. Since {{AccessExecutionGraph}} 
> is non-serializable, it could fail if we execute this RPC from a remote 
> system. In order to make it type safe, we should change its signature to 
> {{SerializableExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339499#comment-16339499
 ] 

ASF GitHub Bot commented on FLINK-8450:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5309#discussion_r163902431
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 ---
@@ -30,62 +30,69 @@
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link ExecutionGraphCache}.
  */
 public class ExecutionGraphCacheTest extends TestLogger {
 
+   private static ArchivedExecutionGraph expectedExecutionGraph;
+   private static final JobID expectedJobId = new JobID();
+
+   @BeforeClass
+   public static void setup() {
+		expectedExecutionGraph = new ArchivedExecutionGraphBuilder().build();
+   }
+
/**
 * Tests that we can cache AccessExecutionGraphs over multiple accesses.
 */
@Test
public void testExecutionGraphCaching() throws Exception {
final Time timeout = Time.milliseconds(100L);
final Time timeToLive = Time.hours(1L);
-		final JobID jobId = new JobID();
-		final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class);
 
-		final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
-		when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+		final CountingRestfulGateway restfulGateway = createCountingRestfulGateway(expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph));
 
		try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) {
-			CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway);
+			CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-			assertEquals(accessExecutionGraph, accessExecutionGraphFuture.get());
+			assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get());
 
-			CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway);
+			CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-			assertEquals(accessExecutionGraph, accessExecutionGraphFuture2.get());
+			assertEquals(expectedExecutionGraph, accessExecutionGraphFuture2.get());
--- End diff --

True, will remove it.
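The pattern in the diff above (replacing Mockito mocks with an explicit counting test gateway) can be sketched in isolation. The class names below are invented stand-ins for `CountingRestfulGateway` and `ExecutionGraphCache`, just to show how an `AtomicInteger` stub lets a test assert how often the underlying request was actually issued:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Mock-free test pattern: an explicit counting stub plus a trivial cache,
// so a test can verify caching by counting calls instead of Mockito verify().
public class CountingGatewaySketch {

	/** Stand-in for the gateway under test; counts every requestJob call. */
	static class CountingGateway {
		final AtomicInteger requestJobCalls = new AtomicInteger(0);

		CompletableFuture<String> requestJob(String jobId) {
			requestJobCalls.incrementAndGet();
			return CompletableFuture.completedFuture("graph-of-" + jobId);
		}
	}

	/** Trivial cache: only the first lookup per job reaches the gateway. */
	static class GraphCache {
		private final ConcurrentMap<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

		CompletableFuture<String> get(String jobId, CountingGateway gateway) {
			return cache.computeIfAbsent(jobId, gateway::requestJob);
		}
	}

	public static void main(String[] args) {
		CountingGateway gateway = new CountingGateway();
		GraphCache cache = new GraphCache();
		cache.get("job-1", gateway).join();
		cache.get("job-1", gateway).join();
		// the second lookup is served from the cache, so the count stays at 1
		System.out.println(gateway.requestJobCalls.get());
	}
}
```

Compared with a Mockito mock, such a hand-written stub keeps the expected interaction visible in the test code itself and avoids the `when(...).thenReturn(...)` setup the diff removes.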


> Make JobMaster/DispatcherGateway#requestJob type safe
> -
>
> Key: FLINK-8450
> URL: 
