[jira] [Commented] (FLINK-6448) Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000299#comment-16000299
 ] 

ASF GitHub Bot commented on FLINK-6448:
---

Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/3840


> Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'
> ---
>
> Key: FLINK-6448
> URL: https://issues.apache.org/jira/browse/FLINK-6448
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Reporter: Stephan Ewen
>  Labels: easyfix, starter
>
> In the TaskManager view, the label 'Free Memory' is wrong / misleading and 
> should be 'JVM Heap Size' instead.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-6448) Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'

2017-05-07 Thread yanghua (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanghua reassigned FLINK-6448:
--

Assignee: (was: yanghua)

> Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'
> ---
>
> Key: FLINK-6448
> URL: https://issues.apache.org/jira/browse/FLINK-6448
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Reporter: Stephan Ewen
>  Labels: easyfix, starter
>
> In the TaskManager view, the label 'Free Memory' is wrong / misleading and 
> should be 'JVM Heap Size' instead.





[GitHub] flink pull request #3840: [FLINK-6448] Web UI TaskManager view: Rename 'Free...

2017-05-07 Thread yanghua
Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/3840


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-6451) Web UI: Rename 'Metrics' view to 'Task Metrics'

2017-05-07 Thread yanghua (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanghua reassigned FLINK-6451:
--

Assignee: (was: yanghua)

> Web UI: Rename 'Metrics' view to 'Task Metrics'
> ---
>
> Key: FLINK-6451
> URL: https://issues.apache.org/jira/browse/FLINK-6451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Reporter: Stephan Ewen
>  Labels: easyfix, starter
>
> In the UI, under the {{Overview}} of a specific job, the tab {{Metrics}} 
> shows metrics for tasks only, not all available metrics.
> We should rename it to {{Task Metrics}}. That also clearly differentiates the 
> view from the job-level metrics view proposed in FLINK-6449.





[jira] [Commented] (FLINK-6448) Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000282#comment-16000282
 ] 

ASF GitHub Bot commented on FLINK-6448:
---

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/3840

[FLINK-6448] Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-6448

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3840.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3840


commit d0ef1efdeb1a83867f0763c05e4e2baf41f8faf8
Author: vinoyang 
Date:   2017-05-03T14:37:31Z

Merge pull request #1 from apache/master

update from master

commit f59cc12e63f0cf1379c8ae4f61c8e65c12d025e9
Author: vinoyang 
Date:   2017-05-07T07:31:14Z

[FLINK-6448] Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'

commit 4ac392450c84ba12dd6fbabd995a3fe5aefde462
Author: vinoyang 
Date:   2017-05-07T07:44:58Z

[FLINK-6451] Web UI: Rename 'Metrics' view to 'Task Metrics'

commit 9c210740d36db715270693314107b273f8c44ccf
Author: vinoyang 
Date:   2017-05-07T09:33:29Z

Revert "[FLINK-6451] Web UI: Rename 'Metrics' view to 'Task Metrics'"

This reverts commit 4ac392450c84ba12dd6fbabd995a3fe5aefde462.

commit 73785fe348d252781fa4acba7bef57b50e55f54f
Author: vinoyang 
Date:   2017-05-07T13:18:21Z

[FLINK-6451] Web UI: Rename 'Metrics' view to 'Task Metrics'

commit 605c60a362dc44a851581ca0b3780bebd89ae7c5
Author: vinoyang 
Date:   2017-05-08T05:13:38Z

Revert "[FLINK-6448] Web UI TaskManager view: Rename 'Free Memory' to 'JVM 
Heap'"

This reverts commit f59cc12e63f0cf1379c8ae4f61c8e65c12d025e9.

commit 8aeaf7d9ea2ae516b3d4519eddbcbdb90dd9b3ce
Author: vinoyang 
Date:   2017-05-08T05:14:25Z

Revert "[FLINK-6451] Web UI: Rename 'Metrics' view to 'Task Metrics'"

This reverts commit 4ac392450c84ba12dd6fbabd995a3fe5aefde462.

commit 37ce1d257ac2911c7fa9910369d247b42a0044ba
Author: vinoyang 
Date:   2017-05-08T05:19:35Z

[FLINK-6448] Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'




> Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'
> ---
>
> Key: FLINK-6448
> URL: https://issues.apache.org/jira/browse/FLINK-6448
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Reporter: Stephan Ewen
>Assignee: yanghua
>  Labels: easyfix, starter
>
> In the TaskManager view, the label 'Free Memory' is wrong / misleading and 
> should be 'JVM Heap Size' instead.





[GitHub] flink pull request #3840: [FLINK-6448] Web UI TaskManager view: Rename 'Free...

2017-05-07 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/3840

[FLINK-6448] Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-6448

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3840.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3840


commit d0ef1efdeb1a83867f0763c05e4e2baf41f8faf8
Author: vinoyang 
Date:   2017-05-03T14:37:31Z

Merge pull request #1 from apache/master

update from master

commit f59cc12e63f0cf1379c8ae4f61c8e65c12d025e9
Author: vinoyang 
Date:   2017-05-07T07:31:14Z

[FLINK-6448] Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'

commit 4ac392450c84ba12dd6fbabd995a3fe5aefde462
Author: vinoyang 
Date:   2017-05-07T07:44:58Z

[FLINK-6451] Web UI: Rename 'Metrics' view to 'Task Metrics'

commit 9c210740d36db715270693314107b273f8c44ccf
Author: vinoyang 
Date:   2017-05-07T09:33:29Z

Revert "[FLINK-6451] Web UI: Rename 'Metrics' view to 'Task Metrics'"

This reverts commit 4ac392450c84ba12dd6fbabd995a3fe5aefde462.

commit 73785fe348d252781fa4acba7bef57b50e55f54f
Author: vinoyang 
Date:   2017-05-07T13:18:21Z

[FLINK-6451] Web UI: Rename 'Metrics' view to 'Task Metrics'

commit 605c60a362dc44a851581ca0b3780bebd89ae7c5
Author: vinoyang 
Date:   2017-05-08T05:13:38Z

Revert "[FLINK-6448] Web UI TaskManager view: Rename 'Free Memory' to 'JVM 
Heap'"

This reverts commit f59cc12e63f0cf1379c8ae4f61c8e65c12d025e9.

commit 8aeaf7d9ea2ae516b3d4519eddbcbdb90dd9b3ce
Author: vinoyang 
Date:   2017-05-08T05:14:25Z

Revert "[FLINK-6451] Web UI: Rename 'Metrics' view to 'Task Metrics'"

This reverts commit 4ac392450c84ba12dd6fbabd995a3fe5aefde462.

commit 37ce1d257ac2911c7fa9910369d247b42a0044ba
Author: vinoyang 
Date:   2017-05-08T05:19:35Z

[FLINK-6448] Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'






[jira] [Created] (FLINK-6478) Add documentation on how to upgrade serializers for managed state

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6478:
--

 Summary: Add documentation on how to upgrade serializers for 
managed state
 Key: FLINK-6478
 URL: https://issues.apache.org/jira/browse/FLINK-6478
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Tzu-Li (Gordon) Tai
Priority: Critical
 Fix For: 1.3.0


There needs to be documentation that explains how to use the new serializer 
upgrade APIs in {{TypeSerializer}}, and how the methods work with checkpoints. 
This documentation should probably be placed under "Application development --> 
Streaming --> Working with State".

Ideally, it should also come with a minimal example for users who use 
serialization frameworks that already have built-in backwards compatibility 
(such as Thrift).





[jira] [Commented] (FLINK-6397) MultipleProgramsTestBase does not reset ContextEnvironment

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000236#comment-16000236
 ] 

ASF GitHub Bot commented on FLINK-6397:
---

Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3810#discussion_r115170542
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
 ---
@@ -80,29 +80,36 @@

protected final TestExecutionMode mode;
 
-   
+   private TestEnvironment testEnvironment;
+
+   private CollectionTestEnvironment collectionTestEnvironment;
+
public MultipleProgramsTestBase(TestExecutionMode mode) {
this.mode = mode;
-   
+
switch(mode){
case CLUSTER:
-   new TestEnvironment(cluster, 4).setAsContext();
+   testEnvironment = new TestEnvironment(cluster, 
4);
--- End diff --

Thank you for responding. My last comment was not correct. What I meant to 
say is that we cannot move this code to \@BeforeClass and \@AfterClass, but we 
can move it to \@Before and \@After. This brings a little overhead, since 
\@Before and \@After are called for each test method. I pushed a new commit 
using \@Before and \@After.
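The set-in-\@Before / unset-in-\@After pattern discussed here can be sketched in plain Java. The static field below stands in for the execution environment's context slot; class and method names are illustrative, not Flink's actual TestEnvironment API:

```java
// Minimal model of per-test context setup/teardown. The static field
// simulates the context-environment slot shared across all tests in a JVM.
public class ContextResetSketch {
    static Object contextEnvironment;

    // Would be annotated @Before in JUnit 4: set a fresh context per test.
    static void before() { contextEnvironment = new Object(); }

    // Would be annotated @After in JUnit 4: always unset, so a later test
    // class does not silently inherit a stale context.
    static void after() { contextEnvironment = null; }

    public static void main(String[] args) {
        before();
        if (contextEnvironment == null) throw new AssertionError("context not set");
        after();
        if (contextEnvironment != null) throw new AssertionError("context leaked");
        System.out.println("context reset verified");
    }
}
```

The key point is that the \@After-equivalent always clears the static slot, at the cost of running once per test method rather than once per class.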


> MultipleProgramsTestBase does not reset ContextEnvironment
> --
>
> Key: FLINK-6397
> URL: https://issues.apache.org/jira/browse/FLINK-6397
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Chesnay Schepler
>Assignee: Biao Liu
>
> The MultipleProgramsTestBase sets a new TestEnvironment as a context 
> environment but never explicitly unsets it, which can result in subsequent 
> tests categorically failing.
> The CustomDistributionITCase doesn't unset the context either, nor does some 
> streaming test that I haven't quite nailed down yet.





[jira] [Commented] (FLINK-6178) Allow upgrades to state serializers

2017-05-07 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000235#comment-16000235
 ] 

Ted Yu commented on FLINK-6178:
---

Alright.

Thanks, Gordon.

> Allow upgrades to state serializers
> ---
>
> Key: FLINK-6178
> URL: https://issues.apache.org/jira/browse/FLINK-6178
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Currently, users are locked in with the serializer implementation used to 
> write their state.
> This is suboptimal, as generally for users, it could easily be possible that 
> they wish to change their serialization formats / state schemas and types in 
> the future.
> This is an umbrella JIRA for the required tasks to make this possible.
> Here's an overview description of what to expect for the overall outcome of 
> this JIRA (the specific details are outlined in their respective subtasks):
> Ideally, the main user-facing change this would result in is that users 
> implementing their custom {{TypeSerializer}} s will also need to implement 
> hook methods that identify whether or not there is a change to the serialized 
> format or even a change to the serialized data type. It would be the user's 
> responsibility that the {{deserialize}} method can bridge the change between 
> the old / new formats.
> For Flink's built-in serializers that are automatically built using the 
> user's configuration (most notably the more complex {{KryoSerializer}} and 
> {{GenericArraySerializer}}), Flink should be able to automatically 
> "reconfigure" them using the new configuration, so that the reconfigured 
> versions can be used to de- / serialize previous state. This would require 
> knowledge of the previous configuration of the serializer, therefore 
> "serializer configuration metadata" will be added to savepoints.
> Note that for the first version of this, although additional infrastructure 
> (e.g. serializer reconfigure hooks, serializer configuration metadata in 
> savepoints) will be added to potentially allow Kryo version upgrade, this 
> JIRA will not cover this. Kryo has breaking binary formats across major 
> versions, and will most likely need some further changes. Therefore, for the 
> {{KryoSerializer}}, "upgrading" it simply means changes in the registration 
> of specific / default serializers, at least for now.
> Finally, we would need to add a "convertState" phase to the task lifecycle, 
> that takes place after the "open" phase and before checkpointing starts / the 
> task starts running. It can only happen after "open", because only then can 
> we be certain if any reconfiguration of state serialization has occurred, and 
> state needs to be converted. Ideally, the code for the "convertState" is 
> designed so that it can be easily exposed as an offline tool in the future.
> For this JIRA, we should simply assume that after {{open()}}, we have all the 
> required information and serializers are appropriately reconfigured. 
> [~srichter] is currently planning to deprecate RuntimeContext state 
> registration methods in favor of a new interface that enforces eager state 
> registration, so that we may have all the info after {{open()}}





[GitHub] flink pull request #3810: [FLINK-6397] MultipleProgramsTestBase does not res...

2017-05-07 Thread ifndef-SleePy
Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3810#discussion_r115170542
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
 ---
@@ -80,29 +80,36 @@

protected final TestExecutionMode mode;
 
-   
+   private TestEnvironment testEnvironment;
+
+   private CollectionTestEnvironment collectionTestEnvironment;
+
public MultipleProgramsTestBase(TestExecutionMode mode) {
this.mode = mode;
-   
+
switch(mode){
case CLUSTER:
-   new TestEnvironment(cluster, 4).setAsContext();
+   testEnvironment = new TestEnvironment(cluster, 
4);
--- End diff --

Thank you for responding. My last comment was not correct. What I meant to 
say is that we cannot move this code to \@BeforeClass and \@AfterClass, but we 
can move it to \@Before and \@After. This brings a little overhead, since 
\@Before and \@After are called for each test method. I pushed a new commit 
using \@Before and \@After.




[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-05-07 Thread shuai.xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000232#comment-16000232
 ] 

shuai.xu commented on FLINK-6434:
-

[~till.rohrmann] This does not seem to fix the bug completely: between 
allocateSlot(allocationID1) and failAllocation(allocationID1), another free 
slot with allocationID2 may fulfill the pending request, so allocatedSlots 
records it under allocationID2. failAllocation(allocationID1) then cannot 
release it, and the slot is still leaked.
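The race described above can be modeled in a few lines of Java. All names here are illustrative; this is a sketch of the leak scenario, not the real SlotPool code:

```java
import java.util.HashMap;
import java.util.Map;

// Model of the leak: a pending request issued under allocationID1 gets
// fulfilled by a slot carrying allocationID2, so a later
// failAllocation(allocationID1) finds nothing to release.
public class SlotLeakSketch {
    static Map<String, String> allocatedSlots = new HashMap<>();

    public static void main(String[] args) {
        // A free slot registered under allocationID2 fulfills the request:
        allocatedSlots.put("allocationID2", "slot-1");

        // The caller times out and fails allocationID1 -- removes nothing:
        allocatedSlots.remove("allocationID1");

        // slot-1 is still recorded and will never be used or recycled:
        System.out.println("leaked slots: " + allocatedSlots.size());
    }
}
```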

> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> If a call to allocateSlot() from Execution to SlotPool times out, the job 
> will begin to fail over, but the pending request is still in the SlotPool. 
> If a new slot then registers with the SlotPool, it may fulfill the outdated 
> pending request and be added to allocatedSlots, but it will never be used 
> and never be recycled.





[jira] [Commented] (FLINK-6178) Allow upgrades to state serializers

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000231#comment-16000231
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6178:


[~te...@apache.org] regarding the equals: the base `super.equals(obj)` already 
performs the class check.

> Allow upgrades to state serializers
> ---
>
> Key: FLINK-6178
> URL: https://issues.apache.org/jira/browse/FLINK-6178
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Currently, users are locked in with the serializer implementation used to 
> write their state.
> This is suboptimal, as generally for users, it could easily be possible that 
> they wish to change their serialization formats / state schemas and types in 
> the future.
> This is an umbrella JIRA for the required tasks to make this possible.
> Here's an overview description of what to expect for the overall outcome of 
> this JIRA (the specific details are outlined in their respective subtasks):
> Ideally, the main user-facing change this would result in is that users 
> implementing their custom {{TypeSerializer}} s will also need to implement 
> hook methods that identify whether or not there is a change to the serialized 
> format or even a change to the serialized data type. It would be the user's 
> responsibility that the {{deserialize}} method can bridge the change between 
> the old / new formats.
> For Flink's built-in serializers that are automatically built using the 
> user's configuration (most notably the more complex {{KryoSerializer}} and 
> {{GenericArraySerializer}}), Flink should be able to automatically 
> "reconfigure" them using the new configuration, so that the reconfigured 
> versions can be used to de- / serialize previous state. This would require 
> knowledge of the previous configuration of the serializer, therefore 
> "serializer configuration metadata" will be added to savepoints.
> Note that for the first version of this, although additional infrastructure 
> (e.g. serializer reconfigure hooks, serializer configuration metadata in 
> savepoints) will be added to potentially allow Kryo version upgrade, this 
> JIRA will not cover this. Kryo has breaking binary formats across major 
> versions, and will most likely need some further changes. Therefore, for the 
> {{KryoSerializer}}, "upgrading" it simply means changes in the registration 
> of specific / default serializers, at least for now.
> Finally, we would need to add a "convertState" phase to the task lifecycle, 
> that takes place after the "open" phase and before checkpointing starts / the 
> task starts running. It can only happen after "open", because only then can 
> we be certain if any reconfiguration of state serialization has occurred, and 
> state needs to be converted. Ideally, the code for the "convertState" is 
> designed so that it can be easily exposed as an offline tool in the future.
> For this JIRA, we should simply assume that after {{open()}}, we have all the 
> required information and serializers are appropriately reconfigured. 
> [~srichter] is currently planning to deprecate RuntimeContext state 
> registration methods in favor of a new interface that enforces eager state 
> registration, so that we may have all the info after {{open()}}





[jira] [Updated] (FLINK-6462) Add requiresOver interface for AggregateFunction

2017-05-07 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6462:
---
Summary: Add requiresOver interface for AggregateFunction  (was: Add requires 
over indicators for OVER UDAF)

> Add requiresOver interface for  AggregateFunction
> -
>
> Key: FLINK-6462
> URL: https://issues.apache.org/jira/browse/FLINK-6462
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Window functions in standard databases only support the OVER window, e.g. 
> `LAG`, `LEAD`, `FIRST_VALUE`, `LAST_VALUE`. These window functions do not 
> apply to `Slide`, `Tumble`, or `Session` windows, so user-defined aggregates 
> need a way to declare this clearly. In Calcite, `SqlAggFunction` uses 
> `requiresOver` to make the distinction.
> This JIRA will add that feature.
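A hedged sketch of the proposed hook, using illustrative class names rather than Flink's actual Table API hierarchy: an aggregate-function base class exposes requiresOver(), which a planner could consult to reject OVER-only functions in group windows.

```java
// Sketch of the requiresOver() hook proposed in this JIRA.
// SketchAggregateFunction is a stand-in base class, not Flink's real one.
abstract class SketchAggregateFunction<T, ACC> {
    // Default: usable in group windows (Tumble/Slide/Session) and OVER.
    public boolean requiresOver() { return false; }
}

class LastValueAgg extends SketchAggregateFunction<Long, Long> {
    // LAST_VALUE is only meaningful relative to an OVER window's ordering.
    @Override
    public boolean requiresOver() { return true; }
}

public class RequiresOverSketch {
    public static void main(String[] args) {
        SketchAggregateFunction<Long, Long> agg = new LastValueAgg();
        // A planner would check this flag during validation:
        if (agg.requiresOver()) {
            System.out.println("accepted only inside OVER windows");
        }
    }
}
```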





[jira] [Commented] (FLINK-6178) Allow upgrades to state serializers

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000228#comment-16000228
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6178:


[~te...@apache.org] the version of the config snapshot is just an indicator. 
The intention is that if users have to change anything in the config snapshot 
(written info, serialization format, etc.), they simply uptick the version 
value. Does that make sense?
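The "uptick the version" idea can be sketched as a version number written ahead of the snapshot payload, which the reader checks on restore. Class and method names below are illustrative only, not Flink's TypeSerializerConfigSnapshot API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Versioned snapshot sketch: the writer prefixes its payload with a version;
// the reader branches on that version to handle older layouts.
public class VersionedSnapshotSketch {
    static final int VERSION = 2;  // upticked whenever the written format changes

    static byte[] write(String payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(VERSION);
        out.writeUTF(payload);
        return bos.toByteArray();
    }

    static String read(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int version = in.readInt();
        if (version > VERSION) {
            throw new IOException("snapshot written by a newer version: " + version);
        }
        // A real implementation would branch here to parse older layouts;
        // in this sketch versions 1 and 2 share the same payload encoding.
        return in.readUTF();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(read(write("kryo-registrations")));
    }
}
```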

> Allow upgrades to state serializers
> ---
>
> Key: FLINK-6178
> URL: https://issues.apache.org/jira/browse/FLINK-6178
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Currently, users are locked in with the serializer implementation used to 
> write their state.
> This is suboptimal, as generally for users, it could easily be possible that 
> they wish to change their serialization formats / state schemas and types in 
> the future.
> This is an umbrella JIRA for the required tasks to make this possible.
> Here's an overview description of what to expect for the overall outcome of 
> this JIRA (the specific details are outlined in their respective subtasks):
> Ideally, the main user-facing change this would result in is that users 
> implementing their custom {{TypeSerializer}} s will also need to implement 
> hook methods that identify whether or not there is a change to the serialized 
> format or even a change to the serialized data type. It would be the user's 
> responsibility that the {{deserialize}} method can bridge the change between 
> the old / new formats.
> For Flink's built-in serializers that are automatically built using the 
> user's configuration (most notably the more complex {{KryoSerializer}} and 
> {{GenericArraySerializer}}), Flink should be able to automatically 
> "reconfigure" them using the new configuration, so that the reconfigured 
> versions can be used to de- / serialize previous state. This would require 
> knowledge of the previous configuration of the serializer, therefore 
> "serializer configuration metadata" will be added to savepoints.
> Note that for the first version of this, although additional infrastructure 
> (e.g. serializer reconfigure hooks, serializer configuration metadata in 
> savepoints) will be added to potentially allow Kryo version upgrade, this 
> JIRA will not cover this. Kryo has breaking binary formats across major 
> versions, and will most likely need some further changes. Therefore, for the 
> {{KryoSerializer}}, "upgrading" it simply means changes in the registration 
> of specific / default serializers, at least for now.
> Finally, we would need to add a "convertState" phase to the task lifecycle, 
> that takes place after the "open" phase and before checkpointing starts / the 
> task starts running. It can only happen after "open", because only then can 
> we be certain if any reconfiguration of state serialization has occurred, and 
> state needs to be converted. Ideally, the code for the "convertState" is 
> designed so that it can be easily exposed as an offline tool in the future.
> For this JIRA, we should simply assume that after {{open()}}, we have all the 
> required information and serializers are appropriately reconfigured. 
> [~srichter] is currently planning to deprecate RuntimeContext state 
> registration methods in favor of a new interface that enforces eager state 
> registration, so that we may have all the info after {{open()}}





[GitHub] flink issue #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics reporter

2017-05-07 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/3736
  
@zentol cool! Let me know how to get this into 1.3 :)




[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000225#comment-16000225
 ] 

ASF GitHub Bot commented on FLINK-6013:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/3736
  
@zentol cool! Let me know how to get this into 1.3 :)


> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.0
>
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe 
> many other companies do as well.
> Flink currently only has a StatsD metrics reporter, so users have to set up 
> the Datadog Agent in order to receive metrics from StatsD and forward them 
> to Datadog. We don't like this approach.
> We would prefer a Datadog metrics reporter that contacts the Datadog HTTP 
> endpoint directly.
> I'll take this ticket myself.
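For reference, the reporter discussed here is configured roughly like the following flink-conf.yaml fragment (key names as documented around the Flink 1.3 release; verify against the docs of your Flink version before use):

```yaml
# Hypothetical minimal configuration for the Datadog HTTP reporter.
metrics.reporters: dghttp
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: <your-datadog-api-key>
metrics.reporter.dghttp.tags: myflinkapp,prod
```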





[GitHub] flink issue #3802: Add Evenly Graph Generator to Flink Gelly

2017-05-07 Thread fanzhidongyzby
Github user fanzhidongyzby commented on the issue:

https://github.com/apache/flink/pull/3802
  
@greghogan , ok, thanks!




[jira] [Commented] (FLINK-6376) when deploy flink cluster on the yarn, it is lack of hdfs delegation token.

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000162#comment-16000162
 ] 

ASF GitHub Bot commented on FLINK-6376:
---

Github user Rucongzhang commented on the issue:

https://github.com/apache/flink/pull/3776
  
After resolving this problem, we found another problem: when we configure 
the keytab and principal and add the HDFS delegation token, the JM and TM also 
use this token, rather than the keytab, when communicating with HDFS. When the 
token expires, nothing in Flink refreshes it.
But the purpose of adding this token was only for the YARN NodeManager. We 
are now resolving this problem. Thanks!


> when deploy flink cluster on the yarn, it is lack of hdfs delegation token.
> ---
>
> Key: FLINK-6376
> URL: https://issues.apache.org/jira/browse/FLINK-6376
> Project: Flink
>  Issue Type: Bug
>  Components: Security, YARN
>Reporter: zhangrucong1982
>Assignee: zhangrucong1982
>
> 1. I use Flink version 1.2.0, and I deploy the Flink cluster on YARN. The 
> Hadoop version is 2.7.2.
> 2. I use Flink in security mode with a keytab and principal. The key 
> configuration is: security.kerberos.login.keytab: /home/ketab/test.keytab 
> and security.kerberos.login.principal: test.
> 3. The YARN configuration is the default, with log aggregation enabled 
> ("yarn.log-aggregation-enable: true").
> 4. When deploying the Flink cluster on YARN, the YARN NodeManager hits the 
> following failure when aggregating logs to HDFS. The root cause is a missing 
> HDFS delegation token.
>  java.io.IOException: Failed on local exception: java.io.IOException: 
> org.apache.hadoop.security.AccessControlException: Client cannot authenticate 
> via:[TOKEN, KERBEROS]; Host Details : local host is: 
> "SZV1000258954/10.162.181.24"; destination host is: "SZV1000258954":25000;
> at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:796)
> at org.apache.hadoop.ipc.Client.call(Client.java:1515)
> at org.apache.hadoop.ipc.Client.call(Client.java:1447)
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
> at com.sun.proxy.$Proxy26.getFileInfo(Unknown Source)
> at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:802)
> at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:201)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
> at com.sun.proxy.$Proxy27.getFileInfo(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1919)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1500)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1496)
> at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1496)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.checkExists(LogAggregationService.java:271)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.access$100(LogAggregationService.java:68)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService$1.run(LogAggregationService.java:299)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1769)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.createAppDir(LogAggregationService.java:284)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.initAppAggregator(LogAggregationService.java:390)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.initApp(LogAggregationService.java:342)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.handle(LogAggregationService.java:470)
> at 
> org.apache.ha

[GitHub] flink issue #3776: [FLINK-6376]when deploy flink cluster on the yarn, it is ...

2017-05-07 Thread Rucongzhang
Github user Rucongzhang commented on the issue:

https://github.com/apache/flink/pull/3776
  
After resolving this problem, we found another problem: when we configure 
the keytab and principal and add the HDFS delegation token, the JM and TM also 
use this token, rather than the keytab, when communicating with HDFS. When the 
token expires, nothing in Flink refreshes it.
But this token was added only for the YARN node manager. We are now resolving 
this problem. Thanks!




[jira] [Updated] (FLINK-6477) The first time to click Taskmanager cannot get the actual data

2017-05-07 Thread zhihao chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhihao chen updated FLINK-6477:
---
Attachment: errDisplay.jpg

> The first time to click Taskmanager cannot get the actual data
> --
>
> Key: FLINK-6477
> URL: https://issues.apache.org/jira/browse/FLINK-6477
> Project: Flink
>  Issue Type: Bug
>  Components: Web Client
>Affects Versions: 1.2.0
>Reporter: zhihao chen
>Assignee: zhihao chen
> Attachments: errDisplay.jpg
>
>
> The first time the TaskManager page is clicked, the Flink web UI shows less than 
> the actual data. When the parameter "jobmanager.web.refresh-interval" is set to a 
> larger value, e.g. 180, the page only displays correctly after that interval 
> elapses unless it is refreshed manually.





[jira] [Updated] (FLINK-6477) The first time to click Taskmanager cannot get the actual data

2017-05-07 Thread zhihao chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhihao chen updated FLINK-6477:
---
Affects Version/s: 1.2.0
  Component/s: Web Client

> The first time to click Taskmanager cannot get the actual data
> --
>
> Key: FLINK-6477
> URL: https://issues.apache.org/jira/browse/FLINK-6477
> Project: Flink
>  Issue Type: Bug
>  Components: Web Client
>Affects Versions: 1.2.0
>Reporter: zhihao chen
>Assignee: zhihao chen
>
> The first time the TaskManager page is clicked, the Flink web UI shows less than 
> the actual data. When the parameter "jobmanager.web.refresh-interval" is set to a 
> larger value, e.g. 180, the page only displays correctly after that interval 
> elapses unless it is refreshed manually.





[jira] [Created] (FLINK-6477) The first time to click Taskmanager cannot get the actual data

2017-05-07 Thread zhihao chen (JIRA)
zhihao chen created FLINK-6477:
--

 Summary: The first time to click Taskmanager cannot get the actual 
data
 Key: FLINK-6477
 URL: https://issues.apache.org/jira/browse/FLINK-6477
 Project: Flink
  Issue Type: Bug
Reporter: zhihao chen
Assignee: zhihao chen


The first time the TaskManager page is clicked, the Flink web UI shows less than 
the actual data. When the parameter "jobmanager.web.refresh-interval" is set to a 
larger value, e.g. 180, the page only displays correctly after that interval 
elapses unless it is refreshed manually.





[jira] [Commented] (FLINK-6178) Allow upgrades to state serializers

2017-05-07 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000141#comment-16000141
 ] 

Ted Yu commented on FLINK-6178:
---

For EnumSerializerConfigSnapshot :
{code}
+   public boolean equals(Object obj) {
+ return super.equals(obj)
+ && Arrays.equals(
+   enumConstants,
+   ((EnumSerializerConfigSnapshot) obj).getEnumConstants());
{code}
Should there be a check that obj is an instance of EnumSerializerConfigSnapshot 
before casting?
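A minimal sketch of such a guard, using a hypothetical `EnumSnapshot` class in place of the real `EnumSerializerConfigSnapshot`:

```java
import java.util.Arrays;

// Hypothetical stand-in for EnumSerializerConfigSnapshot, sketching the
// instanceof guard suggested above; not the actual Flink class.
public class EnumSnapshot {
    private final String[] enumConstants;

    public EnumSnapshot(String... constants) {
        this.enumConstants = constants;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        // Guard the cast: obj may be null or of an unrelated type,
        // and a bare cast would throw ClassCastException.
        if (!(obj instanceof EnumSnapshot)) {
            return false;
        }
        return Arrays.equals(enumConstants, ((EnumSnapshot) obj).enumConstants);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(enumConstants);
    }

    public static void main(String[] args) {
        System.out.println(new EnumSnapshot("A", "B").equals(new EnumSnapshot("A", "B"))); // true
        System.out.println(new EnumSnapshot("A").equals("A"));                             // false
    }
}
```

The `instanceof` check also covers null, since `null instanceof EnumSnapshot` is false.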

> Allow upgrades to state serializers
> ---
>
> Key: FLINK-6178
> URL: https://issues.apache.org/jira/browse/FLINK-6178
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Currently, users are locked in with the serializer implementation used to 
> write their state.
> This is suboptimal, as generally for users, it could easily be possible that 
> they wish to change their serialization formats / state schemas and types in 
> the future.
> This is an umbrella JIRA for the required tasks to make this possible.
> Here's an overview description of what to expect for the overall outcome of 
> this JIRA (the specific details are outlined in their respective subtasks):
> Ideally, the main user-facing change this would result in is that users 
> implementing their custom {{TypeSerializer}} s will also need to implement 
> hook methods that identify whether or not there is a change to the serialized 
> format or even a change to the serialized data type. It would be the user's 
> responsibility that the {{deserialize}} method can bridge the change between 
> the old / new formats.
> For Flink's built-in serializers that are automatically built using the 
> user's configuration (most notably the more complex {{KryoSerializer}} and 
> {{GenericArraySerializer}}), Flink should be able to automatically 
> "reconfigure" them using the new configuration, so that the reconfigured 
> versions can be used to de- / serialize previous state. This would require 
> knowledge of the previous configuration of the serializer, therefore 
> "serializer configuration metadata" will be added to savepoints.
> Note that for the first version of this, although additional infrastructure 
> (e.g. serializer reconfigure hooks, serializer configuration metadata in 
> savepoints) will be added to potentially allow Kryo version upgrade, this 
> JIRA will not cover this. Kryo has breaking binary formats across major 
> versions, and will most likely need some further changes. Therefore, for the 
> {{KryoSerializer}}, "upgrading" it simply means changes in the registration 
> of specific / default serializers, at least for now.
> Finally, we would need to add a "convertState" phase to the task lifecycle, 
> that takes place after the "open" phase and before checkpointing starts / the 
> task starts running. It can only happen after "open", because only then can 
> we be certain if any reconfiguration of state serialization has occurred, and 
> state needs to be converted. Ideally, the code for the "convertState" is 
> designed so that it can be easily exposed as an offline tool in the future.
> For this JIRA, we should simply assume that after {{open()}}, we have all the 
> required information and serializers are appropriately reconfigured. 
> [~srichter] is currently planning to deprecate RuntimeContext state 
> registration methods in favor of a new interface that enforces eager state 
> registration, so that we may have all the info after {{open()}}
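The "config snapshot plus compatibility hook" idea described above can be sketched with deliberately simplified, hypothetical types; the real Flink interfaces (`TypeSerializer` and friends) are richer than this:

```java
import java.util.Arrays;

public class SerializerUpgradeSketch {

    // What the old serializer was built with; written into the savepoint
    // alongside the state ("serializer configuration metadata").
    static final class ConfigSnapshot {
        final String[] registeredTypes;
        ConfigSnapshot(String... types) { this.registeredTypes = types; }
    }

    static final class Serializer {
        final String[] registeredTypes;
        Serializer(String... types) { this.registeredTypes = types; }

        ConfigSnapshot snapshotConfiguration() {
            return new ConfigSnapshot(registeredTypes);
        }

        // Hook: on restore, compare against the snapshot from the savepoint
        // to decide whether old state can be read as-is or must be converted.
        boolean isCompatibleWith(ConfigSnapshot previous) {
            return Arrays.equals(registeredTypes, previous.registeredTypes);
        }
    }

    public static void main(String[] args) {
        ConfigSnapshot old = new Serializer("Foo", "Bar").snapshotConfiguration();
        System.out.println(new Serializer("Foo", "Bar").isCompatibleWith(old)); // true
        System.out.println(new Serializer("Foo", "Baz").isCompatibleWith(old)); // false
    }
}
```

An incompatible result would then trigger the "convertState" phase the description mentions, rather than failing the restore outright.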





[jira] [Commented] (FLINK-6178) Allow upgrades to state serializers

2017-05-07 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000127#comment-16000127
 ] 

Ted Yu commented on FLINK-6178:
---

WritableSerializerConfigSnapshot#getVersion() returns an int.
Should we distinguish between major vs. minor versions?

> Allow upgrades to state serializers
> ---
>
> Key: FLINK-6178
> URL: https://issues.apache.org/jira/browse/FLINK-6178
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Currently, users are locked in with the serializer implementation used to 
> write their state.
> This is suboptimal, as generally for users, it could easily be possible that 
> they wish to change their serialization formats / state schemas and types in 
> the future.
> This is an umbrella JIRA for the required tasks to make this possible.
> Here's an overview description of what to expect for the overall outcome of 
> this JIRA (the specific details are outlined in their respective subtasks):
> Ideally, the main user-facing change this would result in is that users 
> implementing their custom {{TypeSerializer}} s will also need to implement 
> hook methods that identify whether or not there is a change to the serialized 
> format or even a change to the serialized data type. It would be the user's 
> responsibility that the {{deserialize}} method can bridge the change between 
> the old / new formats.
> For Flink's built-in serializers that are automatically built using the 
> user's configuration (most notably the more complex {{KryoSerializer}} and 
> {{GenericArraySerializer}}), Flink should be able to automatically 
> "reconfigure" them using the new configuration, so that the reconfigured 
> versions can be used to de- / serialize previous state. This would require 
> knowledge of the previous configuration of the serializer, therefore 
> "serializer configuration metadata" will be added to savepoints.
> Note that for the first version of this, although additional infrastructure 
> (e.g. serializer reconfigure hooks, serializer configuration metadata in 
> savepoints) will be added to potentially allow Kryo version upgrade, this 
> JIRA will not cover this. Kryo has breaking binary formats across major 
> versions, and will most likely need some further changes. Therefore, for the 
> {{KryoSerializer}}, "upgrading" it simply means changes in the registration 
> of specific / default serializers, at least for now.
> Finally, we would need to add a "convertState" phase to the task lifecycle, 
> that takes place after the "open" phase and before checkpointing starts / the 
> task starts running. It can only happen after "open", because only then can 
> we be certain if any reconfiguration of state serialization has occurred, and 
> state needs to be converted. Ideally, the code for the "convertState" is 
> designed so that it can be easily exposed as an offline tool in the future.
> For this JIRA, we should simply assume that after {{open()}}, we have all the 
> required information and serializers are appropriately reconfigured. 
> [~srichter] is currently planning to deprecate RuntimeContext state 
> registration methods in favor of a new interface that enforces eager state 
> registration, so that we may have all the info after {{open()}}





[GitHub] flink issue #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics reporter

2017-05-07 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3736
  
No, I think we addressed all issues :)




[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1673#comment-1673
 ] 

ASF GitHub Bot commented on FLINK-6013:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3736
  
No, I think we addressed all issues :)


> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.0
>
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a 
> lot other companies also do.
> Flink right now only has a StatsD metrics reporter, and users have to set up 
> Datadog Agent in order to receive metrics from StatsD and transport them to 
> Datadog. We don't like this approach.
> We prefer to have a Datadog metrics reporter directly contacting Datadog http 
> endpoint.
> I'll take this ticket myself.





[jira] [Commented] (FLINK-6474) Potential loss of precision in 32 bit integer multiplication

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1669#comment-1669
 ] 

ASF GitHub Bot commented on FLINK-6474:
---

Github user tedyu closed the pull request at:

https://github.com/apache/flink/pull/3839


> Potential loss of precision in 32 bit integer multiplication
> 
>
> Key: FLINK-6474
> URL: https://issues.apache.org/jira/browse/FLINK-6474
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> In TaskManagerServicesConfiguration#parseNetworkEnvironmentConfiguration
> {code}
> if (!hasNewNetworkBufConf(configuration)) {
>   // map old config to new one:
>   networkBufMin = networkBufMax = numNetworkBuffers * pageSize;
> {code}
> networkBufMax is a long.
> However the multiplication is done in 32 bit integer, leading to potential 
> loss of precision.
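The usual fix is to widen one operand to long before multiplying, so the arithmetic happens in 64 bits. A sketch with illustrative values (the buffer count and page size below are chosen only to make the overflow visible):

```java
public class NetworkBufSize {

    // 32-bit multiply: the int product overflows before the widening
    // assignment to long, reproducing the bug described above.
    static long overflowing(int numNetworkBuffers, int pageSize) {
        return numNetworkBuffers * pageSize;
    }

    // Widen first, then multiply in 64 bits.
    static long widened(int numNetworkBuffers, int pageSize) {
        return (long) numNetworkBuffers * pageSize;
    }

    public static void main(String[] args) {
        int buffers = 1 << 17;   // 131072 buffers (illustrative)
        int pageSize = 1 << 15;  // 32 KiB pages (illustrative)
        // 2^17 * 2^15 = 2^32, which wraps to 0 in 32-bit arithmetic
        System.out.println(overflowing(buffers, pageSize)); // 0
        System.out.println(widened(buffers, pageSize));     // 4294967296
    }
}
```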





[jira] [Resolved] (FLINK-6474) Potential loss of precision in 32 bit integer multiplication

2017-05-07 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-6474.
---
Resolution: Fixed
  Assignee: Ted Yu

> Potential loss of precision in 32 bit integer multiplication
> 
>
> Key: FLINK-6474
> URL: https://issues.apache.org/jira/browse/FLINK-6474
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In TaskManagerServicesConfiguration#parseNetworkEnvironmentConfiguration
> {code}
> if (!hasNewNetworkBufConf(configuration)) {
>   // map old config to new one:
>   networkBufMin = networkBufMax = numNetworkBuffers * pageSize;
> {code}
> networkBufMax is a long.
> However the multiplication is done in 32 bit integer, leading to potential 
> loss of precision.





[GitHub] flink pull request #3839: FLINK-6474 Potential loss of precision in 32 bit i...

2017-05-07 Thread tedyu
Github user tedyu closed the pull request at:

https://github.com/apache/flink/pull/3839




[jira] [Commented] (FLINK-6471) RocksDBStateBackendTest#testCancelRunningSnapshot sometimes fails

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1662#comment-1662
 ] 

ASF GitHub Bot commented on FLINK-6471:
---

Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/3837


> RocksDBStateBackendTest#testCancelRunningSnapshot sometimes fails
> -
>
> Key: FLINK-6471
> URL: https://issues.apache.org/jira/browse/FLINK-6471
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Stefan Richter
>Priority: Minor
> Fix For: 1.3.0
>
>
> I got the following test failure based on commit 
> f37988c19adc30d324cde83c54f2fa5d36efb9e7 :
> {code}
> testCancelRunningSnapshot[1](org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest)
>   Time elapsed: 0.166 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertTrue(Assert.java:52)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest.testCancelRunningSnapshot(RocksDBStateBackendTest.java:313)
> Results :
> Failed tests:
>   RocksDBStateBackendTest.testCancelRunningSnapshot:313 null
> {code}
> The following assertion fails:
> {code}
> assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
> {code}





[GitHub] flink issue #3837: [FLINK-6471] [checkpoint] Fix RocksDBStateBackendTest

2017-05-07 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3837
  
Merged.




[jira] [Commented] (FLINK-6471) RocksDBStateBackendTest#testCancelRunningSnapshot sometimes fails

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1661#comment-1661
 ] 

ASF GitHub Bot commented on FLINK-6471:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3837
  
Merged.


> RocksDBStateBackendTest#testCancelRunningSnapshot sometimes fails
> -
>
> Key: FLINK-6471
> URL: https://issues.apache.org/jira/browse/FLINK-6471
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Stefan Richter
>Priority: Minor
> Fix For: 1.3.0
>
>
> I got the following test failure based on commit 
> f37988c19adc30d324cde83c54f2fa5d36efb9e7 :
> {code}
> testCancelRunningSnapshot[1](org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest)
>   Time elapsed: 0.166 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertTrue(Assert.java:52)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest.testCancelRunningSnapshot(RocksDBStateBackendTest.java:313)
> Results :
> Failed tests:
>   RocksDBStateBackendTest.testCancelRunningSnapshot:313 null
> {code}
> The following assertion fails:
> {code}
> assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
> {code}





[GitHub] flink pull request #3837: [FLINK-6471] [checkpoint] Fix RocksDBStateBackendT...

2017-05-07 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/3837




[jira] [Commented] (FLINK-6474) Potential loss of precision in 32 bit integer multiplication

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1658#comment-1658
 ] 

ASF GitHub Bot commented on FLINK-6474:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3839
  
Merged manually in 38003c2829. Please close this PR and the JIRA.


> Potential loss of precision in 32 bit integer multiplication
> 
>
> Key: FLINK-6474
> URL: https://issues.apache.org/jira/browse/FLINK-6474
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> In TaskManagerServicesConfiguration#parseNetworkEnvironmentConfiguration
> {code}
> if (!hasNewNetworkBufConf(configuration)) {
>   // map old config to new one:
>   networkBufMin = networkBufMax = numNetworkBuffers * pageSize;
> {code}
> networkBufMax is a long.
> However the multiplication is done in 32 bit integer, leading to potential 
> loss of precision.





[GitHub] flink issue #3839: FLINK-6474 Potential loss of precision in 32 bit integer ...

2017-05-07 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3839
  
Merged manually in 38003c2829. Please close this PR and the JIRA.




[GitHub] flink issue #3839: FLINK-6474 Potential loss of precision in 32 bit integer ...

2017-05-07 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3839
  
LGTM +1, will merge.




[jira] [Commented] (FLINK-6474) Potential loss of precision in 32 bit integer multiplication

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1650#comment-1650
 ] 

ASF GitHub Bot commented on FLINK-6474:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3839
  
LGTM +1, will merge.


> Potential loss of precision in 32 bit integer multiplication
> 
>
> Key: FLINK-6474
> URL: https://issues.apache.org/jira/browse/FLINK-6474
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> In TaskManagerServicesConfiguration#parseNetworkEnvironmentConfiguration
> {code}
> if (!hasNewNetworkBufConf(configuration)) {
>   // map old config to new one:
>   networkBufMin = networkBufMax = numNetworkBuffers * pageSize;
> {code}
> networkBufMax is a long.
> However the multiplication is done in 32 bit integer, leading to potential 
> loss of precision.





[jira] [Commented] (FLINK-6474) Potential loss of precision in 32 bit integer multiplication

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1644#comment-1644
 ] 

ASF GitHub Bot commented on FLINK-6474:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/3839
  
@StefanRRichter
Can you take a look?


> Potential loss of precision in 32 bit integer multiplication
> 
>
> Key: FLINK-6474
> URL: https://issues.apache.org/jira/browse/FLINK-6474
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> In TaskManagerServicesConfiguration#parseNetworkEnvironmentConfiguration
> {code}
> if (!hasNewNetworkBufConf(configuration)) {
>   // map old config to new one:
>   networkBufMin = networkBufMax = numNetworkBuffers * pageSize;
> {code}
> networkBufMax is a long.
> However the multiplication is done in 32 bit integer, leading to potential 
> loss of precision.





[GitHub] flink issue #3839: FLINK-6474 Potential loss of precision in 32 bit integer ...

2017-05-07 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/3839
  
@StefanRRichter
Can you take a look?




[jira] [Closed] (FLINK-6475) Incremental snapshots in RocksDB hold lock during async file upload

2017-05-07 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-6475.
-
Resolution: Fixed

Fixed in 38003c2829.

> Incremental snapshots in RocksDB hold lock during async file upload
> ---
>
> Key: FLINK-6475
> URL: https://issues.apache.org/jira/browse/FLINK-6475
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Critical
> Fix For: 1.3.0
>
>
> The implementation of incremental checkpoints in RocksDB mistakenly holds the 
> {{asyncSnapshotLock}} during the whole async part, effectively blocking all 
> asynchronous processing. Holding the lock is only required in the synchronous 
> part, while the backup to local FS is running.
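The intended lock scope can be sketched as follows; `takeLocalBackup` and `uploadToDfs` are placeholder methods standing in for the real RocksDB backend code, not the actual implementation:

```java
import java.util.concurrent.CompletableFuture;

public class SnapshotLockScope {
    private final Object asyncSnapshotLock = new Object();

    CompletableFuture<String> snapshot() {
        final String localBackup;
        synchronized (asyncSnapshotLock) {
            // Synchronous part: only the local backup runs under the lock.
            localBackup = takeLocalBackup();
        }
        // Asynchronous part: the file upload runs WITHOUT the lock, so other
        // snapshot/dispose operations are no longer blocked during the transfer.
        return CompletableFuture.supplyAsync(() -> uploadToDfs(localBackup));
    }

    String takeLocalBackup() { return "local-chk-1"; }            // placeholder
    String uploadToDfs(String backup) { return backup + "@dfs"; } // placeholder

    public static void main(String[] args) {
        System.out.println(new SnapshotLockScope().snapshot().join()); // local-chk-1@dfs
    }
}
```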





[jira] [Closed] (FLINK-6471) RocksDBStateBackendTest#testCancelRunningSnapshot sometimes fails

2017-05-07 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-6471.
-
   Resolution: Fixed
 Assignee: Stefan Richter
Fix Version/s: 1.3.0

Fixed in b8ffacb1b8.

> RocksDBStateBackendTest#testCancelRunningSnapshot sometimes fails
> -
>
> Key: FLINK-6471
> URL: https://issues.apache.org/jira/browse/FLINK-6471
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Stefan Richter
>Priority: Minor
> Fix For: 1.3.0
>
>
> I got the following test failure based on commit 
> f37988c19adc30d324cde83c54f2fa5d36efb9e7 :
> {code}
> testCancelRunningSnapshot[1](org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest)
>   Time elapsed: 0.166 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertTrue(Assert.java:52)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest.testCancelRunningSnapshot(RocksDBStateBackendTest.java:313)
> Results :
> Failed tests:
>   RocksDBStateBackendTest.testCancelRunningSnapshot:313 null
> {code}
> The following assertion fails:
> {code}
> assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
> {code}





[jira] [Closed] (FLINK-6178) Allow upgrades to state serializers

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-6178.
--

> Allow upgrades to state serializers
> ---
>
> Key: FLINK-6178
> URL: https://issues.apache.org/jira/browse/FLINK-6178
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Currently, users are locked in with the serializer implementation used to 
> write their state.
> This is suboptimal, as generally for users, it could easily be possible that 
> they wish to change their serialization formats / state schemas and types in 
> the future.
> This is an umbrella JIRA for the required tasks to make this possible.
> Here's an overview description of what to expect for the overall outcome of 
> this JIRA (the specific details are outlined in their respective subtasks):
> Ideally, the main user-facing change this would result in is that users 
> implementing their custom {{TypeSerializer}}s will also need to implement 
> hook methods that identify whether or not there is a change to the serialized 
> format or even a change to the serialized data type. It would be the user's 
> responsibility to ensure that the {{deserialize}} method can bridge the change 
> between the old / new formats.
> For Flink's built-in serializers that are automatically built using the 
> user's configuration (most notably the more complex {{KryoSerializer}} and 
> {{GenericArraySerializer}}), Flink should be able to automatically 
> "reconfigure" them using the new configuration, so that the reconfigured 
> versions can be used to de- / serialize previous state. This would require 
> knowledge of the previous configuration of the serializer, therefore 
> "serializer configuration metadata" will be added to savepoints.
> Note that for the first version of this, although additional infrastructure 
> (e.g. serializer reconfigure hooks, serializer configuration metadata in 
> savepoints) will be added to potentially allow Kryo version upgrade, this 
> JIRA will not cover this. Kryo has breaking binary formats across major 
> versions, and will most likely need some further changes. Therefore, for the 
> {{KryoSerializer}}, "upgrading" it simply means changes in the registration 
> of specific / default serializers, at least for now.
> Finally, we would need to add a "convertState" phase to the task lifecycle, 
> that takes place after the "open" phase and before checkpointing starts / the 
> task starts running. It can only happen after "open", because only then can 
> we be certain whether any reconfiguration of state serialization has occurred, and 
> state needs to be converted. Ideally, the code for the "convertState" is 
> designed so that it can be easily exposed as an offline tool in the future.
> For this JIRA, we should simply assume that after {{open()}}, we have all the 
> required information and serializers are appropriately reconfigured. 
> [~srichter] is currently planning to deprecate RuntimeContext state 
> registration methods in favor of a new interface that enforces eager state 
> registration, so that we may have all the info after {{open()}}.
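The "hook methods that bridge old and new formats" idea above can be sketched as a tiny standalone serializer. This is an illustrative sketch only, not Flink API: the class name {{FormatAwareSerializer}} and the two format tags are hypothetical; the point is that {{serialize}} tags records with the current format version while {{deserialize}} inspects the tag and bridges older records.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

public class FormatAwareSerializer {
    static final byte FORMAT_V1 = 1; // old format: int length prefix (hypothetical)
    static final byte FORMAT_V2 = 2; // new format: unsigned-short length prefix (hypothetical)

    public byte[] serialize(String value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
            out.writeByte(FORMAT_V2);        // always tag with the current format
            out.writeShort(utf8.length);
            out.write(utf8);
            return bos.toByteArray();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    // The "hook": deserialize() inspects the format tag and bridges old records.
    public String deserialize(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            byte format = in.readByte();
            int len = (format == FORMAT_V1) ? in.readInt() : in.readUnsignedShort();
            byte[] utf8 = new byte[len];
            in.readFully(utf8);
            return new String(utf8, StandardCharsets.UTF_8);
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    // Old-format writer, kept only so the bridge can be exercised here; a real
    // upgrade would read V1 records written by a previous job version.
    public byte[] serializeV1(String value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
            out.writeByte(FORMAT_V1);
            out.writeInt(utf8.length);
            out.write(utf8);
            return bos.toByteArray();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }
}
```

A serializer written this way can restore state taken with either format, which is exactly the responsibility the umbrella issue places on the user's {{deserialize}} method.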



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6191) Make non-primitive, internal built-in serializers reconfigurable

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-6191.
--

> Make non-primitive, internal built-in serializers reconfigurable
> 
>
> Key: FLINK-6191
> URL: https://issues.apache.org/jira/browse/FLINK-6191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> This sub-task follows after FLINK-6190.
> For non-primitive type serializers internally created by Flink, we need to 
> allow them to be reconfigurable whenever we detect a change between the 
> previous and current serializer configuration.
> Most notably, this is relevant for the {{KryoSerializer}} and 
> {{PojoSerializer}} which are affected by the configuration, as well as 
> composite types which can potentially have nested serializers (e.g. 
> {{GenericArraySerializer}}).
> Since not all serializers require reconfiguration, we propose to have an 
> extended abstract base class for these:
> {code}
> @Internal
> public abstract class ReconfigurableTypeSerializer<T> extends TypeSerializer<T> {
>     public abstract void reconfigure(SerializersConfig serializersConfig);
> }
> {code}
> This class is also used as a tag, to check if a serializer needs to be 
> reconfigured when serializer configuration change is detected.
> Note that type serializer reconfiguration is only a mechanism internal to 
> Flink. User custom serializers cannot rely on reconfiguration to bridge 
> upgrades; they are responsible for ensuring that the {{deserialize}} method is able 
> to read old state.
> For the {{KryoSerializer}}, reconfiguration is basically making sure that all 
> previous registrations are present in the exact same order, and new 
> registrations are only appended. This allows the reconfigured serializer to 
> be able to read old state.
> For the {{PojoSerializer}} and other serializers that may have nested 
> serializers, reconfiguration should basically be a {{reconfigure}} call from 
> the top serializer, traversing through all nested serializers and 
> reconfiguring them, until there are no more reconfigurable serializers.
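The Kryo registration-order rule above (previous registrations must stay at their original positions because Kryo encodes types by registration index; new registrations may only be appended) can be sketched as a standalone check. All names here are illustrative, not Flink API; types are represented as strings for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

public class KryoRegistrationReconfigurer {
    /**
     * Returns the reconfigured registration order, or throws if the new
     * configuration dropped one of the previous registrations (in which case
     * old state could no longer be read back).
     */
    public static List<String> reconfigure(List<String> previous, LinkedHashSet<String> current) {
        for (String type : previous) {
            if (!current.contains(type)) {
                throw new IllegalStateException(
                        "Previous registration missing, cannot reconfigure: " + type);
            }
        }
        // keep the old registrations in their exact original order...
        List<String> result = new ArrayList<>(previous);
        // ...and append new registrations, never inserting in the middle
        for (String type : current) {
            if (!result.contains(type)) {
                result.add(type);
            }
        }
        return result;
    }
}
```

Because the old prefix of the registration list is preserved, the reconfigured serializer assigns every previously registered type the same index it had before, so old state remains readable.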



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6425) Integrate serializer reconfiguration into state restore flow to activate serializer upgrades

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6425:
---
Fix Version/s: 1.3.0

> Integrate serializer reconfiguration into state restore flow to activate 
> serializer upgrades
> 
>
> Key: FLINK-6425
> URL: https://issues.apache.org/jira/browse/FLINK-6425
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as 
> a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as 
> the state's metainfo:
> - the previous serializer
> - snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous old serializer. 
> Deserialization may fail if a) the serializer no longer exists in the 
> classpath, or b) the serializer class is no longer valid (i.e., the 
> implementation changed and resulted in a different serialVersionUID). In this 
> case, use a dummy 
> serializer as a placeholder. This dummy serializer is currently the 
> {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous old serializer. The 
> configuration snapshot must be successfully deserialized, otherwise the state 
> restore fails.
> 3. When we get the new registered serializer for the state (could be a 
> completely new serializer, the same serializer with different 
> implementations, or the exact same serializer untouched; either way they are 
> seen as a new serializer), we use the configuration snapshot of the old 
> serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer.  However, depending on the 
> result of the upgrade, state conversion needs to take place (for now, if 
> state conversion is required, we just fail the job as this functionality 
> isn't available yet). The results could be:
> - Compatible: restore success + serializer upgraded.
> - Compatible, but serialization schema changed: serializer upgraded but 
> requires state conversion, without the requirement that the old serializer 
> needs to be present.
> - Incompatible: the serializer upgrade requires state conversion, and requires 
> the old serializer to be present (i.e., it cannot be the dummy 
> {{ClassNotFoundProxySerializer}}).
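The restore-time decision in steps 1-3 can be condensed into a small sketch. This is a hypothetical simplification, not Flink code: the enum mirrors the three outcomes listed above, and the boolean flag models whether the real previous serializer could be deserialized or only the dummy placeholder.

```java
public class RestoreDecision {
    public enum Result { COMPATIBLE, COMPATIBLE_SCHEMA_CHANGED, INCOMPATIBLE }

    /**
     * @param result outcome of reconfiguring the new serializer with the old
     *               serializer's configuration snapshot
     * @param oldSerializerAvailable false if only a dummy placeholder
     *               (the ClassNotFound proxy case) could be restored
     * @return whether the upgrade could in principle proceed (state conversion
     *         itself is not modeled; per the text, the job currently fails
     *         whenever conversion would be needed)
     */
    public static boolean upgradePossible(Result result, boolean oldSerializerAvailable) {
        switch (result) {
            case COMPATIBLE:
                return true;  // restore success, serializer upgraded in place
            case COMPATIBLE_SCHEMA_CHANGED:
                return true;  // conversion needed, but old serializer not required
            case INCOMPATIBLE:
                // conversion needed AND the real old serializer must be present
                return oldSerializerAvailable;
        }
        throw new IllegalArgumentException("unknown result: " + result);
    }
}
```

The interesting branch is the last one: an incompatible result is only recoverable when step 1 managed to deserialize the actual previous serializer, since that serializer is needed to read the old bytes during conversion.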



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6190) Write "Serializer Configurations" metainfo along with state

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6190:
---
Fix Version/s: 1.3.0

> Write "Serializer Configurations" metainfo along with state
> ---
>
> Key: FLINK-6190
> URL: https://issues.apache.org/jira/browse/FLINK-6190
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> In order for serializers to be able to be reconfigured on restore, we need 
> knowledge of the previous serializer configurations, e.g. what types were 
> registered, with which specific / default serializers, and especially for 
> Kryo, the order they were registered.
> For this, we will need serializer configuration metainfo to be self-contained 
> within the written state.
> For the implementation, we propose to include the following changes:
> - Have a new separate {{SerializersConfig}} class that is extracted from 
> {{ExecutionConfig}}. This new class should contain only the 
> serializer-related configurations (e.g., {{registeredKryoTypes}}, 
> {{registeredPojoTypes}}, etc.). The {{SerializersConfig}} class should only 
> be internally used, and therefore annotated with {{Internal}}. Users should 
> still use the {{ExecutionConfig}} to configure serializers.
> - For serializers that previously require an {{ExecutionConfig}} in 
> constructors, try changing them to take a {{SerializersConfig}} instead.
> - Introduce {{SerializersConfigSerializationProxy}}, which is in charge of 
> serializing the current {{SerializersConfig}} when writing state to streams. 
> This proxy defines the serialized format of serializer configurations, and 
> therefore should be a {{VersionedIOReadableWritable}}, as we may change the 
> format / information written in the future.
> - Add {{SerializersConfigSerializationProxy}} into state backends 
> serialization proxies (e.g. {{KeyedBackendSerializationProxy}}) so that the 
> serializer configuration is written into state. We additionally need to ensure 
> backwards compatibility with previous-version backend serialization 
> proxies.
> For the initial version, we propose to include the following within the 
> written serialization config metadata (ordered):
> 1. {{registeredPojoTypes}}
> 2. {{Throwable.class}} --> 
> {{org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer}} default 
> Flink-specific registry for serializing throwables.
> 3. {{defaultKryoSerializers}}
> 4. {{defaultKryoSerializerClasses}}
> 5. Kryo registrations for all primitive types (and boxed versions). This is 
> to allow compatibility in case the built-in registrations for the primitive 
> types change in Kryo in the future.
> 6. {{registeredTypesWithKryoSerializers}}
> 7. {{registeredTypesWithKryoSerializerClasses}}
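The versioned write/read pattern that the {{VersionedIOReadableWritable}} point describes can be sketched standalone: the proxy writes its format version before the configuration payload, so a future reader can branch on the version and stay backwards compatible. All names here ({{SerializersConfigProxy}}, the single {{registeredTypes}} list) are illustrative placeholders, not Flink API, and the payload is reduced to one list rather than the seven items above.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class SerializersConfigProxy {
    static final int VERSION = 1;
    final List<String> registeredTypes = new ArrayList<>();

    public byte[] toBytes() {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(VERSION);                 // version first, then payload
            out.writeInt(registeredTypes.size());
            for (String t : registeredTypes) {
                out.writeUTF(t);
            }
            return bos.toByteArray();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    public static SerializersConfigProxy fromBytes(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            int version = in.readInt();
            if (version > VERSION) {
                throw new IOException("Unsupported config format version: " + version);
            }
            // a later VERSION would branch here to read older payload layouts
            SerializersConfigProxy proxy = new SerializersConfigProxy();
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                proxy.registeredTypes.add(in.readUTF());
            }
            return proxy;
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }
}
```

Embedding such a proxy's bytes into the state backend's serialization proxy is what makes the configuration metainfo self-contained within the written state.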



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6425) Integrate serializer reconfiguration into state restore flow to activate serializer upgrades

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-6425.
--

> Integrate serializer reconfiguration into state restore flow to activate 
> serializer upgrades
> 
>
> Key: FLINK-6425
> URL: https://issues.apache.org/jira/browse/FLINK-6425
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as 
> a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as 
> the state's metainfo:
> - the previous serializer
> - snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous old serializer. 
> Deserialization may fail if a) the serializer no longer exists in the 
> classpath, or b) the serializer class is no longer valid (i.e., the 
> implementation changed and resulted in a different serialVersionUID). In this 
> case, use a dummy 
> serializer as a placeholder. This dummy serializer is currently the 
> {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous old serializer. The 
> configuration snapshot must be successfully deserialized, otherwise the state 
> restore fails.
> 3. When we get the new registered serializer for the state (could be a 
> completely new serializer, the same serializer with different 
> implementations, or the exact same serializer untouched; either way they are 
> seen as a new serializer), we use the configuration snapshot of the old 
> serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer.  However, depending on the 
> result of the upgrade, state conversion needs to take place (for now, if 
> state conversion is required, we just fail the job as this functionality 
> isn't available yet). The results could be:
> - Compatible: restore success + serializer upgraded.
> - Compatible, but serialization schema changed: serializer upgraded but 
> requires state conversion, without the requirement that the old serializer 
> needs to be present.
> - Incompatible: the serializer upgrade requires state conversion, and requires 
> the old serializer to be present (i.e., it cannot be the dummy 
> {{ClassNotFoundProxySerializer}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6191) Make non-primitive, internal built-in serializers reconfigurable

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6191:
---
Fix Version/s: 1.3.0

> Make non-primitive, internal built-in serializers reconfigurable
> 
>
> Key: FLINK-6191
> URL: https://issues.apache.org/jira/browse/FLINK-6191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> This sub-task follows after FLINK-6190.
> For non-primitive type serializers internally created by Flink, we need to 
> allow them to be reconfigurable whenever we detect a change between the 
> previous and current serializer configuration.
> Most notably, this is relevant for the {{KryoSerializer}} and 
> {{PojoSerializer}} which are affected by the configuration, as well as 
> composite types which can potentially have nested serializers (e.g. 
> {{GenericArraySerializer}}).
> Since not all serializers require reconfiguration, we propose to have an 
> extended abstract base class for these:
> {code}
> @Internal
> public abstract class ReconfigurableTypeSerializer<T> extends TypeSerializer<T> {
>     public abstract void reconfigure(SerializersConfig serializersConfig);
> }
> {code}
> This class is also used as a tag, to check if a serializer needs to be 
> reconfigured when serializer configuration change is detected.
> Note that type serializer reconfiguration is only a mechanism internal to 
> Flink. User custom serializers cannot rely on reconfiguration to bridge 
> upgrades; they are responsible for ensuring that the {{deserialize}} method is able 
> to read old state.
> For the {{KryoSerializer}}, reconfiguration is basically making sure that all 
> previous registrations are present in the exact same order, and new 
> registrations are only appended. This allows the reconfigured serializer to 
> be able to read old state.
> For the {{PojoSerializer}} and other serializers that may have nested 
> serializers, reconfiguration should basically be a {{reconfigure}} call from 
> the top serializer, traversing through all nested serializers and 
> reconfiguring them, until there are no more reconfigurable serializers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6190) Write "Serializer Configurations" metainfo along with state

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-6190.
--

> Write "Serializer Configurations" metainfo along with state
> ---
>
> Key: FLINK-6190
> URL: https://issues.apache.org/jira/browse/FLINK-6190
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> In order for serializers to be able to be reconfigured on restore, we need 
> knowledge of the previous serializer configurations, e.g. what types were 
> registered, with which specific / default serializers, and especially for 
> Kryo, the order they were registered.
> For this, we will need serializer configuration metainfo to be self-contained 
> within the written state.
> For the implementation, we propose to include the following changes:
> - Have a new separate {{SerializersConfig}} class that is extracted from 
> {{ExecutionConfig}}. This new class should contain only the 
> serializer-related configurations (e.g., {{registeredKryoTypes}}, 
> {{registeredPojoTypes}}, etc.). The {{SerializersConfig}} class should only 
> be internally used, and therefore annotated with {{Internal}}. Users should 
> still use the {{ExecutionConfig}} to configure serializers.
> - For serializers that previously require an {{ExecutionConfig}} in 
> constructors, try changing them to take a {{SerializersConfig}} instead.
> - Introduce {{SerializersConfigSerializationProxy}}, which is in charge of 
> serializing the current {{SerializersConfig}} when writing state to streams. 
> This proxy defines the serialized format of serializer configurations, and 
> therefore should be a {{VersionedIOReadableWritable}}, as we may change the 
> format / information written in the future.
> - Add {{SerializersConfigSerializationProxy}} into state backends 
> serialization proxies (e.g. {{KeyedBackendSerializationProxy}}) so that the 
> serializer configuration is written into state. We additionally need to ensure 
> backwards compatibility with previous-version backend serialization 
> proxies.
> For the initial version, we propose to include the following within the 
> written serialization config metadata (ordered):
> 1. {{registeredPojoTypes}}
> 2. {{Throwable.class}} --> 
> {{org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer}} default 
> Flink-specific registry for serializing throwables.
> 3. {{defaultKryoSerializers}}
> 4. {{defaultKryoSerializerClasses}}
> 5. Kryo registrations for all primitive types (and boxed versions). This is 
> to allow compatibility in case the built-in registrations for the primitive 
> types change in Kryo in the future.
> 6. {{registeredTypesWithKryoSerializers}}
> 7. {{registeredTypesWithKryoSerializerClasses}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-6191) Make non-primitive, internal built-in serializers reconfigurable

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-6191.

Resolution: Fixed

Merged for 1.3.0 with 
https://git-wip-us.apache.org/repos/asf/flink/commit/63c04a5

> Make non-primitive, internal built-in serializers reconfigurable
> 
>
> Key: FLINK-6191
> URL: https://issues.apache.org/jira/browse/FLINK-6191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> This sub-task follows after FLINK-6190.
> For non-primitive type serializers internally created by Flink, we need to 
> allow them to be reconfigurable whenever we detect a change between the 
> previous and current serializer configuration.
> Most notably, this is relevant for the {{KryoSerializer}} and 
> {{PojoSerializer}} which are affected by the configuration, as well as 
> composite types which can potentially have nested serializers (e.g. 
> {{GenericArraySerializer}}).
> Since not all serializers require reconfiguration, we propose to have an 
> extended abstract base class for these:
> {code}
> @Internal
> public abstract class ReconfigurableTypeSerializer<T> extends TypeSerializer<T> {
>     public abstract void reconfigure(SerializersConfig serializersConfig);
> }
> {code}
> This class is also used as a tag, to check if a serializer needs to be 
> reconfigured when serializer configuration change is detected.
> Note that type serializer reconfiguration is only a mechanism internal to 
> Flink. User custom serializers cannot rely on reconfiguration to bridge 
> upgrades; they are responsible for ensuring that the {{deserialize}} method is able 
> to read old state.
> For the {{KryoSerializer}}, reconfiguration is basically making sure that all 
> previous registrations are present in the exact same order, and new 
> registrations are only appended. This allows the reconfigured serializer to 
> be able to read old state.
> For the {{PojoSerializer}} and other serializers that may have nested 
> serializers, reconfiguration should basically be a {{reconfigure}} call from 
> the top serializer, traversing through all nested serializers and 
> reconfiguring them, until there are no more reconfigurable serializers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-6425) Integrate serializer reconfiguration into state restore flow to activate serializer upgrades

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-6425.

Resolution: Fixed

Merged for 1.3.0 with 
https://git-wip-us.apache.org/repos/asf/flink/commit/63c04a5

> Integrate serializer reconfiguration into state restore flow to activate 
> serializer upgrades
> 
>
> Key: FLINK-6425
> URL: https://issues.apache.org/jira/browse/FLINK-6425
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as 
> a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as 
> the state's metainfo:
> - the previous serializer
> - snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous old serializer. 
> Deserialization may fail if a) the serializer no longer exists in the 
> classpath, or b) the serializer class is no longer valid (i.e., the 
> implementation changed and resulted in a different serialVersionUID). In this 
> case, use a dummy 
> serializer as a placeholder. This dummy serializer is currently the 
> {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous old serializer. The 
> configuration snapshot must be successfully deserialized, otherwise the state 
> restore fails.
> 3. When we get the new registered serializer for the state (could be a 
> completely new serializer, the same serializer with different 
> implementations, or the exact same serializer untouched; either way they are 
> seen as a new serializer), we use the configuration snapshot of the old 
> serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer.  However, depending on the 
> result of the upgrade, state conversion needs to take place (for now, if 
> state conversion is required, we just fail the job as this functionality 
> isn't available yet). The results could be:
> - Compatible: restore success + serializer upgraded.
> - Compatible, but serialization schema changed: serializer upgraded but 
> requires state conversion, without the requirement that the old serializer 
> needs to be present.
> - Incompatible: the serializer upgrade requires state conversion, and requires 
> the old serializer to be present (i.e., it cannot be the dummy 
> {{ClassNotFoundProxySerializer}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6425) Integrate serializer reconfiguration into state restore flow to activate serializer upgrades

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1617#comment-1617
 ] 

ASF GitHub Bot commented on FLINK-6425:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3834


> Integrate serializer reconfiguration into state restore flow to activate 
> serializer upgrades
> 
>
> Key: FLINK-6425
> URL: https://issues.apache.org/jira/browse/FLINK-6425
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as 
> a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as 
> the state's metainfo:
> - the previous serializer
> - snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous old serializer. 
> Deserialization may fail if a) the serializer no longer exists in the 
> classpath, or b) the serializer class is no longer valid (i.e., the 
> implementation changed and resulted in a different serialVersionUID). In this 
> case, use a dummy 
> serializer as a placeholder. This dummy serializer is currently the 
> {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous old serializer. The 
> configuration snapshot must be successfully deserialized, otherwise the state 
> restore fails.
> 3. When we get the new registered serializer for the state (could be a 
> completely new serializer, the same serializer with different 
> implementations, or the exact same serializer untouched; either way they are 
> seen as a new serializer), we use the configuration snapshot of the old 
> serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer.  However, depending on the 
> result of the upgrade, state conversion needs to take place (for now, if 
> state conversion is required, we just fail the job as this functionality 
> isn't available yet). The results could be:
> - Compatible: restore success + serializer upgraded.
> - Compatible, but serialization schema changed: serializer upgraded but 
> requires state conversion, without the requirement that the old serializer 
> needs to be present.
> - Incompatible: the serializer upgrade requires state conversion, and requires 
> the old serializer to be present (i.e., it cannot be the dummy 
> {{ClassNotFoundProxySerializer}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-6178) Allow upgrades to state serializers

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-6178.

Resolution: Fixed

Merged for 1.3.0 with 
https://git-wip-us.apache.org/repos/asf/flink/commit/63c04a5

> Allow upgrades to state serializers
> ---
>
> Key: FLINK-6178
> URL: https://issues.apache.org/jira/browse/FLINK-6178
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Currently, users are locked in with the serializer implementation used to 
> write their state.
> This is suboptimal, as users may well wish to change their serialization 
> formats / state schemas and types in the future.
> This is an umbrella JIRA for the required tasks to make this possible.
> Here's an overview description of what to expect for the overall outcome of 
> this JIRA (the specific details are outlined in their respective subtasks):
> Ideally, the main user-facing change this would result in is that users 
> implementing their custom {{TypeSerializer}}s will also need to implement 
> hook methods that identify whether or not there is a change to the serialized 
> format or even a change to the serialized data type. It would be the user's 
> responsibility to ensure that the {{deserialize}} method can bridge the change 
> between the old / new formats.
> For Flink's built-in serializers that are automatically built using the 
> user's configuration (most notably the more complex {{KryoSerializer}} and 
> {{GenericArraySerializer}}), Flink should be able to automatically 
> "reconfigure" them using the new configuration, so that the reconfigured 
> versions can be used to de- / serialize previous state. This would require 
> knowledge of the previous configuration of the serializer, therefore 
> "serializer configuration metadata" will be added to savepoints.
> Note that for the first version of this, although additional infrastructure 
> (e.g. serializer reconfigure hooks, serializer configuration metadata in 
> savepoints) will be added to potentially allow Kryo version upgrade, this 
> JIRA will not cover this. Kryo has breaking binary formats across major 
> versions, and will most likely need some further changes. Therefore, for the 
> {{KryoSerializer}}, "upgrading" it simply means changes in the registration 
> of specific / default serializers, at least for now.
> Finally, we would need to add a "convertState" phase to the task lifecycle, 
> that takes place after the "open" phase and before checkpointing starts / the 
> task starts running. It can only happen after "open", because only then can 
> we be certain whether any reconfiguration of state serialization has occurred, and 
> state needs to be converted. Ideally, the code for the "convertState" is 
> designed so that it can be easily exposed as an offline tool in the future.
> For this JIRA, we should simply assume that after {{open()}}, we have all the 
> required information and serializers are appropriately reconfigured. 
> [~srichter] is currently planning to deprecate RuntimeContext state 
> registration methods in favor of a new interface that enforces eager state 
> registration, so that we may have all the info after {{open()}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-6190) Write "Serializer Configurations" metainfo along with state

2017-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-6190.

Resolution: Fixed

Merged for 1.3.0 with 
https://git-wip-us.apache.org/repos/asf/flink/commit/63c04a5

> Write "Serializer Configurations" metainfo along with state
> ---
>
> Key: FLINK-6190
> URL: https://issues.apache.org/jira/browse/FLINK-6190
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> In order for serializers to be able to be reconfigured on restore, we need 
> knowledge of the previous serializer configurations, e.g. what types were 
> registered, with which specific / default serializers, and especially for 
> Kryo, the order they were registered.
> For this, we will need serializer configuration metainfo to be self-contained 
> within the written state.
> For the implementation, we propose to include the following changes:
> - Have a new separate {{SerializersConfig}} class that is extracted from 
> {{ExecutionConfig}}. This new class should contain only the 
> serializer-related configurations (e.g., {{registeredKryoTypes}}, 
> {{registeredPojoTypes}}, etc.). The {{SerializersConfig}} class should only 
> be internally used, and therefore annotated with {{Internal}}. Users should 
> still use the {{ExecutionConfig}} to configure serializers.
> - For serializers that previously require an {{ExecutionConfig}} in their 
> constructors, try changing them to take a {{SerializersConfig}} instead.
> - Introduce {{SerializersConfigSerializationProxy}}, which is in charge of 
> serializing the current {{SerializersConfig}} when writing state to streams. 
> This proxy defines the serialized format of serializer configurations and 
> therefore should be a {{VersionedIOReadableWritable}}, as we may change the 
> format / information written in the future.
> - Add {{SerializersConfigSerializationProxy}} into state backends 
> serialization proxies (e.g. {{KeyedBackendSerializationProxy}}) so that the 
> serializer configuration is written into state. Need to additionally make 
> sure backwards compatibility of previous-version backend serialization 
> proxies.
> For the initial version, we propose to include the following within the 
> written serialization config metadata (ordered):
> 1. {{registeredPojoTypes}}
> 2. {{Throwable.class}} --> 
> {{org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer}}, the 
> default Flink-specific registration for serializing throwables.
> 3. {{defaultKryoSerializers}}
> 4. {{defaultKryoSerializerClasses}}
> 5. Kryo registrations for all primitive types (and boxed versions). This is 
> to allow compatibility in case the built-in registrations for the primitive 
> types change in Kryo in the future.
> 6. {{registeredTypesWithKryoSerializers}}
> 7. {{registeredTypesWithKryoSerializerClasses}}





[jira] [Commented] (FLINK-6190) Write "Serializer Configurations" metainfo along with state

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1618#comment-1618
 ] 

ASF GitHub Bot commented on FLINK-6190:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3804


> Write "Serializer Configurations" metainfo along with state
> ---
>
> Key: FLINK-6190
> URL: https://issues.apache.org/jira/browse/FLINK-6190
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> For serializers to be reconfigurable on restore, we need knowledge of the 
> previous serializer configurations, e.g. which types were registered, with 
> which specific / default serializers, and, especially for Kryo, the order in 
> which they were registered.
> For this, we will need serializer configuration metainfo to be self-contained 
> within the written state.
> For the implementation, we propose to include the following changes:
> - Have a new separate {{SerializersConfig}} class that is extracted from 
> {{ExecutionConfig}}. This new class should contain only the 
> serializer-related configurations (e.g., {{registeredKryoTypes}}, 
> {{registeredPojoTypes}}, etc.). The {{SerializersConfig}} class should only 
> be internally used, and therefore annotated with {{Internal}}. Users should 
> still use the {{ExecutionConfig}} to configure serializers.
> - For serializers that previously require an {{ExecutionConfig}} in their 
> constructors, try changing them to take a {{SerializersConfig}} instead.
> - Introduce {{SerializersConfigSerializationProxy}}, which is in charge of 
> serializing the current {{SerializersConfig}} when writing state to streams. 
> This proxy defines the serialized format of serializer configurations and 
> therefore should be a {{VersionedIOReadableWritable}}, as we may change the 
> format / information written in the future.
> - Add {{SerializersConfigSerializationProxy}} into state backends 
> serialization proxies (e.g. {{KeyedBackendSerializationProxy}}) so that the 
> serializer configuration is written into state. Need to additionally make 
> sure backwards compatibility of previous-version backend serialization 
> proxies.
> For the initial version, we propose to include the following within the 
> written serialization config metadata (ordered):
> 1. {{registeredPojoTypes}}
> 2. {{Throwable.class}} --> 
> {{org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer}}, the 
> default Flink-specific registration for serializing throwables.
> 3. {{defaultKryoSerializers}}
> 4. {{defaultKryoSerializerClasses}}
> 5. Kryo registrations for all primitive types (and boxed versions). This is 
> to allow compatibility in case the built-in registrations for the primitive 
> types change in Kryo in the future.
> 6. {{registeredTypesWithKryoSerializers}}
> 7. {{registeredTypesWithKryoSerializerClasses}}





[GitHub] flink pull request #3834: [FLINK-6425] [runtime] Activate serializer upgrade...

2017-05-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3834


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3804: [FLINK-6190] [core] Reconfigurable TypeSerializers

2017-05-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3804




[jira] [Commented] (FLINK-6474) Potential loss of precision in 32 bit integer multiplication

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1607#comment-1607
 ] 

ASF GitHub Bot commented on FLINK-6474:
---

GitHub user tedyu opened a pull request:

https://github.com/apache/flink/pull/3839

FLINK-6474 Potential loss of precision in 32 bit integer multiplication

Cast numNetworkBuffers to long before multiplication.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tedyu/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3839.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3839


commit 3f35aab20f4f0c3b11dbee3cfd5bbbe0249a9aa5
Author: tedyu 
Date:   2017-05-07T19:20:27Z

FLINK-6474 Potential loss of precision in 32 bit integer multiplication




> Potential loss of precision in 32 bit integer multiplication
> 
>
> Key: FLINK-6474
> URL: https://issues.apache.org/jira/browse/FLINK-6474
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> In TaskManagerServicesConfiguration#parseNetworkEnvironmentConfiguration
> {code}
> if (!hasNewNetworkBufConf(configuration)) {
>   // map old config to new one:
>   networkBufMin = networkBufMax = numNetworkBuffers * pageSize;
> {code}
> networkBufMax is a long.
> However, the multiplication is done in 32-bit integer arithmetic, so the 
> product can overflow before it is widened to long.





[GitHub] flink pull request #3839: FLINK-6474 Potential loss of precision in 32 bit i...

2017-05-07 Thread tedyu
GitHub user tedyu opened a pull request:

https://github.com/apache/flink/pull/3839

FLINK-6474 Potential loss of precision in 32 bit integer multiplication

Cast numNetworkBuffers to long before multiplication.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tedyu/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3839.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3839


commit 3f35aab20f4f0c3b11dbee3cfd5bbbe0249a9aa5
Author: tedyu 
Date:   2017-05-07T19:20:27Z

FLINK-6474 Potential loss of precision in 32 bit integer multiplication






[jira] [Commented] (FLINK-6476) Table environment register row data stream

2017-05-07 Thread radu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1591#comment-1591
 ] 

radu commented on FLINK-6476:
-

[~fhueske] [~sunjincheng121] [~shijinkui] [~stefano.bortoli] [~Yuhong_kyo] 
[~twalthr]

I found that there is no support for registering streams of type Row as table 
sources. I believe we should support this to make the creation of table 
sources more generic. Potentially, we could transmit the row type together 
with the stream by creating a dedicated interface.

What do you think?

> Table environment register row data stream
> --
>
> Key: FLINK-6476
> URL: https://issues.apache.org/jira/browse/FLINK-6476
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
> Environment: java/scala
>Reporter: radu
>Assignee: radu
>  Labels: feature, patch
>
> Registering streams of Row as table sources is currently not possible:
> Java:
> DataStream<Row> ds = ...
> tableEnv.registerDataStream("MyTableRow", ds, "a, b, c ...");
> org.apache.flink.table.api.TableException: Source of type Row(f0: Integer, 
> f1: Long, f2: Integer, f3: String, f4: Integer) cannot be converted into 
> Table.
>   at 
> org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:680)
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:363)
>   at 
> org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:133)
>   at 
> org.apache.flink.table.api.java.stream.sql.SqlITCase.testRow2(SqlITCase.java:92)
> Scala:
> val ds:DataStream[Row] = ...
> tableEnv.registerDataStream("MyTableRow", ds, "a, b, c, d, e");
> org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to 
> org.apache.flink.api.common.typeutils.CompositeType
> This can be supported by extending getFieldInfo() in 
> org.apache.flink.table.api.TableEnvironment
> and by constructing the StreamTableSource correspondingly.





[jira] [Created] (FLINK-6476) Table environment register row data stream

2017-05-07 Thread radu (JIRA)
radu created FLINK-6476:
---

 Summary: Table environment register row data stream
 Key: FLINK-6476
 URL: https://issues.apache.org/jira/browse/FLINK-6476
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
 Environment: java/scala
Reporter: radu
Assignee: radu


Registering streams of Row as table sources is currently not possible:

Java:

DataStream<Row> ds = ...
tableEnv.registerDataStream("MyTableRow", ds, "a, b, c ...");


org.apache.flink.table.api.TableException: Source of type Row(f0: Integer, f1: 
Long, f2: Integer, f3: String, f4: Integer) cannot be converted into Table.
at 
org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:680)
at 
org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:363)
at 
org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:133)
at 
org.apache.flink.table.api.java.stream.sql.SqlITCase.testRow2(SqlITCase.java:92)


Scala:

val ds:DataStream[Row] = ...
tableEnv.registerDataStream("MyTableRow", ds, "a, b, c, d, e");

org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to 
org.apache.flink.api.common.typeutils.CompositeType


This can be supported by extending getFieldInfo() in 
org.apache.flink.table.api.TableEnvironment

and by constructing the StreamTableSource correspondingly.





[jira] [Created] (FLINK-6475) Incremental snapshots in RocksDB hold lock during async file upload

2017-05-07 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-6475:
-

 Summary: Incremental snapshots in RocksDB hold lock during async 
file upload
 Key: FLINK-6475
 URL: https://issues.apache.org/jira/browse/FLINK-6475
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Stefan Richter
Assignee: Stefan Richter
Priority: Critical
 Fix For: 1.3.0


The implementation of incremental checkpoints in RocksDB mistakenly holds the 
{{asyncSnapshotLock}} during the whole async part, effectively blocking all 
asynchronous processing. Holding the lock is only required for the synchronous 
part, i.e., while the backup to the local FS is running.
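The intended pattern — hold the lock only for the synchronous local backup and run the slow upload outside it — can be sketched as follows. This is an illustrative sketch only: the class, field, and method names are made up, and Flink's actual implementation differs (a {{ReentrantLock}} stands in here for the monitor object so the lock state can be inspected):

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch of the intended lock scope; not Flink's actual code.
class SnapshotSketch {
    final ReentrantLock asyncSnapshotLock = new ReentrantLock();

    // Returns the asynchronous part of the snapshot as a Runnable.
    Runnable startSnapshot() {
        asyncSnapshotLock.lock();
        try {
            // Synchronous part: back up RocksDB files to the local FS.
            // Only this part needs the lock.
        } finally {
            // Release before handing out the async part.
            asyncSnapshotLock.unlock();
        }
        // Asynchronous part: the slow upload runs without the lock, so it
        // no longer blocks other snapshot operations.
        return () -> { /* upload local files to the checkpoint FS */ };
    }
}
```

The key point is that {{startSnapshot()}} returns with the lock already released, so the upload, however long it takes, cannot block the next snapshot.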





[jira] [Created] (FLINK-6474) Potential loss of precision in 32 bit integer multiplication

2017-05-07 Thread Ted Yu (JIRA)
Ted Yu created FLINK-6474:
-

 Summary: Potential loss of precision in 32 bit integer 
multiplication
 Key: FLINK-6474
 URL: https://issues.apache.org/jira/browse/FLINK-6474
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


In TaskManagerServicesConfiguration#parseNetworkEnvironmentConfiguration
{code}
if (!hasNewNetworkBufConf(configuration)) {
  // map old config to new one:
  networkBufMin = networkBufMax = numNetworkBuffers * pageSize;
{code}
networkBufMax is a long.
However, the multiplication is done in 32-bit integer arithmetic, so the 
product can overflow before it is widened to long.
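The proposed fix is to widen one operand before multiplying, so the multiplication itself happens in 64-bit arithmetic. A minimal sketch of the bug and the fix (the method names and concrete values below are illustrative, not Flink's defaults):

```java
class BufferSizeOverflow {
    // Mirrors the flagged pattern: buffer count times page size.
    static long buggy(int numNetworkBuffers, int pageSize) {
        return numNetworkBuffers * pageSize;        // 32-bit multiply, may overflow
    }

    static long fixed(int numNetworkBuffers, int pageSize) {
        return (long) numNetworkBuffers * pageSize; // widened before multiplying
    }

    public static void main(String[] args) {
        int buffers = 100_000;     // a large but plausible buffer count
        int pageSize = 32 * 1024;  // 32 KiB memory segments
        System.out.println(buggy(buffers, pageSize)); // -1018167296 (overflowed)
        System.out.println(fixed(buffers, pageSize)); // 3276800000
    }
}
```

100,000 × 32,768 = 3,276,800,000 exceeds Integer.MAX_VALUE, so the unwidened product wraps around to a negative value before the assignment to long can save it.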





[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999873#comment-15999873
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

GitHub user zohar-mizrahi opened a pull request:

https://github.com/apache/flink/pull/3838

[FLINK-5886] Python API for streaming applications

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zohar-mizrahi/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3838.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3838


commit c1333c3424897caa615683d3494b41e7ab88d45d
Author: Zohar Mizrahi 
Date:   2016-11-15T12:46:36Z

[FLINK-5886] Python API for streaming applications




> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>
> A work in progress to provide a Python interface for Flink streaming APIs. 
> The core technology is based on Jython and thus imposes two limitations: a) 
> user-defined functions cannot use Python extensions; b) the Python version is 2.x.
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was 
> setup properly (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
>  one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in return will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}





[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2017-05-07 Thread zohar-mizrahi
GitHub user zohar-mizrahi opened a pull request:

https://github.com/apache/flink/pull/3838

[FLINK-5886] Python API for streaming applications

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zohar-mizrahi/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3838.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3838


commit c1333c3424897caa615683d3494b41e7ab88d45d
Author: Zohar Mizrahi 
Date:   2016-11-15T12:46:36Z

[FLINK-5886] Python API for streaming applications






[jira] [Created] (FLINK-6473) Add OVER window support for batch tables

2017-05-07 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-6473:


 Summary: Add OVER window support for batch tables
 Key: FLINK-6473
 URL: https://issues.apache.org/jira/browse/FLINK-6473
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Fabian Hueske


Add support for OVER windows for batch tables. 

Since OVER windows are supported for streaming tables, this issue is not about 
the API (which is available) but about adding the execution strategies and 
translation for OVER windows on batch tables.





[jira] [Commented] (FLINK-6471) RocksDBStateBackendTest#testCancelRunningSnapshot sometimes fails

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999854#comment-15999854
 ] 

ASF GitHub Bot commented on FLINK-6471:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/3837
  
lgtm


> RocksDBStateBackendTest#testCancelRunningSnapshot sometimes fails
> -
>
> Key: FLINK-6471
> URL: https://issues.apache.org/jira/browse/FLINK-6471
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>
> I got the following test failure based on commit 
> f37988c19adc30d324cde83c54f2fa5d36efb9e7 :
> {code}
> testCancelRunningSnapshot[1](org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest)
>   Time elapsed: 0.166 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertTrue(Assert.java:52)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest.testCancelRunningSnapshot(RocksDBStateBackendTest.java:313)
> Results :
> Failed tests:
>   RocksDBStateBackendTest.testCancelRunningSnapshot:313 null
> {code}
> The following assertion fails:
> {code}
> assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
> {code}





[GitHub] flink issue #3837: [FLINK-6471] [checkpoint] Fix RocksDBStateBackendTest

2017-05-07 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/3837
  
lgtm




[jira] [Closed] (FLINK-4681) Add SessionRow row-windows for batch tables.

2017-05-07 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-4681.

Resolution: Won't Do

> Add SessionRow row-windows for batch tables.
> 
>
> Key: FLINK-4681
> URL: https://issues.apache.org/jira/browse/FLINK-4681
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>
> Add SessionRow row-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  





[jira] [Closed] (FLINK-4679) Add TumbleRow row-windows to Table API

2017-05-07 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-4679.

Resolution: Won't Do

> Add TumbleRow row-windows to Table API
> --
>
> Key: FLINK-4679
> URL: https://issues.apache.org/jira/browse/FLINK-4679
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> Add TumbleRow row-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> TumbleRow row windows are a syntactic shortcut for a special type of SQL OVER 
> windows, i.e., windows of the form 
> {code}
> SELECT STREAM rowtime,
>   productId,
>   units,
>   SUM(units) OVER (PARTITION BY FLOOR(rowtime() TO HOUR)) AS 
> unitsSinceTopOfHour
> FROM Orders;
> {code}
> i.e., OVER windows which are partitioned by the time attribute. 
> This issue is about extending the Table API with the TumbleRow shortcut. It 
> should reuse the OVER window translation and runtime code implemented by 
> FLINK-5653, FLINK-5654, FLINK-5655, FLINK-5656, FLINK-5657, and FLINK-5658.





[jira] [Closed] (FLINK-4678) Add SessionRow row-windows to Table API

2017-05-07 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-4678.

Resolution: Won't Do

> Add SessionRow row-windows to Table API
> ---
>
> Key: FLINK-4678
> URL: https://issues.apache.org/jira/browse/FLINK-4678
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> Add SessionRow row-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> SessionRow row windows are a syntactic shortcut for a special type of SQL 
> OVER windows, i.e., windows of the form (note this is not valid SQL):
> {code}
> SELECT STREAM rowtime,
>   productId,
>   units,
>   SUM(units) OVER (PARTITION BY sessionId(rowtime, INTERVAL '5' MINUTES) )) 
> AS unitsSinceTopOfHour
> FROM Orders;
> {code}
> i.e., OVER windows which are partitioned by a special function sessionId over 
> the time attribute (the second argument defines the gap between sessions).
> This issue is about extending the Table API with the SessionRow shortcut. It 
> should reuse a lot of the OVER window translation and runtime code 
> implemented by FLINK-5653, FLINK-5654, FLINK-5655, FLINK-5656, FLINK-5657, 
> and FLINK-5658 but will likely require to add a special type of window 
> boundary inside of Calcite to hold the session semantics.





[jira] [Updated] (FLINK-5047) Add sliding group-windows for batch tables

2017-05-07 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-5047:
-
Issue Type: New Feature  (was: Sub-task)
Parent: (was: FLINK-4557)

> Add sliding group-windows for batch tables
> --
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Timo Walther
>
> Add Slide group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
> There are several ways to implement sliding windows for batch:
> 1. Replicate the output in order to assign keys for overlapping windows. This 
> is probably the most straightforward implementation and supports any 
> aggregation function, but blows up the data volume.
> 2. If the aggregation functions are combinable / pre-aggregatable, we can 
> also find the largest tumbling window size from which the sliding windows can 
> be assembled. This is basically the technique used to express sliding windows 
> with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 
> minutes, 2 minutes) this would mean to first compute aggregates of 
> non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of 
> these into a sliding window (could be done in a MapPartition with sorted 
> input). The implementation could be done as an optimizer rule to split the 
> sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe 
> it makes sense to implement the WINDOW clause first and reuse this for 
> sliding windows.
> 3. There is also a third, hybrid solution: do the pre-aggregation on the 
> largest non-overlapping windows (as in 2), then replicate these results and 
> process them as in approach 1. The benefits of this are that it a) is based 
> on the implementation that supports non-combinable aggregates (which is 
> required in any case) and b) does not require the implementation of the SQL 
> WINDOW operator. Internally, this can again be implemented as an optimizer 
> rule that translates the SlidingWindow into a pre-aggregating TumblingWindow 
> and a final SlidingWindow (with replication).
> see FLINK-4692 for more discussion
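The pre-aggregation idea in approach 2 can be sketched concretely for a combinable aggregate such as SUM: first aggregate into non-overlapping panes of the slide width, then assemble each sliding window from size/slide consecutive panes. This is a self-contained illustration with made-up names, not the proposed optimizer-rule implementation:

```java
import java.util.Arrays;

// Sketch of assembling sliding-window sums from tumbling pre-aggregates.
// For Slide(10 min, 2 min), each sliding window spans 10/2 = 5 panes.
class SlidingFromTumbling {

    // tumblingSums[i] is the pre-aggregated sum of the i-th 2-minute pane;
    // assumes the window size is a multiple of the slide.
    static long[] slidingSums(long[] tumblingSums, int panesPerWindow) {
        int n = Math.max(tumblingSums.length - panesPerWindow + 1, 0);
        long[] out = new long[n];
        for (int i = 0; i < n; i++) {
            long sum = 0;
            for (int j = 0; j < panesPerWindow; j++) {
                sum += tumblingSums[i + j]; // combine consecutive panes
            }
            out[i] = sum;
        }
        return out;
    }

    public static void main(String[] args) {
        long[] panes = {1, 2, 3, 4, 5, 6}; // six 2-minute pane sums
        // Two full 10-minute windows can be assembled: panes 0-4 and 1-5.
        System.out.println(Arrays.toString(slidingSums(panes, 5))); // [15, 20]
    }
}
```

Each input element is aggregated exactly once into its pane, which is what avoids the data-volume blow-up of the replication approach.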





[jira] [Closed] (FLINK-4683) Add SlideRow row-windows for batch tables

2017-05-07 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-4683.

Resolution: Won't Do

> Add SlideRow row-windows for batch tables
> -
>
> Key: FLINK-4683
> URL: https://issues.apache.org/jira/browse/FLINK-4683
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Add SlideRow row-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  





[jira] [Closed] (FLINK-4682) Add TumbleRow row-windows for batch tables.

2017-05-07 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-4682.

Resolution: Won't Do

> Add TumbleRow row-windows for batch tables.
> ---
>
> Key: FLINK-4682
> URL: https://issues.apache.org/jira/browse/FLINK-4682
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>
> Add TumbleRow row-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  





[jira] [Closed] (FLINK-6033) Support UNNEST query in the stream SQL API

2017-05-07 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6033.

   Resolution: Implemented
Fix Version/s: 1.3.0

Implemented with fe4e96a726dd32fb948db050b975312e120e2461

> Support UNNEST query in the stream SQL API
> --
>
> Key: FLINK-6033
> URL: https://issues.apache.org/jira/browse/FLINK-6033
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> It would be nice to support the {{UNNEST}} keyword in the stream SQL API. 
> The keyword is widely used in queries that relate to nested fields.





[jira] [Closed] (FLINK-6257) Post-pass OVER windows

2017-05-07 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6257.

   Resolution: Done
Fix Version/s: 1.3.0

Done with 9f2293cfdab960246fe1aea1d705eea18a011761

> Post-pass OVER windows
> --
>
> Key: FLINK-6257
> URL: https://issues.apache.org/jira/browse/FLINK-6257
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>Priority: Critical
> Fix For: 1.3.0
>
>
> The OVER windows have been implemented by several contributors.
> We should do a post pass over the contributed code and improve a few things.
> * Functionality
> ** Currently, every attribute is allowed as the ORDER BY attribute. We must 
> check that this is actually a time indicator ({{procTime()}}, {{rowTime()}}) 
> and that the order is ASCENDING.
> * Documentation
> ** Add documentation for OVER windows
> * Code style
> ** Consistent naming of {{ProcessFunctions}} and methods
> * Tests
> ** Move the OVER window tests out of SqlITCase into a dedicated class
> ** Move the OVER window tests out of WindowAggregateTest into a dedicated 
> class
> ** Add tests based on the test harness for all {{ProcessFunctions}} similar 
> to {{BoundedProcessingOverRangeProcessFunction}}. The tests should include 
> exact boundary checks for range windows and check for proper parallelization 
> with multiple keys.
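As a plain-Java model (not Flink code, and not the actual harness tests this item asks for) of the boundary semantics such tests would verify: for a RANGE BETWEEN x PRECEDING AND CURRENT ROW window over ascending timestamps, each row aggregates all rows up to itself whose timestamp lies within x of its own, with an inclusive lower bound.

```java
import java.util.Arrays;

// Plain-Java model of an OVER RANGE window's boundary: for each element,
// sum all values whose timestamp ts satisfies
//   current.ts - range <= ts <= current.ts   (inclusive lower bound).
// Assumes timestamps are sorted in ascending order.
public class RangeWindowModel {

    static long[] rangeSums(long[] timestamps, long[] values, long range) {
        long[] sums = new long[values.length];
        for (int i = 0; i < values.length; i++) {
            long lowerBound = timestamps[i] - range;
            long sum = 0;
            for (int j = 0; j <= i; j++) {
                if (timestamps[j] >= lowerBound) {
                    sum += values[j];
                }
            }
            sums[i] = sum;
        }
        return sums;
    }

    public static void main(String[] args) {
        long[] ts = {1, 2, 5};
        long[] v  = {10, 20, 30};
        // range = 3: the row at ts=5 includes ts=2 (5-3=2, inclusive) but not ts=1
        System.out.println(Arrays.toString(rangeSums(ts, v, 3)));
    }
}
```

The inclusive-lower-bound case (ts exactly equal to current.ts - range) is precisely the kind of exact boundary check the issue asks the harness tests to cover.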





[GitHub] flink pull request #3836: [FLINK-6448] Web UI TaskManager view: Rename 'Free...

2017-05-07 Thread yanghua
Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/3836




[jira] [Commented] (FLINK-6448) Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999849#comment-15999849
 ] 

ASF GitHub Bot commented on FLINK-6448:
---

Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/3836


> Web UI TaskManager view: Rename 'Free Memory' to 'JVM Heap'
> ---
>
> Key: FLINK-6448
> URL: https://issues.apache.org/jira/browse/FLINK-6448
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Reporter: Stephan Ewen
>Assignee: yanghua
>  Labels: easyfix, starter
>
> In the TaskManager view, the label 'Free Memory' is wrong / misleading and 
> should be 'JVM Heap Size' instead.





[jira] [Commented] (FLINK-6471) RocksDBStateBackendTest#testCancelRunningSnapshot sometimes fails

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999839#comment-15999839
 ] 

ASF GitHub Bot commented on FLINK-6471:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/3837

[FLINK-6471] [checkpoint] Fix RocksDBStateBackendTest

`RocksDBStateBackendTest::testCancelRunningSnapshot` sometimes fails. This 
is caused by a problem that I fixed in `BlockerCheckpointStreamFactory`. 

However, it seems that, for an unknown reason (probably a move refactoring + 
merge), this class existed twice, with all of its code duplicated. This PR 
deletes the copy and uses the one correct implementation everywhere.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink FLINK-6471

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3837.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3837






> RocksDBStateBackendTest#testCancelRunningSnapshot sometimes fails
> -
>
> Key: FLINK-6471
> URL: https://issues.apache.org/jira/browse/FLINK-6471
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>
> I got the following test failure based on commit 
> f37988c19adc30d324cde83c54f2fa5d36efb9e7 :
> {code}
> testCancelRunningSnapshot[1](org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest)
>   Time elapsed: 0.166 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertTrue(Assert.java:52)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest.testCancelRunningSnapshot(RocksDBStateBackendTest.java:313)
> Results :
> Failed tests:
>   RocksDBStateBackendTest.testCancelRunningSnapshot:313 null
> {code}
> The following assertion fails:
> {code}
> assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
> {code}
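Not Flink's actual `BlockerCheckpointStreamFactory`, but the blocking-stream idea behind it can be sketched in plain Java (class and method names below are illustrative): a stream whose writes block until it is closed, and whose `isClosed()` flag is what a cancellation test like the one above asserts on.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CountDownLatch;

// Simplified model (not Flink's actual implementation) of a checkpoint
// stream that blocks on write() until released, and records whether
// cancelling the snapshot closed it.
public class BlockerStream extends OutputStream {
    private final CountDownLatch blocker = new CountDownLatch(1);
    private volatile boolean closed;

    @Override
    public void write(int b) throws IOException {
        try {
            blocker.await(); // simulate a snapshot stuck in I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("write interrupted", e);
        }
    }

    @Override
    public void close() {
        closed = true;
        blocker.countDown(); // unblock any pending writer so its thread can exit
    }

    public boolean isClosed() {
        return closed;
    }

    public static void main(String[] args) throws Exception {
        BlockerStream stream = new BlockerStream();
        Thread snapshot = new Thread(() -> {
            try {
                stream.write(42);
            } catch (IOException ignored) {
                // expected when the snapshot is cancelled
            }
        });
        snapshot.start();
        stream.close(); // "cancel" the running snapshot
        snapshot.join();
        // This mirrors the assertion in the failing test.
        if (!stream.isClosed()) {
            throw new AssertionError("stream should be closed after cancel");
        }
    }
}
```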





[GitHub] flink pull request #3837: [FLINK-6471] [checkpoint] Fix RocksDBStateBackendT...

2017-05-07 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/3837

[FLINK-6471] [checkpoint] Fix RocksDBStateBackendTest

`RocksDBStateBackendTest::testCancelRunningSnapshot` sometimes fails. This 
is caused by a problem that I fixed in `BlockerCheckpointStreamFactory`. 

However, it seems that, for an unknown reason (probably a move refactoring + 
merge), this class existed twice, with all of its code duplicated. This PR 
deletes the copy and uses the one correct implementation everywhere.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink FLINK-6471

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3837.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3837








[jira] [Commented] (FLINK-6471) RocksDBStateBackendTest#testCancelRunningSnapshot sometimes fails

2017-05-07 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999837#comment-15999837
 ] 

Stefan Richter commented on FLINK-6471:
---

Thanks for reporting this problem.

This is caused by a problem that I fixed in `BlockerCheckpointStreamFactory`. 
However, it seems that, for an unknown reason (probably a move refactoring + 
merge), this class existed twice, with all of its code duplicated. I will 
delete the copy and just use the one correct implementation everywhere.

> RocksDBStateBackendTest#testCancelRunningSnapshot sometimes fails
> -
>
> Key: FLINK-6471
> URL: https://issues.apache.org/jira/browse/FLINK-6471
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>
> I got the following test failure based on commit 
> f37988c19adc30d324cde83c54f2fa5d36efb9e7 :
> {code}
> testCancelRunningSnapshot[1](org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest)
>   Time elapsed: 0.166 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertTrue(Assert.java:52)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest.testCancelRunningSnapshot(RocksDBStateBackendTest.java:313)
> Results :
> Failed tests:
>   RocksDBStateBackendTest.testCancelRunningSnapshot:313 null
> {code}
> The following assertion fails:
> {code}
> assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
> {code}





[jira] [Commented] (FLINK-6425) Integrate serializer reconfiguration into state restore flow to activate serializer upgrades

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999833#comment-15999833
 ] 

ASF GitHub Bot commented on FLINK-6425:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3834
  
Thanks a lot @StefanRRichter for the review, especially on the weekends :)

I'll proceed to merge this after also addressing the comment on the 
priority flow between the serialized old serializer and the convert serializer.


> Integrate serializer reconfiguration into state restore flow to activate 
> serializer upgrades
> 
>
> Key: FLINK-6425
> URL: https://issues.apache.org/jira/browse/FLINK-6425
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as 
> a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as 
> the state's metainfo:
> - the previous serializer
> - snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous old serializer. 
> Deserialization may fail if a) the serializer no longer exists in classpath, 
> or b) the serializer class is no longer valid (i.e., the implementation 
> changed and resulted in a different serialVersionUID). In this case, use a dummy 
> serializer as a placeholder. This dummy serializer is currently the 
> {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous old serializer. The 
> configuration snapshot must be successfully deserialized, otherwise the state 
> restore fails.
> 3. When we get the new registered serializer for the state (could be a 
> completely new serializer, the same serializer with different 
> implementations, or the exact same serializer untouched; either way they are 
> seen as a new serializer), we use the configuration snapshot of the old 
> serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer. However, depending on the 
> result of the upgrade, state conversion may need to take place (for now, if 
> state conversion is required, we just fail the job, as this functionality 
> isn't available yet). The results could be:
> - Compatible: restore succeeds + serializer upgraded.
> - Compatible, but serialization schema changed: serializer upgraded, but 
> state conversion is required; the old serializer does not need to be present.
> - Incompatible: serializer upgrade requires state conversion, and requires 
> the old serializer to be present (i.e., it can not be the dummy 
> {{ClassNotFoundProxySerializer}}).
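The three-way outcome described above can be modeled in plain Java. This is a loose simplification under stated assumptions — the class, enum, and method names below are illustrative, not Flink's actual API, and a config snapshot is reduced to a format-identifier string:

```java
// Illustrative model (not Flink's API) of resolving the restore flow's
// outcome from the old serializer's config snapshot and the newly
// registered serializer's schema.
public class SerializerUpgrade {

    enum Result { COMPATIBLE, REQUIRES_MIGRATION, INCOMPATIBLE }

    static Result resolve(String oldFormat, String newFormat, boolean oldSerializerUsable) {
        if (oldFormat.equals(newFormat)) {
            // Same serialization schema: restore succeeds, serializer upgraded.
            return Result.COMPATIBLE;
        }
        // Schema changed: state conversion is needed. If the old serializer
        // (or a fallback deserializer) can still read the old bytes, the
        // conversion can proceed; otherwise the restore must fail.
        return oldSerializerUsable ? Result.REQUIRES_MIGRATION : Result.INCOMPATIBLE;
    }

    public static void main(String[] args) {
        System.out.println(resolve("v1", "v1", true));  // COMPATIBLE
        System.out.println(resolve("v1", "v2", true));  // REQUIRES_MIGRATION
        System.out.println(resolve("v1", "v2", false)); // INCOMPATIBLE
    }
}
```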





[GitHub] flink issue #3834: [FLINK-6425] [runtime] Activate serializer upgrades in st...

2017-05-07 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3834
  
Thanks a lot @StefanRRichter for the review, especially on the weekends :)

I'll proceed to merge this after also addressing the comment on the 
priority flow between the serialized old serializer and the convert serializer.




[GitHub] flink issue #3834: [FLINK-6425] [runtime] Activate serializer upgrades in st...

2017-05-07 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3834
  
I was already familiar with the high level design of this PR from my 
discussions with @tzulitai. As the feature freeze is already tomorrow, I had to 
focus my review on the functionality of the design (i.e. if all cases in 
backwards compatibility that we want to cover are possible and future plans 
like state transformation can build upon this work). I also had a look into a 
couple of important tests, but could not go through all the implementation 
details for now.

Overall, I think this is very good work and improves a ton of things w.r.t. 
everything that has to do with serialization format updates and backwards 
compatibility. I suggest that we proceed to merge this, so that the code makes 
it into the release. We can still do potential refinements and fixes in the QA 
phase, in case they are needed.

@tzulitai if you agree, please feel free to merge this. +1




[jira] [Commented] (FLINK-6425) Integrate serializer reconfiguration into state restore flow to activate serializer upgrades

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999832#comment-15999832
 ] 

ASF GitHub Bot commented on FLINK-6425:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3834
  
I was already familiar with the high level design of this PR from my 
discussions with @tzulitai. As the feature freeze is already tomorrow, I had to 
focus my review on the functionality of the design (i.e. if all cases in 
backwards compatibility that we want to cover are possible and future plans 
like state transformation can build upon this work). I also had a look into a 
couple of important tests, but could not go through all the implementation 
details for now.

Overall, I think this is very good work and improves a ton of things w.r.t. 
everything that has to do with serialization format updates and backwards 
compatibility. I suggest that we proceed to merge this, so that the code makes 
it into the release. We can still do potential refinements and fixes in the QA 
phase, in case they are needed.

@tzulitai if you agree, please feel free to merge this. +1


> Integrate serializer reconfiguration into state restore flow to activate 
> serializer upgrades
> 
>
> Key: FLINK-6425
> URL: https://issues.apache.org/jira/browse/FLINK-6425
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as 
> a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as 
> the state's metainfo:
> - the previous serializer
> - snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous old serializer. 
> Deserialization may fail if a) the serializer no longer exists in classpath, 
> or b) the serializer class is no longer valid (i.e., the implementation 
> changed and resulted in a different serialVersionUID). In this case, use a dummy 
> serializer as a placeholder. This dummy serializer is currently the 
> {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous old serializer. The 
> configuration snapshot must be successfully deserialized, otherwise the state 
> restore fails.
> 3. When we get the new registered serializer for the state (could be a 
> completely new serializer, the same serializer with different 
> implementations, or the exact same serializer untouched; either way they are 
> seen as a new serializer), we use the configuration snapshot of the old 
> serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer. However, depending on the 
> result of the upgrade, state conversion may need to take place (for now, if 
> state conversion is required, we just fail the job, as this functionality 
> isn't available yet). The results could be:
> - Compatible: restore succeeds + serializer upgraded.
> - Compatible, but serialization schema changed: serializer upgraded, but 
> state conversion is required; the old serializer does not need to be present.
> - Incompatible: serializer upgrade requires state conversion, and requires 
> the old serializer to be present (i.e., it can not be the dummy 
> {{ClassNotFoundProxySerializer}}).





[jira] [Commented] (FLINK-6425) Integrate serializer reconfiguration into state restore flow to activate serializer upgrades

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999826#comment-15999826
 ] 

ASF GitHub Bot commented on FLINK-6425:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3834#discussion_r115141563
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 ---
@@ -161,7 +162,93 @@
 
public abstract int hashCode();
 
-   public boolean canRestoreFrom(TypeSerializer other) {
-   return equals(other);
+   // 

+   // Serializer configuration snapshotting & reconfiguring
+   // 

+
+   /**
+* Create a snapshot of the serializer's current configuration to be 
stored along with the managed state it is
+* registered to (if any - this method is only relevant if this 
serializer is registered for serialization of
+* managed state).
+*
+* The configuration snapshot should contain information about the 
serializer's parameter settings and its
+* serialization format. When a new serializer is registered to 
serialize the same managed state that this
+* serializer was registered to, the returned configuration snapshot 
can be used to check with the new serializer
+* if any data migration needs to take place.
+*
+* Implementations can also return the singleton {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE}
+* configuration if they guarantee forwards compatibility. For example, 
implementations that use serialization
+* frameworks with built-in serialization compatibility, such as Thrift (https://thrift.apache.org/)
+* or Protobuf (https://developers.google.com/protocol-buffers/), are suitable 
for this usage pattern. By
+* returning the {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE}, this informs Flink that 
when managed
+* state serialized using this serializer is restored, there is no need 
to check for migration with the new
+* serializer for the same state. In other words, new serializers are 
always assumed to be fully compatible for the
+* serialized state.
+*
+* @see TypeSerializerConfigSnapshot
+* @see ForwardCompatibleSerializationFormatConfig
+*
+* @return snapshot of the serializer's current configuration.
+*/
+   public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
+
+   /**
+* Get the migration strategy to use this serializer based on the 
configuration snapshot of a preceding
+* serializer that was registered for serialization of the same managed 
state (if any - this method is only
+* relevant if this serializer is registered for serialization of 
managed state).
+*
+* Implementations need to return the resolved migration strategy. 
The strategy can be one of the following:
+* 
+* {@link MigrationStrategy#noMigration()}: this signals Flink 
that this serializer is compatible, or
+* has been reconfigured to be compatible, to continue reading old 
data, and that the
+* serialization schema remains the same. No migration needs to be 
performed.
+*
+* {@link 
MigrationStrategy#migrateWithFallbackDeserializer(TypeSerializer)}: this 
signals Flink that
+* migration needs to be performed, because this serializer is not 
compatible, or cannot be reconfigured to be
+* compatible, for old data. Furthermore, in the case that the 
preceding serializer cannot be found or
+* restored to read the old data, the provided fallback 
deserializer can be used.
+*
+* {@link MigrationStrategy#migrate()}: this signals Flink that 
migration needs to be performed, because
+* this serializer is not compatible, or cannot be reconfigured to 
be compatible, for old data.
+* 
+*
+* This method is guaranteed to only be invoked if the preceding 
serializer's configuration snapshot is not the
+* singleton {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE} configuration. In such 
cases, Flink always
+* assumes that the migration strategy is {@link 
MigrationStrategy#migrate()}.
+*
+* @see MigrationStrategy
+*
+* @param configSnapshot configuration snapshot of a preceding 
serializer for the same managed state
+*
+* @return the result of the reconfiguration.
+*/
+   protected abstract MigrationStrategy 
getMigrationStrategy(TypeSerializerConfigSnapshot configSnapshot);
 

[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999827#comment-15999827
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3522
  
I agree that we can change the Strings to some proper types later. Besides 
what I mentioned before, I just thought that even if a string is the underlying 
data type for the moment, we could wrap it in a named `Pointer` class to make 
the code more readable and "typesafe".


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>






[jira] [Commented] (FLINK-6425) Integrate serializer reconfiguration into state restore flow to activate serializer upgrades

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999828#comment-15999828
 ] 

ASF GitHub Bot commented on FLINK-6425:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3834#discussion_r115141585
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 ---
@@ -161,7 +162,93 @@
 
public abstract int hashCode();
 
-   public boolean canRestoreFrom(TypeSerializer other) {
-   return equals(other);
+   // 

+   // Serializer configuration snapshotting & reconfiguring
+   // 

+
+   /**
+* Create a snapshot of the serializer's current configuration to be 
stored along with the managed state it is
+* registered to (if any - this method is only relevant if this 
serializer is registered for serialization of
+* managed state).
+*
+* The configuration snapshot should contain information about the 
serializer's parameter settings and its
+* serialization format. When a new serializer is registered to 
serialize the same managed state that this
+* serializer was registered to, the returned configuration snapshot 
can be used to check with the new serializer
+* if any data migration needs to take place.
+*
+* Implementations can also return the singleton {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE}
+* configuration if they guarantee forwards compatibility. For example, 
implementations that use serialization
+* frameworks with built-in serialization compatibility, such as Thrift (https://thrift.apache.org/)
+* or Protobuf (https://developers.google.com/protocol-buffers/), are suitable 
for this usage pattern. By
+* returning the {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE}, this informs Flink that 
when managed
+* state serialized using this serializer is restored, there is no need 
to check for migration with the new
+* serializer for the same state. In other words, new serializers are 
always assumed to be fully compatible for the
+* serialized state.
+*
+* @see TypeSerializerConfigSnapshot
+* @see ForwardCompatibleSerializationFormatConfig
+*
+* @return snapshot of the serializer's current configuration.
+*/
+   public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
+
+   /**
+* Get the migration strategy to use this serializer based on the 
configuration snapshot of a preceding
+* serializer that was registered for serialization of the same managed 
state (if any - this method is only
+* relevant if this serializer is registered for serialization of 
managed state).
+*
+* Implementations need to return the resolved migration strategy. 
The strategy can be one of the following:
+* 
+* {@link MigrationStrategy#noMigration()}: this signals Flink 
that this serializer is compatible, or
+* has been reconfigured to be compatible, to continue reading old 
data, and that the
+* serialization schema remains the same. No migration needs to be 
performed.
+*
+* {@link 
MigrationStrategy#migrateWithFallbackDeserializer(TypeSerializer)}: this 
signals Flink that
+* migration needs to be performed, because this serializer is not 
compatible, or cannot be reconfigured to be
+* compatible, for old data. Furthermore, in the case that the 
preceding serializer cannot be found or
+* restored to read the old data, the provided fallback 
deserializer can be used.
+*
+* {@link MigrationStrategy#migrate()}: this signals Flink that 
migration needs to be performed, because
+* this serializer is not compatible, or cannot be reconfigured to 
be compatible, for old data.
+* 
+*
+* This method is guaranteed to only be invoked if the preceding 
serializer's configuration snapshot is not the
+* singleton {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE} configuration. In such 
cases, Flink always
+* assumes that the migration strategy is {@link 
MigrationStrategy#migrate()}.
+*
+* @see MigrationStrategy
+*
+* @param configSnapshot configuration snapshot of a preceding 
serializer for the same managed state
+*
+* @return the result of the reconfiguration.
+*/
+   protected abstract MigrationStrategy 
getMigrationStrategy(TypeSerializerConfigSnapshot configSnapshot);
 

[GitHub] flink pull request #3834: [FLINK-6425] [runtime] Activate serializer upgrade...

2017-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3834#discussion_r115141585
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 ---
@@ -161,7 +162,93 @@
 
public abstract int hashCode();
 
-   public boolean canRestoreFrom(TypeSerializer other) {
-   return equals(other);
+   // 

+   // Serializer configuration snapshotting & reconfiguring
+   // 

+
+   /**
+* Create a snapshot of the serializer's current configuration to be 
stored along with the managed state it is
+* registered to (if any - this method is only relevant if this 
serializer is registered for serialization of
+* managed state).
+*
+* The configuration snapshot should contain information about the 
serializer's parameter settings and its
+* serialization format. When a new serializer is registered to 
serialize the same managed state that this
+* serializer was registered to, the returned configuration snapshot 
can be used to check with the new serializer
+* if any data migration needs to take place.
+*
+* Implementations can also return the singleton {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE}
+* configuration if they guarantee forwards compatibility. For example, 
implementations that use serialization
+* frameworks with built-in serialization compatibility, such as Thrift (https://thrift.apache.org/)
+* or Protobuf (https://developers.google.com/protocol-buffers/), are suitable 
for this usage pattern. By
+* returning the {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE}, this informs Flink that 
when managed
+* state serialized using this serializer is restored, there is no need 
to check for migration with the new
+* serializer for the same state. In other words, new serializers are 
always assumed to be fully compatible for the
+* serialized state.
+*
+* @see TypeSerializerConfigSnapshot
+* @see ForwardCompatibleSerializationFormatConfig
+*
+* @return snapshot of the serializer's current configuration.
+*/
+   public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
+
+   /**
+* Get the migration strategy to use this serializer based on the 
configuration snapshot of a preceding
+* serializer that was registered for serialization of the same managed 
state (if any - this method is only
+* relevant if this serializer is registered for serialization of 
managed state).
+*
+* Implementations need to return the resolved migration strategy. 
The strategy can be one of the following:
+* 
+* {@link MigrationStrategy#noMigration()}: this signals Flink 
that this serializer is compatible, or
+* has been reconfigured to be compatible, to continue reading old 
data, and that the
+* serialization schema remains the same. No migration needs to be 
performed.
+*
+* {@link 
MigrationStrategy#migrateWithFallbackDeserializer(TypeSerializer)}: this 
signals Flink that
+* migration needs to be performed, because this serializer is not 
compatible, or cannot be reconfigured to be
+* compatible, for old data. Furthermore, in the case that the 
preceding serializer cannot be found or
+* restored to read the old data, the provided fallback 
deserializer can be used.
+*
+* {@link MigrationStrategy#migrate()}: this signals Flink that 
migration needs to be performed, because
+* this serializer is not compatible, or cannot be reconfigured to 
be compatible, for old data.
+* 
+*
+* This method is guaranteed to only be invoked if the preceding 
serializer's configuration snapshot is not the
+* singleton {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE} configuration. In such 
cases, Flink always
+* assumes that the migration strategy is {@link 
MigrationStrategy#migrate()}.
+*
+* @see MigrationStrategy
+*
+* @param configSnapshot configuration snapshot of a preceding 
serializer for the same managed state
+*
+* @return the result of the reconfiguration.
+*/
+   protected abstract MigrationStrategy 
getMigrationStrategy(TypeSerializerConfigSnapshot configSnapshot);
--- End diff --

Agreed. Refined the naming of this with 81dd0eb



[GitHub] flink issue #3522: [FLINK-5823] [checkpoints] State Backends also handle Che...

2017-05-07 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3522
  
I agree that we can change the Strings to some proper types later. Besides 
what I mentioned before, I just thought that even if a string is the underlying 
data type for the moment, we could wrap it in a named `Pointer` class to make 
the code more readable and "typesafe".




[jira] [Commented] (FLINK-6425) Integrate serializer reconfiguration into state restore flow to activate serializer upgrades

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999824#comment-15999824
 ] 

ASF GitHub Bot commented on FLINK-6425:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3834#discussion_r115141548
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1473,22 +1481,92 @@ void restore(Collection 
restoreStateHandles) throws Exception
protected  ColumnFamilyHandle getColumnFamily(
StateDescriptor descriptor, TypeSerializer 
namespaceSerializer) throws IOException {
 
-   Tuple2> stateInfo =
+   Tuple2> stateInfo =
kvStateInformation.get(descriptor.getName());
 
-   RegisteredBackendStateMetaInfo newMetaInfo = new 
RegisteredBackendStateMetaInfo<>(
-   descriptor.getType(),
-   descriptor.getName(),
-   namespaceSerializer,
-   descriptor.getSerializer());
+   RegisteredKeyedBackendStateMetaInfo newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
+   descriptor.getType(),
+   descriptor.getName(),
+   namespaceSerializer,
+   descriptor.getSerializer());
 
if (stateInfo != null) {
-   if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
+   // TODO with eager registration in place, these checks 
should be moved to restore()
+
+   RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfo =
+   
restoredKvStateMetaInfos.get(descriptor.getName());
+
+   Preconditions.checkState(
+   
newMetaInfo.getName().equals(restoredMetaInfo.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfo.getName() + 
"], " +
+   "registered with [" + 
newMetaInfo.getName() + "].");
+
+   if 
(!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
+   && 
!restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   
newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfo.getStateType() + "], " +
+   "registered with [" + 
newMetaInfo.getStateType() + "].");
+   }
+
+   // check serializer migration strategies to determine 
if state migration is required
+
+   boolean requireMigration = false;
+
+   // only check migration strategy if there is a restored 
configuration snapshot;
+   // there wouldn't be one if we were restored from an 
older version checkpoint,
+   // in which case we can only simply assume that 
migration is not required
+
+   if 
(restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
+   MigrationStrategy namespaceMigrationStrategy 
= newMetaInfo.getNamespaceSerializer()
+   
.getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
+
+   TypeSerializer finalOldNamespaceSerializer;
+   if 
(namespaceMigrationStrategy.requireMigration()) {
+   requireMigration = true;
+
+   if 
(namespaceMigrationStrategy.getFallbackDeserializer() != null) {
+   finalOldNamespaceSerializer = 
namespaceMigrationStrategy.getFallbackDeserializer();
+   } else if 
(restoredMetaInfo.getNamespaceSerializer() != null
+   && 
!(restoredMetaInfo.getNamespaceSerializer() instanceof 
MigrationNamespaceSerializerProxy)) {
+   finalOldNamespaceSerializer = 
restoredMetaInfo.getNamespaceSerializer();
+   } else {
+   throw new RuntimeException(
+   "State migration 
required, but there is no available serializer capable of reading previous 
namespace.");

[GitHub] flink pull request #3834: [FLINK-6425] [runtime] Activate serializer upgrade...

2017-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3834#discussion_r115141563
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 ---
@@ -161,7 +162,93 @@
 
public abstract int hashCode();
 
-   public boolean canRestoreFrom(TypeSerializer other) {
-   return equals(other);
+   // ------------------------------------------------------------------------
+   //  Serializer configuration snapshotting & reconfiguring
+   // ------------------------------------------------------------------------

+
+   /**
+* Create a snapshot of the serializer's current configuration to be 
stored along with the managed state it is
+* registered to (if any - this method is only relevant if this 
serializer is registered for serialization of
+* managed state).
+*
+* The configuration snapshot should contain information about the 
serializer's parameter settings and its
+* serialization format. When a new serializer is registered to 
serialize the same managed state that this
+* serializer was registered to, the returned configuration snapshot 
can be used to check with the new serializer
+* if any data migration needs to take place.
+*
+* Implementations can also return the singleton {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE}
+* configuration if they guarantee forwards compatibility. For example, 
implementations that use serialization
frameworks with built-in serialization compatibility, such as Thrift (https://thrift.apache.org/) or
+* Protobuf (https://developers.google.com/protocol-buffers/), are suitable 
for this usage pattern. By
+* returning the {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE}, this informs Flink that 
when managed
+* state serialized using this serializer is restored, there is no need 
to check for migration with the new
+* serializer for the same state. In other words, new serializers are 
always assumed to be fully compatible for the
+* serialized state.
+*
+* @see TypeSerializerConfigSnapshot
+* @see ForwardCompatibleSerializationFormatConfig
+*
+* @return snapshot of the serializer's current configuration.
+*/
+   public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
+
+   /**
+* Get the migration strategy to use this serializer based on the 
configuration snapshot of a preceding
+* serializer that was registered for serialization of the same managed 
state (if any - this method is only
+* relevant if this serializer is registered for serialization of 
managed state).
+*
+* Implementations need to return the resolved migration strategy. 
The strategy can be one of the following:
+* 
+* {@link MigrationStrategy#noMigration()}: this signals Flink 
that this serializer is compatible, or
+* has been reconfigured to be compatible, to continue reading old 
data, and that the
+* serialization schema remains the same. No migration needs to be 
performed.
+*
+* {@link 
MigrationStrategy#migrateWithFallbackDeserializer(TypeSerializer)}: this 
signals Flink that
+* migration needs to be performed, because this serializer is not 
compatible, or cannot be reconfigured to be
+* compatible, for old data. Furthermore, in the case that the 
preceding serializer cannot be found or
+* restored to read the old data, the provided fallback 
deserializer can be used.
+*
+* {@link MigrationStrategy#migrate()}: this signals Flink that 
migration needs to be performed, because
+* this serializer is not compatible, or cannot be reconfigured to 
be compatible, for old data.
+* 
+*
+* This method is guaranteed to only be invoked if the preceding 
serializer's configuration snapshot is not the
+* singleton {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE} configuration. In such 
cases, Flink always
+* assumes that the migration strategy is {@link 
MigrationStrategy#migrate()}.
+*
+* @see MigrationStrategy
+*
+* @param configSnapshot configuration snapshot of a preceding 
serializer for the same managed state
+*
+* @return the result of the reconfiguration.
+*/
+   protected abstract MigrationStrategy 
getMigrationStrategy(TypeSerializerConfigSnapshot configSnapshot);
+
+   /**
+* Get the migration strategy to use this serializer based on the 
configuration snapshot of a preceding
+* serializer that was registered for serialization of the same managed 
state (if any - this method is only
+
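
To make the three outcomes described in the javadoc above concrete, here is a small self-contained model of the decision. `MigrationStrategy` below is a stand-in that mimics the API under discussion; the class and method names are assumptions for illustration, not the real Flink types:

```java
// Stand-in model of the three migration outcomes: noMigration(), migrate(),
// and migrateWithFallbackDeserializer(...). Illustrative only.
final class MigrationStrategy {
    private final boolean requiresMigration;
    private final Object fallbackDeserializer; // stand-in for a TypeSerializer

    private MigrationStrategy(boolean requiresMigration, Object fallback) {
        this.requiresMigration = requiresMigration;
        this.fallbackDeserializer = fallback;
    }

    static MigrationStrategy noMigration() {
        return new MigrationStrategy(false, null);
    }

    static MigrationStrategy migrate() {
        return new MigrationStrategy(true, null);
    }

    static MigrationStrategy migrateWithFallbackDeserializer(Object fallback) {
        return new MigrationStrategy(true, fallback);
    }

    boolean requireMigration() { return requiresMigration; }
    Object getFallbackDeserializer() { return fallbackDeserializer; }
}

// A hypothetical serializer that versions its binary format: same version
// means no migration, an older version means state must be migrated.
final class VersionedSerializerExample {
    static final int CURRENT_FORMAT_VERSION = 2;

    static MigrationStrategy strategyFor(int restoredFormatVersion) {
        if (restoredFormatVersion == CURRENT_FORMAT_VERSION) {
            return MigrationStrategy.noMigration();  // schema unchanged, keep reading
        }
        return MigrationStrategy.migrate();          // old format, migration required
    }
}
```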

[jira] [Commented] (FLINK-6425) Integrate serializer reconfiguration into state restore flow to activate serializer upgrades

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999823#comment-15999823
 ] 

ASF GitHub Bot commented on FLINK-6425:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3834#discussion_r115141541
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1473,22 +1481,92 @@ void restore(Collection 
restoreStateHandles) throws Exception
protected  ColumnFamilyHandle getColumnFamily(
StateDescriptor descriptor, TypeSerializer 
namespaceSerializer) throws IOException {
 
-   Tuple2> stateInfo =
+   Tuple2> stateInfo =
kvStateInformation.get(descriptor.getName());
 
-   RegisteredBackendStateMetaInfo newMetaInfo = new 
RegisteredBackendStateMetaInfo<>(
-   descriptor.getType(),
-   descriptor.getName(),
-   namespaceSerializer,
-   descriptor.getSerializer());
+   RegisteredKeyedBackendStateMetaInfo newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
+   descriptor.getType(),
+   descriptor.getName(),
+   namespaceSerializer,
+   descriptor.getSerializer());
 
if (stateInfo != null) {
-   if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
+   // TODO with eager registration in place, these checks 
should be moved to restore()
+
+   RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfo =
+   
restoredKvStateMetaInfos.get(descriptor.getName());
+
+   Preconditions.checkState(
+   
newMetaInfo.getName().equals(restoredMetaInfo.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfo.getName() + 
"], " +
+   "registered with [" + 
newMetaInfo.getName() + "].");
+
+   if 
(!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
+   && 
!restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   
newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfo.getStateType() + "], " +
+   "registered with [" + 
newMetaInfo.getStateType() + "].");
+   }
+
+   // check serializer migration strategies to determine 
if state migration is required
+
+   boolean requireMigration = false;
+
+   // only check migration strategy if there is a restored 
configuration snapshot;
+   // there wouldn't be one if we were restored from an 
older version checkpoint,
+   // in which case we can only simply assume that 
migration is not required
+
+   if 
(restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
+   MigrationStrategy namespaceMigrationStrategy 
= newMetaInfo.getNamespaceSerializer()
+   
.getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
+
+   TypeSerializer finalOldNamespaceSerializer;
+   if 
(namespaceMigrationStrategy.requireMigration()) {
+   requireMigration = true;
+
+   if 
(namespaceMigrationStrategy.getFallbackDeserializer() != null) {
+   finalOldNamespaceSerializer = 
namespaceMigrationStrategy.getFallbackDeserializer();
+   } else if 
(restoredMetaInfo.getNamespaceSerializer() != null
+   && 
!(restoredMetaInfo.getNamespaceSerializer() instanceof 
MigrationNamespaceSerializerProxy)) {
+   finalOldNamespaceSerializer = 
restoredMetaInfo.getNamespaceSerializer();
+   } else {
+   throw new RuntimeException(
+   "State migration 
required, but there is no available serializer capable of reading previous 
namespace.");

[GitHub] flink pull request #3834: [FLINK-6425] [runtime] Activate serializer upgrade...

2017-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3834#discussion_r115141548
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1473,22 +1481,92 @@ void restore(Collection 
restoreStateHandles) throws Exception
protected  ColumnFamilyHandle getColumnFamily(
StateDescriptor descriptor, TypeSerializer 
namespaceSerializer) throws IOException {
 
-   Tuple2> stateInfo =
+   Tuple2> stateInfo =
kvStateInformation.get(descriptor.getName());
 
-   RegisteredBackendStateMetaInfo newMetaInfo = new 
RegisteredBackendStateMetaInfo<>(
-   descriptor.getType(),
-   descriptor.getName(),
-   namespaceSerializer,
-   descriptor.getSerializer());
+   RegisteredKeyedBackendStateMetaInfo newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
+   descriptor.getType(),
+   descriptor.getName(),
+   namespaceSerializer,
+   descriptor.getSerializer());
 
if (stateInfo != null) {
-   if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
+   // TODO with eager registration in place, these checks 
should be moved to restore()
+
+   RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfo =
+   
restoredKvStateMetaInfos.get(descriptor.getName());
+
+   Preconditions.checkState(
+   
newMetaInfo.getName().equals(restoredMetaInfo.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfo.getName() + 
"], " +
+   "registered with [" + 
newMetaInfo.getName() + "].");
+
+   if 
(!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
+   && 
!restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   
newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfo.getStateType() + "], " +
+   "registered with [" + 
newMetaInfo.getStateType() + "].");
+   }
+
+   // check serializer migration strategies to determine 
if state migration is required
+
+   boolean requireMigration = false;
+
+   // only check migration strategy if there is a restored 
configuration snapshot;
+   // there wouldn't be one if we were restored from an 
older version checkpoint,
+   // in which case we can only simply assume that 
migration is not required
+
+   if 
(restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
+   MigrationStrategy namespaceMigrationStrategy 
= newMetaInfo.getNamespaceSerializer()
+   
.getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
+
+   TypeSerializer finalOldNamespaceSerializer;
+   if 
(namespaceMigrationStrategy.requireMigration()) {
+   requireMigration = true;
+
+   if 
(namespaceMigrationStrategy.getFallbackDeserializer() != null) {
+   finalOldNamespaceSerializer = 
namespaceMigrationStrategy.getFallbackDeserializer();
+   } else if 
(restoredMetaInfo.getNamespaceSerializer() != null
+   && 
!(restoredMetaInfo.getNamespaceSerializer() instanceof 
MigrationNamespaceSerializerProxy)) {
+   finalOldNamespaceSerializer = 
restoredMetaInfo.getNamespaceSerializer();
+   } else {
+   throw new RuntimeException(
+   "State migration 
required, but there is no available serializer capable of reading previous 
namespace.");
+   }
+   }
+   }
+
+   if (restoredMetaInfo.getStateSerializerConfigSnapshot() 
!= null) {
+  

[GitHub] flink pull request #3834: [FLINK-6425] [runtime] Activate serializer upgrade...

2017-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3834#discussion_r115141541
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1473,22 +1481,92 @@ void restore(Collection 
restoreStateHandles) throws Exception
protected  ColumnFamilyHandle getColumnFamily(
StateDescriptor descriptor, TypeSerializer 
namespaceSerializer) throws IOException {
 
-   Tuple2> stateInfo =
+   Tuple2> stateInfo =
kvStateInformation.get(descriptor.getName());
 
-   RegisteredBackendStateMetaInfo newMetaInfo = new 
RegisteredBackendStateMetaInfo<>(
-   descriptor.getType(),
-   descriptor.getName(),
-   namespaceSerializer,
-   descriptor.getSerializer());
+   RegisteredKeyedBackendStateMetaInfo newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
+   descriptor.getType(),
+   descriptor.getName(),
+   namespaceSerializer,
+   descriptor.getSerializer());
 
if (stateInfo != null) {
-   if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
+   // TODO with eager registration in place, these checks 
should be moved to restore()
+
+   RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfo =
+   
restoredKvStateMetaInfos.get(descriptor.getName());
+
+   Preconditions.checkState(
+   
newMetaInfo.getName().equals(restoredMetaInfo.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfo.getName() + 
"], " +
+   "registered with [" + 
newMetaInfo.getName() + "].");
+
+   if 
(!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
+   && 
!restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   
newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfo.getStateType() + "], " +
+   "registered with [" + 
newMetaInfo.getStateType() + "].");
+   }
+
+   // check serializer migration strategies to determine 
if state migration is required
+
+   boolean requireMigration = false;
+
+   // only check migration strategy if there is a restored 
configuration snapshot;
+   // there wouldn't be one if we were restored from an 
older version checkpoint,
+   // in which case we can only simply assume that 
migration is not required
+
+   if 
(restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
+   MigrationStrategy namespaceMigrationStrategy 
= newMetaInfo.getNamespaceSerializer()
+   
.getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
+
+   TypeSerializer finalOldNamespaceSerializer;
+   if 
(namespaceMigrationStrategy.requireMigration()) {
+   requireMigration = true;
+
+   if 
(namespaceMigrationStrategy.getFallbackDeserializer() != null) {
+   finalOldNamespaceSerializer = 
namespaceMigrationStrategy.getFallbackDeserializer();
+   } else if 
(restoredMetaInfo.getNamespaceSerializer() != null
+   && 
!(restoredMetaInfo.getNamespaceSerializer() instanceof 
MigrationNamespaceSerializerProxy)) {
+   finalOldNamespaceSerializer = 
restoredMetaInfo.getNamespaceSerializer();
+   } else {
+   throw new RuntimeException(
+   "State migration 
required, but there is no available serializer capable of reading previous 
namespace.");
+   }
+   }
+   }
+
+   if (restoredMetaInfo.getStateSerializerConfigSnapshot() 
!= null) {
+  

[jira] [Commented] (FLINK-6257) Post-pass OVER windows

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999819#comment-15999819
 ] 

ASF GitHub Bot commented on FLINK-6257:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3697


> Post-pass OVER windows
> --
>
> Key: FLINK-6257
> URL: https://issues.apache.org/jira/browse/FLINK-6257
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>Priority: Critical
>
> The OVER windows have been implemented by several contributors.
> We should do a post pass over the contributed code and improve a few things.
> * Functionality
> ** Currently every time attribute is allowed as ORDER BY attribute. We must 
> check that this is actually a time indicator ({{procTime()}}, {{rowTime()}}) 
> and that the order is ASCENDING.
> * Documentation
> ** Add documentation for OVER windows
> * Code style
> ** Consistent naming of {{ProcessFunctions}} and methods
> * Tests
> ** Move the OVER window tests out of SqlITCase into a dedicated class
> ** Move the OVER window tests out of WindowAggregateTest into a dedicated 
> class
> ** Add tests based on the test harness for all {{ProcessFunctions}} similar 
> to {{BoundedProcessingOverRangeProcessFunction}}. The tests should include 
> exact boundary checks for range windows and check for proper parallelization 
> with multiple keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3697: [FLINK-6257][table]Optimize test cases

2017-05-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3697




[GitHub] flink pull request #3793: flink-6033 Support UNNEST query in the stream SQL ...

2017-05-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3793




[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999811#comment-15999811
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3522
  
Here is a summary of why I picked a String initially for the metadata 
persistence location:

> The idea of the pointer is to have a state-backend independent string 
that indicates a checkpoint location. I picked a string, because the location 
is ultimately passed as a string, either from the user command `flink run -s` 
or from ZooKeeper's "last completed checkpoint" entry.
> 
> Introducing the StreamHandleAndPointer here spares the code from having 
to reason about state backend specific file handles. We can change that at some 
point, but for the breadth of changes already in this PR, this helped reduce 
the scope a bit.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3522: [FLINK-5823] [checkpoints] State Backends also handle Che...

2017-05-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3522
  
Here is a summary of why I picked a String initially for the metadata 
persistence location:

> The idea of the pointer is to have a state-backend independent string 
that indicates a checkpoint location. I picked a string, because the location 
is ultimately passed as a string, either from the user command `flink run -s` 
or from ZooKeeper's "last completed checkpoint" entry.
> 
> Introducing the StreamHandleAndPointer here spares the code from having 
to reason about state backend specific file handles. We can change that at some 
point, but for the breadth of changes already in this PR, this helped reduce 
the scope a bit.




[GitHub] flink issue #3522: [FLINK-5823] [checkpoints] State Backends also handle Che...

2017-05-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3522
  
I would like to merge this for 1.3 with a slightly reduced scope.

I would include the refactoring to make the State Backends also responsible 
for storing the Metadata.

I would NOT merge the started work on cleanup hooks. The reason is that after 
some digging through this, I found we actually have to treat different file 
systems quite differently. For example, for HDFS and posix-style file systems, 
we want to dispose of the checkpoint-exclusive state via `rm -r`. However, for 
S3 we explicitly do not want to do that, but instead issue DELETE commands for 
each state object independently, because Flink has a more consistent view of 
what objects exist for a checkpoint than S3 does. Also, a `rm -r` on the S3 
file system means "list objects with path prefix; for each object: delete", 
which is even worse.
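
The filesystem-dependent disposal described above could be sketched as follows; the `CheckpointStorage` interface and the scheme check are illustrative assumptions, not Flink code:

```java
import java.util.List;

// Sketch of filesystem-dependent checkpoint disposal: one recursive delete
// for HDFS/posix-style file systems, per-object DELETEs for S3.
interface CheckpointStorage {
    void deleteRecursively(String path);
    void deleteObject(String path);
}

final class CheckpointDisposer {
    static void dispose(
            String scheme,
            String exclusiveCheckpointDir,
            List<String> knownStateObjects,
            CheckpointStorage storage) {
        if ("s3".equals(scheme)) {
            // Flink's view of which objects belong to the checkpoint is more
            // consistent than an S3 prefix listing, so delete each known
            // object individually instead of "list prefix; delete each".
            for (String object : knownStateObjects) {
                storage.deleteObject(object);
            }
        } else {
            // HDFS / posix-style: a single recursive delete of the
            // checkpoint-exclusive directory is enough.
            storage.deleteRecursively(exclusiveCheckpointDir);
        }
    }
}
```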



