[GitHub] [flink] sunhaibotb edited a comment on issue #9622: [FLINK-13516][test] Bump MiniKdc to 3.2.0 to fix the failure of YARNSessionFIFOSecuredITCase on Java 11

2019-09-05 Thread GitBox
sunhaibotb edited a comment on issue #9622: [FLINK-13516][test] Bump MiniKdc to 
3.2.0 to fix the failure of YARNSessionFIFOSecuredITCase on Java 11
URL: https://github.com/apache/flink/pull/9622#issuecomment-528714281
 
 
   I used Java 11 and the Hadoop versions explicitly supported by Flink to 
verify that the following modules, which depend on MiniKdc, work well.
   
   - flink-yarn-tests
   - flink-connectors/flink-connector-filesystem
   - flink-connectors/flink-connector-kafka-0.9
   - flink-connectors/flink-connector-kafka-base
   - flink-test-utils-parent/flink-test-utils
   
   @zentol 




[GitHub] [flink] sunhaibotb commented on issue #9622: [FLINK-13516][test] Bump MiniKdc to 3.2.0 to fix the failure of YARNSessionFIFOSecuredITCase on Java 11

2019-09-05 Thread GitBox
sunhaibotb commented on issue #9622: [FLINK-13516][test] Bump MiniKdc to 3.2.0 
to fix the failure of YARNSessionFIFOSecuredITCase on Java 11
URL: https://github.com/apache/flink/pull/9622#issuecomment-528714281
 
 
   I used Java 11 and the Hadoop versions explicitly supported by Flink to 
verify that the following modules, which depend on MiniKdc, work well.
   
   - flink-yarn-tests
   - flink-connectors/flink-connector-filesystem
   - flink-connectors/flink-connector-kafka-0.9
   - flink-connectors/flink-connector-kafka-base
   - flink-test-utils-parent/flink-test-utils




[GitHub] [flink] walterddr commented on issue #8468: [FLINK-12399][table] Adding PushDownTableSource interface to fix FilterableTableSource applyPredicate problem

2019-09-05 Thread GitBox
walterddr commented on issue #8468: [FLINK-12399][table] Adding 
PushDownTableSource interface to fix FilterableTableSource applyPredicate 
problem
URL: https://github.com/apache/flink/pull/8468#issuecomment-528701147
 
 
   Thanks folks for the help and review :-) I will update the PR and 
incorporate the comments soon.




[GitHub] [flink] yangjf2019 commented on issue #9591: [FLINK-13937][docs] Fix the error of the hive connector dependency ve…

2019-09-05 Thread GitBox
yangjf2019 commented on issue #9591: [FLINK-13937][docs] Fix the error of the 
hive connector dependency ve…
URL: https://github.com/apache/flink/pull/9591#issuecomment-528692966
 
 
   Hi @bowenli86 @wuchong, thanks for your help, and I will keep working hard 
in the community.




[jira] [Comment Edited] (FLINK-13973) Checkpoint recovery failed after user set uidHash

2019-09-05 Thread apus66 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923900#comment-16923900
 ] 

apus66 edited comment on FLINK-13973 at 9/6/19 3:26 AM:


I've solved the problem and have already tested it in the production 
environment. I want to submit my code to the official version, and I'd like to 
be assigned this task.
Thanks.


was (Author: apus66):
I've solved the problem and have already tested it in the production 
environment. I want to submit my code to the official version. Could you assign 
this task to me?
Thanks.

> Checkpoint recovery failed after user set uidHash
> -
>
> Key: FLINK-13973
> URL: https://issues.apache.org/jira/browse/FLINK-13973
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.8.0, 1.8.1, 1.9.0
>Reporter: Zhouyu Pei
>Priority: Major
> Attachments: checkpoint-ex.jpg
>
>
> Checkpoint recovery failed after user set uidHash, the possible reasons are 
> as follows:
> If altOperatorID is not null, operatorState will be obtained by altOperatorID 
> and will not be given





[jira] [Commented] (FLINK-13973) Checkpoint recovery failed after user set uidHash

2019-09-05 Thread apus66 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923900#comment-16923900
 ] 

apus66 commented on FLINK-13973:


I've solved the problem and have already tested it in the production 
environment. I want to submit my code to the official version. Could you assign 
this task to me?
Thanks.

> Checkpoint recovery failed after user set uidHash
> -
>
> Key: FLINK-13973
> URL: https://issues.apache.org/jira/browse/FLINK-13973
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.8.0, 1.8.1, 1.9.0
>Reporter: Zhouyu Pei
>Priority: Major
> Attachments: checkpoint-ex.jpg
>
>
> Checkpoint recovery failed after user set uidHash, the possible reasons are 
> as follows:
> If altOperatorID is not null, operatorState will be obtained by altOperatorID 
> and will not be given





[GitHub] [flink] flinkbot edited a comment on issue #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
flinkbot edited a comment on issue #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#issuecomment-513425405
 
 
   
   ## CI report:
   
   * bd7ade5e0b57dc8577d7f864afcbbb24c2513e56 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/119869757)
   * 6a187929b931a4bd8cd7dbd0ec3d2c5a7a98278d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/121411219)
   * 4f2afd322f96aeaba6d9c0b67a82a051eff22df0 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/121723032)
   * c4fc6905d3adf3ad9ff6f58c5d4f472fdfa7d52b : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/121724039)
   




[jira] [Closed] (FLINK-13591) 'Completed Job List' in Flink web doesn't display right when job name is very long

2019-09-05 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-13591.
--
Resolution: Fixed

merged in master: 60cf41ab6b9309979849e3310b4c32cf528a1568

> 'Completed Job List' in Flink web doesn't display right when job name is very 
> long
> --
>
> Key: FLINK-13591
> URL: https://issues.apache.org/jira/browse/FLINK-13591
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.9.0
>Reporter: Kurt Young
>Assignee: Yadong Xie
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.0
>
> Attachments: 10_57_07__08_06_2019.jpg
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> !10_57_07__08_06_2019.jpg!





[GitHub] [flink] KurtYoung merged pull request #9621: [FLINK-13591][web]: fix job list display when job name is too long

2019-09-05 Thread GitBox
KurtYoung merged pull request #9621: [FLINK-13591][web]: fix job list display 
when job name is too long
URL: https://github.com/apache/flink/pull/9621
 
 
   




[GitHub] [flink] KurtYoung commented on issue #9621: [FLINK-13591][web]: fix job list display when job name is too long

2019-09-05 Thread GitBox
KurtYoung commented on issue #9621: [FLINK-13591][web]: fix job list display 
when job name is too long
URL: https://github.com/apache/flink/pull/9621#issuecomment-528688511
 
 
   +1, thanks for fixing this




[jira] [Closed] (FLINK-13952) PartitionableTableSink can not work with OverwritableTableSink

2019-09-05 Thread Bowen Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-13952.

Resolution: Fixed

master: d32af521cbe83f88cd0b822c4d752a1b5102c47c

> PartitionableTableSink can not work with OverwritableTableSink
> --
>
> Key: FLINK-13952
> URL: https://issues.apache.org/jira/browse/FLINK-13952
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Rui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> {code:java}
> tableSink match {
>   case partitionableSink: PartitionableTableSink
> if partitionableSink.getPartitionFieldNames != null
>   && partitionableSink.getPartitionFieldNames.nonEmpty =>
> partitionableSink.setStaticPartition(insertOptions.staticPartitions)
>   case overwritableTableSink: OverwritableTableSink =>
> overwritableTableSink.setOverwrite(insertOptions.overwrite)
> {code}
> This code is in TableEnvImpl and PlannerBase; setOverwrite will not be 
> invoked when there are static partition columns.
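A minimal Java sketch of the direction such a fix could take (illustrative 
only, not necessarily the actual patch in d32af521): check the two capabilities 
independently instead of in mutually exclusive match cases, so a sink 
implementing both interfaces receives both {{setStaticPartition}} and 
{{setOverwrite}}:

{code:java}
import java.util.Map;

import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.PartitionableTableSink;
import org.apache.flink.table.sinks.TableSink;

final class SinkConfigurer {

    // Apply both capabilities independently; a sink that is both
    // partitionable and overwritable now gets both calls.
    static void configure(TableSink<?> sink,
                          Map<String, String> staticPartitions,
                          boolean overwrite) {
        if (sink instanceof PartitionableTableSink) {
            PartitionableTableSink p = (PartitionableTableSink) sink;
            if (p.getPartitionFieldNames() != null
                    && !p.getPartitionFieldNames().isEmpty()) {
                p.setStaticPartition(staticPartitions);
            }
        }
        if (sink instanceof OverwritableTableSink) {
            ((OverwritableTableSink) sink).setOverwrite(overwrite);
        }
    }
}
{code}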





[GitHub] [flink] flinkbot edited a comment on issue #9555: [FLINK-13868][REST] Job vertex add taskmanager id in rest api

2019-09-05 Thread GitBox
flinkbot edited a comment on issue #9555: [FLINK-13868][REST] Job vertex add 
taskmanager id in rest api
URL: https://github.com/apache/flink/pull/9555#issuecomment-526075111
 
 
   
   ## CI report:
   
   * 64f02281954e69bcfbe858eb40b8f1846cfbe195 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/125035687)
   * 5d3d03dffbaf00112a31663caaba3558886cd373 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/125055694)
   * fc91d03d717971e53abc68ba8a93a1e456fc6608 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/125176326)
   * 45d72e4d09fdbf4650077d90897aaadad67ca1fe : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/125588643)
   * 125c6a2e91f2015e029306c1c1dee741225b3868 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/126021253)
   




[jira] [Updated] (FLINK-13986) Clean-up of legacy mode

2019-09-05 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-13986:
-
Description: 
* Fix / update / remove test cases for legacy mode
 * Deprecate / remove legacy config options.
 * Remove legacy code paths
 * Remove the switch for legacy / new mode.

> Clean-up of legacy mode
> ---
>
> Key: FLINK-13986
> URL: https://issues.apache.org/jira/browse/FLINK-13986
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Xintong Song
>Priority: Major
>
> * Fix / update / remove test cases for legacy mode
>  * Deprecate / remove legacy config options.
>  * Remove legacy code paths
>  * Remove the switch for legacy / new mode.





[jira] [Assigned] (FLINK-13952) PartitionableTableSink can not work with OverwritableTableSink

2019-09-05 Thread Bowen Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-13952:


Assignee: Rui Li

> PartitionableTableSink can not work with OverwritableTableSink
> --
>
> Key: FLINK-13952
> URL: https://issues.apache.org/jira/browse/FLINK-13952
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Rui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> {code:java}
> tableSink match {
>   case partitionableSink: PartitionableTableSink
> if partitionableSink.getPartitionFieldNames != null
>   && partitionableSink.getPartitionFieldNames.nonEmpty =>
> partitionableSink.setStaticPartition(insertOptions.staticPartitions)
>   case overwritableTableSink: OverwritableTableSink =>
> overwritableTableSink.setOverwrite(insertOptions.overwrite)
> {code}
> This code is in TableEnvImpl and PlannerBase; setOverwrite will not be 
> invoked when there are static partition columns.





[GitHub] [flink] asfgit closed pull request #9615: [FLINK-13952][table-planner][hive] PartitionableTableSink can not wor…

2019-09-05 Thread GitBox
asfgit closed pull request #9615: [FLINK-13952][table-planner][hive] 
PartitionableTableSink can not wor…
URL: https://github.com/apache/flink/pull/9615
 
 
   




[jira] [Updated] (FLINK-13985) Use native memory for managed memory.

2019-09-05 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-13985:
-
Description: 
* Allocate memory with {{Unsafe.allocateMemory}}
 ** {{MemoryManager}}

Implement this issue in common code paths for the legacy / new mode. This 
should only affect the GC behavior.

> Use native memory for managed memory.
> -
>
> Key: FLINK-13985
> URL: https://issues.apache.org/jira/browse/FLINK-13985
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Xintong Song
>Priority: Major
>
> * Allocate memory with {{Unsafe.allocateMemory}}
>  ** {{MemoryManager}}
> Implement this issue in common code paths for the legacy / new mode. This 
> should only affect the GC behavior.
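For context, a minimal sketch of what native allocation with {{Unsafe}} looks 
like (illustrative only, not the {{MemoryManager}} changes themselves). Because 
the memory lives outside the JVM heap, it is neither scanned nor moved by the 
garbage collector, which is why only the GC behavior should change:

{code:java}
import java.lang.reflect.Field;
import sun.misc.Unsafe;

final class NativeSegmentSketch {

    private static final Unsafe UNSAFE = getUnsafe();

    public static void main(String[] args) {
        long size = 32L * 1024 * 1024;               // one 32 MiB segment
        long address = UNSAFE.allocateMemory(size);  // native, off the GC heap
        try {
            UNSAFE.setMemory(address, size, (byte) 0); // zero before use
            UNSAFE.putLong(address, 42L);
            System.out.println(UNSAFE.getLong(address)); // prints 42
        } finally {
            UNSAFE.freeMemory(address); // native memory must be freed explicitly
        }
    }

    private static Unsafe getUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}
{code}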





[jira] [Updated] (FLINK-13983) Launch task executor with new memory calculation logics

2019-09-05 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-13983:
-
Description: 
* Use the data structures and utilities introduced in Step 2 to generate JVM 
parameters and dynamic configurations for launching new task executors.
 ** In startup scripts
 ** In resource managers
 * The task executor uses the data structures and utilities introduced in Step 2 
to set memory pool sizes and slot resource profiles.
 ** {{MemoryManager}}
 ** {{ShuffleEnvironment}}
 ** {{TaskSlotTable}}

Implement this step as separate code paths only for the new mode.

> Launch task executor with new memory calculation logics
> ---
>
> Key: FLINK-13983
> URL: https://issues.apache.org/jira/browse/FLINK-13983
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Xintong Song
>Priority: Major
>
> * Use the data structures and utilities introduced in Step 2 to generate JVM 
> parameters and dynamic configurations for launching new task executors.
>  ** In startup scripts
>  ** In resource managers
>  * The task executor uses the data structures and utilities introduced in Step 2 
> to set memory pool sizes and slot resource profiles.
>  ** {{MemoryManager}}
>  ** {{ShuffleEnvironment}}
>  ** {{TaskSlotTable}}
> Implement this step as separate code paths only for the new mode.
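As an illustration of the kind of utility this step needs (names and values are 
hypothetical, not Flink's actual FLIP-49 code), a memory spec can be rendered 
into the JVM parameters used to launch the task executor process:

{code:java}
// Hypothetical sketch: turn derived memory sizes into JVM launch parameters.
final class JvmParamsSketch {

    static String jvmParamsFrom(long heapBytes, long directBytes, long metaspaceBytes) {
        return "-Xmx" + heapBytes
                + " -Xms" + heapBytes
                + " -XX:MaxDirectMemorySize=" + directBytes
                + " -XX:MaxMetaspaceSize=" + metaspaceBytes;
    }

    public static void main(String[] args) {
        // e.g. 1 GiB heap, 256 MiB direct memory, 96 MiB metaspace
        System.out.println(jvmParamsFrom(1L << 30, 256L << 20, 96L << 20));
    }
}
{code}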





[jira] [Updated] (FLINK-13984) Separate on-heap and off-heap managed memory pools

2019-09-05 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-13984:
-
Description: 
* Update {{MemoryManager}} to have two separate pools.
 * Extend {{MemoryManager}} interfaces to specify which pool to allocate memory 
from.

Implement this step in common code paths for the legacy / new mode. For the 
legacy mode, depending on the configured memory type, we can set one of the two 
pools to the managed memory size and always allocate from this pool, leaving 
the other pool empty.

> Separate on-heap and off-heap managed memory pools
> --
>
> Key: FLINK-13984
> URL: https://issues.apache.org/jira/browse/FLINK-13984
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Xintong Song
>Priority: Major
>
> * Update {{MemoryManager}} to have two separate pools.
>  * Extend {{MemoryManager}} interfaces to specify which pool to allocate 
> memory from.
> Implement this step in common code paths for the legacy / new mode. For the 
> legacy mode, depending on the configured memory type, we can set one of the 
> two pools to the managed memory size and always allocate from this pool, 
> leaving the other pool empty.
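A hypothetical sketch of the two-pool idea (not the real {{MemoryManager}} 
interface): callers specify which pool to allocate from, and the legacy mode 
simply sets one pool's size to zero:

{code:java}
import java.util.concurrent.atomic.AtomicLong;

final class TwoPoolMemoryManagerSketch {

    enum MemoryType { HEAP, OFF_HEAP }

    private final AtomicLong heapAvailable;
    private final AtomicLong offHeapAvailable;

    TwoPoolMemoryManagerSketch(long heapPoolBytes, long offHeapPoolBytes) {
        // Legacy mode: one of the two sizes is simply 0, so all
        // allocations come from the single configured pool.
        this.heapAvailable = new AtomicLong(heapPoolBytes);
        this.offHeapAvailable = new AtomicLong(offHeapPoolBytes);
    }

    // The caller now has to say which pool to allocate from.
    boolean tryReserve(MemoryType type, long bytes) {
        AtomicLong pool = (type == MemoryType.HEAP) ? heapAvailable : offHeapAvailable;
        long prev;
        do {
            prev = pool.get();
            if (prev < bytes) {
                return false; // this pool is exhausted
            }
        } while (!pool.compareAndSet(prev, prev - bytes));
        return true;
    }

    void release(MemoryType type, long bytes) {
        ((type == MemoryType.HEAP) ? heapAvailable : offHeapAvailable).addAndGet(bytes);
    }
}
{code}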





[GitHub] [flink] bowenli86 commented on issue #9615: [FLINK-13952][table-planner][hive] PartitionableTableSink can not wor…

2019-09-05 Thread GitBox
bowenli86 commented on issue #9615: [FLINK-13952][table-planner][hive] 
PartitionableTableSink can not wor…
URL: https://github.com/apache/flink/pull/9615#issuecomment-528687096
 
 
   LGTM. merging




[jira] [Updated] (FLINK-13981) Introduce a switch for enabling the new task executor memory configurations

2019-09-05 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-13981:
-
Description: Introduce a temporary config option as a switch between the 
current / new task executor memory configuration code paths. This allows us to 
implement and test the new code paths without affecting the existing code paths 
and behaviors.

> Introduce a switch for enabling the new task executor memory configurations
> ---
>
> Key: FLINK-13981
> URL: https://issues.apache.org/jira/browse/FLINK-13981
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Xintong Song
>Priority: Major
>
> Introduce a temporary config option as a switch between the current / new task 
> executor memory configuration code paths. This allows us to implement and 
> test the new code paths without affecting the existing code paths and behaviors.
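A minimal sketch of such a switch using Flink's {{ConfigOptions}} API (the 
option key here is made up for illustration; the real name was still to be 
decided):

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

final class Flip49SwitchSketch {

    // Hypothetical key; defaults to false so existing behavior is untouched.
    static final ConfigOption<Boolean> USE_NEW_MEMORY_CONFIG =
            ConfigOptions.key("taskmanager.memory.flip49.enabled")
                    .defaultValue(false)
                    .withDescription("Temporary switch between the current and "
                            + "the new task executor memory configuration code paths.");

    static void configureMemory(Configuration conf) {
        if (conf.getBoolean(USE_NEW_MEMORY_CONFIG)) {
            // new FLIP-49 code path
        } else {
            // existing code path, unchanged by default
        }
    }
}
{code}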





[jira] [Updated] (FLINK-13982) Implement memory calculation logics

2019-09-05 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-13982:
-
Description: 
* Introduce new configuration options
 * Introduce data structures and utilities.
 ** Data structure to store memory / pool sizes of task executor
 ** Utility for calculating memory / pool sizes from configuration
 ** Utility for generating dynamic configurations
 ** Utility for generating JVM parameters

This step should not introduce any behavior changes.

> Implement memory calculation logics
> ---
>
> Key: FLINK-13982
> URL: https://issues.apache.org/jira/browse/FLINK-13982
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Xintong Song
>Priority: Major
>
> * Introduce new configuration options
>  * Introduce data structures and utilities.
>  ** Data structure to store memory / pool sizes of task executor
>  ** Utility for calculating memory / pool sizes from configuration
>  ** Utility for generating dynamic configurations
>  ** Utility for generating JVM parameters
> This step should not introduce any behavior changes.
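A hypothetical sketch of the data structure plus one calculation utility (names 
and the split logic are illustrative, not Flink's actual FLIP-49 code):

{code:java}
final class MemorySpecSketch {

    // Immutable holder for the derived memory / pool sizes of a task executor.
    static final class TaskExecutorMemorySpec {
        final long frameworkHeapBytes;
        final long taskHeapBytes;
        final long networkBytes;
        final long managedBytes;

        TaskExecutorMemorySpec(long frameworkHeapBytes, long taskHeapBytes,
                               long networkBytes, long managedBytes) {
            this.frameworkHeapBytes = frameworkHeapBytes;
            this.taskHeapBytes = taskHeapBytes;
            this.networkBytes = networkBytes;
            this.managedBytes = managedBytes;
        }
    }

    // Derives all pool sizes from a total budget and two configured fractions.
    static TaskExecutorMemorySpec fromTotal(long totalBytes,
                                            double networkFraction,
                                            double managedFraction) {
        long network = (long) (totalBytes * networkFraction);
        long managed = (long) (totalBytes * managedFraction);
        long frameworkHeap = 128L << 20; // fixed 128 MiB, purely for illustration
        long taskHeap = totalBytes - network - managed - frameworkHeap;
        return new TaskExecutorMemorySpec(frameworkHeap, taskHeap, network, managed);
    }
}
{code}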





[jira] [Created] (FLINK-13983) Launch task executor with new memory calculation logics

2019-09-05 Thread Xintong Song (Jira)
Xintong Song created FLINK-13983:


 Summary: Launch task executor with new memory calculation logics
 Key: FLINK-13983
 URL: https://issues.apache.org/jira/browse/FLINK-13983
 Project: Flink
  Issue Type: Sub-task
Reporter: Xintong Song








[jira] [Created] (FLINK-13984) Separate on-heap and off-heap managed memory pools

2019-09-05 Thread Xintong Song (Jira)
Xintong Song created FLINK-13984:


 Summary: Separate on-heap and off-heap managed memory pools
 Key: FLINK-13984
 URL: https://issues.apache.org/jira/browse/FLINK-13984
 Project: Flink
  Issue Type: Sub-task
Reporter: Xintong Song








[jira] [Created] (FLINK-13986) Clean-up of legacy mode

2019-09-05 Thread Xintong Song (Jira)
Xintong Song created FLINK-13986:


 Summary: Clean-up of legacy mode
 Key: FLINK-13986
 URL: https://issues.apache.org/jira/browse/FLINK-13986
 Project: Flink
  Issue Type: Sub-task
Reporter: Xintong Song








[jira] [Created] (FLINK-13985) Use native memory for managed memory.

2019-09-05 Thread Xintong Song (Jira)
Xintong Song created FLINK-13985:


 Summary: Use native memory for managed memory.
 Key: FLINK-13985
 URL: https://issues.apache.org/jira/browse/FLINK-13985
 Project: Flink
  Issue Type: Sub-task
Reporter: Xintong Song








[jira] [Created] (FLINK-13982) Implement memory calculation logics

2019-09-05 Thread Xintong Song (Jira)
Xintong Song created FLINK-13982:


 Summary: Implement memory calculation logics
 Key: FLINK-13982
 URL: https://issues.apache.org/jira/browse/FLINK-13982
 Project: Flink
  Issue Type: Sub-task
Reporter: Xintong Song








[jira] [Created] (FLINK-13981) Introduce a switch for enabling the new task executor memory configurations

2019-09-05 Thread Xintong Song (Jira)
Xintong Song created FLINK-13981:


 Summary: Introduce a switch for enabling the new task executor 
memory configurations
 Key: FLINK-13981
 URL: https://issues.apache.org/jira/browse/FLINK-13981
 Project: Flink
  Issue Type: Sub-task
Reporter: Xintong Song








[jira] [Created] (FLINK-13980) FLIP-49 Unified Memory Configuration for TaskExecutors

2019-09-05 Thread Xintong Song (Jira)
Xintong Song created FLINK-13980:


 Summary: FLIP-49 Unified Memory Configuration for TaskExecutors
 Key: FLINK-13980
 URL: https://issues.apache.org/jira/browse/FLINK-13980
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration, Runtime / Coordination
Affects Versions: 1.9.0
Reporter: Xintong Song
 Fix For: 1.10.0


This is the umbrella issue of 'FLIP-49: Unified Memory Configuration for 
TaskExecutors'.





[GitHub] [flink] KurtYoung commented on issue #8468: [FLINK-12399][table] Adding PushDownTableSource interface to fix FilterableTableSource applyPredicate problem

2019-09-05 Thread GitBox
KurtYoung commented on issue #8468: [FLINK-12399][table] Adding 
PushDownTableSource interface to fix FilterableTableSource applyPredicate 
problem
URL: https://github.com/apache/flink/pull/8468#issuecomment-52868
 
 
   @walterddr sounds good to me.




[jira] [Commented] (FLINK-13808) Checkpoints expired by timeout may leak RocksDB files

2019-09-05 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923874#comment-16923874
 ] 

Congxian Qiu(klion26) commented on FLINK-13808:
---

After analyzing this with [~Caesar], the issue is caused by an IO/network problem.

The local directory will only be deleted in the following scenarios:
 * the snapshot failed
 * the snapshot succeeded (when local recovery is disabled)
 * a newer completed checkpoint has been received (when local recovery is enabled)

For this issue, the snapshot was still ongoing (uploading the SST files) when 
the leaked files were observed, so the local directory would not be deleted.

I think FLINK-8871 helps with this issue.
Aside from FLINK-8871, I want to propose the following improvements (see the 
sketch after this comment):
 * keep only {{maxConcurrentCheckpoint}} snapshots on the TM side, which means 
that if {{maxConcurrentCheckpoint}} is 2 and the current checkpoint is 5, we'll 
cancel all checkpoints before 4 (maybe the complete/cancel RPC message is 
late)
 * add some debug/trace logs to track the steps of the snapshot on the TM side, 
so users can know which step the snapshot is currently on

[~StephanEwen] [~carp84] What do you think about the above two improvements? If 
this is OK, I'll file issues and contribute them.
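A minimal sketch of the first proposal, under the assumption that the TM tracks 
ongoing snapshots by checkpoint id (all names are hypothetical, not actual 
Flink code):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

final class SnapshotTrackerSketch {

    private final int maxConcurrentCheckpoints;
    // checkpointId -> cancel handle, ordered by checkpoint id
    private final ConcurrentSkipListMap<Long, Runnable> ongoing =
            new ConcurrentSkipListMap<>();

    SnapshotTrackerSketch(int maxConcurrentCheckpoints) {
        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
    }

    void register(long checkpointId, Runnable canceler) {
        ongoing.put(checkpointId, canceler);
        // e.g. max = 2 and checkpoint 5 starts: cancel everything up to 3,
        // even if the coordinator's complete/cancel RPC arrives late or never.
        while (ongoing.size() > maxConcurrentCheckpoints) {
            Map.Entry<Long, Runnable> oldest = ongoing.pollFirstEntry();
            if (oldest != null) {
                oldest.getValue().run(); // cancel and clean up local files
            }
        }
    }

    void complete(long checkpointId) {
        ongoing.remove(checkpointId);
    }
}
{code}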

> Checkpoints expired by timeout may leak RocksDB files
> -
>
> Key: FLINK-13808
> URL: https://issues.apache.org/jira/browse/FLINK-13808
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.8.0, 1.8.1
> Environment: So far only reliably reproducible on a 4-node cluster 
> with parallelism ≥ 100. But do try 
> https://github.com/jcaesar/flink-rocksdb-file-leak
>Reporter: Julius Michaelis
>Priority: Minor
>
> A RocksDB state backend with HDFS checkpoints, with or without local 
> recovery, may leak files in {{io.tmp.dirs}} on checkpoint expiry by timeout.
> If the size of a checkpoint crosses what can be transferred during one 
> checkpoint timeout, checkpoints will continue to fail forever. If this is 
> combined with a quick rollover of SST files (e.g. due to a high density of 
> writes), this may quickly exhaust available disk space (or memory, as /tmp is 
> the default location).
> As a workaround, the jobmanager's REST API can be frequently queried for 
> failed checkpoints, and associated files deleted accordingly.
> I've tried investigating the cause a little bit, but I'm stuck:
>  * {{Checkpoint 19 of job ac7efce3457d9d73b0a4f775a6ef46f8 expired before 
> completing.}} and similar gets printed, so
>  * [{{abortExpired}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L547-L549],
>  so
>  * [{{dispose}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L416],
>  so
>  * [{{cancelCaller}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L488],
>  so
>  * [the canceler is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L497]
>  ([through one more 
> layer|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L129]),
>  so
>  * [{{cleanup}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L95],
>  (possibly [not from 
> {{cancel}}|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L84]),
>  so
>  * [{{cleanupProvidedResources}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L162]
>  (this is the indirection that made me give up), so
>  * [this trace 
> log|https://github.com/apache/flink/blob/release-1.8.1/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L372]
>  should be printed, but it isn't.
> I have some time to further investigate, but I'd appreciate help on finding 
> out where in this chain things go wrong.





[GitHub] [flink] sunhaibotb edited a comment on issue #9622: [FLINK-13516][test] Bump MiniKdc to 3.2.0 to fix the failure of YARNSessionFIFOSecuredITCase on Java 11

2019-09-05 Thread GitBox
sunhaibotb edited a comment on issue #9622: [FLINK-13516][test] Bump MiniKdc to 
3.2.0 to fix the failure of YARNSessionFIFOSecuredITCase on Java 11
URL: https://github.com/apache/flink/pull/9622#issuecomment-528677356
 
 
   No, the other modules using MiniKdc were only tested with Hadoop version 
2.4.1. Today I'll verify all of them.




[GitHub] [flink] sunhaibotb commented on issue #9622: [FLINK-13516][test] Bump MiniKdc to 3.2.0 to fix the failure of YARNSessionFIFOSecuredITCase on Java 11

2019-09-05 Thread GitBox
sunhaibotb commented on issue #9622: [FLINK-13516][test] Bump MiniKdc to 3.2.0 
to fix the failure of YARNSessionFIFOSecuredITCase on Java 11
URL: https://github.com/apache/flink/pull/9622#issuecomment-528677356
 
 
   No, today I'll verify that.




[GitHub] [flink] flinkbot edited a comment on issue #9555: [FLINK-13868][REST] Job vertex add taskmanager id in rest api

2019-09-05 Thread GitBox
flinkbot edited a comment on issue #9555: [FLINK-13868][REST] Job vertex add 
taskmanager id in rest api
URL: https://github.com/apache/flink/pull/9555#issuecomment-526075111
 
 
   
   ## CI report:
   
   * 64f02281954e69bcfbe858eb40b8f1846cfbe195 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/125035687)
   * 5d3d03dffbaf00112a31663caaba3558886cd373 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/125055694)
   * fc91d03d717971e53abc68ba8a93a1e456fc6608 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/125176326)
   * 45d72e4d09fdbf4650077d90897aaadad67ca1fe : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/125588643)
   * 125c6a2e91f2015e029306c1c1dee741225b3868 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/126021253)
   




[GitHub] [flink] jinglining removed a comment on issue #9555: [FLINK-13868][REST] Job vertex add taskmanager id in rest api

2019-09-05 Thread GitBox
jinglining removed a comment on issue #9555: [FLINK-13868][REST] Job vertex add 
taskmanager id in rest api
URL: https://github.com/apache/flink/pull/9555#issuecomment-528669226
 
 
   @flinkbot travil




[GitHub] [flink] jinglining commented on issue #9555: [FLINK-13868][REST] Job vertex add taskmanager id in rest api

2019-09-05 Thread GitBox
jinglining commented on issue #9555: [FLINK-13868][REST] Job vertex add 
taskmanager id in rest api
URL: https://github.com/apache/flink/pull/9555#issuecomment-528669531
 
 
   @flinkbot run travis




[GitHub] [flink] jinglining commented on issue #9555: [FLINK-13868][REST] Job vertex add taskmanager id in rest api

2019-09-05 Thread GitBox
jinglining commented on issue #9555: [FLINK-13868][REST] Job vertex add 
taskmanager id in rest api
URL: https://github.com/apache/flink/pull/9555#issuecomment-528669226
 
 
   @flinkbot travil




[jira] [Commented] (FLINK-13771) Support kqueue Netty transports (MacOS)

2019-09-05 Thread Nico Kruber (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923797#comment-16923797
 ] 

Nico Kruber commented on FLINK-13771:
-

[~aitozi] I'm not working on this, and I also do not know how much it is worth, 
since Mac servers (running Flink, in particular) are not really widespread, 
afaik. However, the actual implementation overhead should be low.

I'll assign you to the issue and can have a look at the PR when you are done.

> Support kqueue Netty transports (MacOS)
> ---
>
> Key: FLINK-13771
> URL: https://issues.apache.org/jira/browse/FLINK-13771
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Nico Kruber
>Priority: Major
>
> It seems like Netty now also supports MacOS's native transport 
> {{kqueue}}:
> https://netty.io/wiki/native-transports.html#using-the-macosbsd-native-transport
> We should allow this via {{taskmanager.network.netty.transport}}.
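For reference, a sketch of how an "auto" transport choice could fall back 
across the native transports (requires the netty-transport-native-kqueue 
artifact on the classpath; the actual wiring in Flink's netty stack will 
differ):

{code:java}
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

final class TransportSelectorSketch {

    static EventLoopGroup create(String transport, int threads) {
        switch (transport) {
            case "epoll":
                return new EpollEventLoopGroup(threads);  // Linux only
            case "kqueue":
                return new KQueueEventLoopGroup(threads); // macOS/BSD only
            case "auto":
                if (Epoll.isAvailable()) {
                    return new EpollEventLoopGroup(threads);
                }
                if (KQueue.isAvailable()) {
                    return new KQueueEventLoopGroup(threads);
                }
                return new NioEventLoopGroup(threads);    // portable fallback
            default:
                return new NioEventLoopGroup(threads);
        }
    }
}
{code}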





[jira] [Assigned] (FLINK-13771) Support kqueue Netty transports (MacOS)

2019-09-05 Thread Nico Kruber (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber reassigned FLINK-13771:
---

Assignee: Aitozi

> Support kqueue Netty transports (MacOS)
> ---
>
> Key: FLINK-13771
> URL: https://issues.apache.org/jira/browse/FLINK-13771
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Nico Kruber
>Assignee: Aitozi
>Priority: Major
>
> It seems like Netty now also supports MacOS's native transport 
> {{kqueue}}:
> https://netty.io/wiki/native-transports.html#using-the-macosbsd-native-transport
> We should allow this via {{taskmanager.network.netty.transport}}.





[jira] [Reopened] (FLINK-13821) Website must link to License etc

2019-09-05 Thread Sebb (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sebb reopened FLINK-13821:
--

Please see:

https://whimsy.apache.org/site/project/flink

There are still two issues listed

> Website must link to License etc
> 
>
> Key: FLINK-13821
> URL: https://issues.apache.org/jira/browse/FLINK-13821
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Sebb
>Assignee: Robert Metzger
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> ASF project websites must have certain links:
> Apachecon
> License
> Thanks
> Security
> Sponsor/Donat
> Please see:
> https://whimsy.apache.org/site/project/flink





[jira] [Closed] (FLINK-13937) Fix the error of the hive connector dependency version

2019-09-05 Thread Bowen Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-13937.

Resolution: Fixed

merged in master: d6ff17d908932d867a6c0a02ac317a4f59e855b; 1.9.1: 
1e88d4261c891d7d201621046e9c712b4fda400a

> Fix the error of the hive connector dependency version 
> 
>
> Key: FLINK-13937
> URL: https://issues.apache.org/jira/browse/FLINK-13937
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Affects Versions: 1.10.0
>Reporter: Jeff Yang
>Assignee: Jeff Yang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0, 1.9.1
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> There is a wrong maven dependency in the hive connector's 
> [documentation|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/].
>  





[GitHub] [flink] asfgit closed pull request #9591: [FLINK-13937][docs] Fix the error of the hive connector dependency ve…

2019-09-05 Thread GitBox
asfgit closed pull request #9591: [FLINK-13937][docs] Fix the error of the hive 
connector dependency ve…
URL: https://github.com/apache/flink/pull/9591
 
 
   




[GitHub] [flink] bowenli86 commented on issue #9591: [FLINK-13937][docs] Fix the error of the hive connector dependency ve…

2019-09-05 Thread GitBox
bowenli86 commented on issue #9591: [FLINK-13937][docs] Fix the error of the 
hive connector dependency ve…
URL: https://github.com/apache/flink/pull/9591#issuecomment-528581132
 
 
   LGTM, thanks for the contribution. Merging




[jira] [Closed] (FLINK-13930) Support Hive version 3.1.x

2019-09-05 Thread Bowen Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-13930.

Resolution: Fixed

merged in master: e670b929ae124e04c6cdec560d4432ae9c561ff9

> Support Hive version 3.1.x
> --
>
> Key: FLINK-13930
> URL: https://issues.apache.org/jira/browse/FLINK-13930
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Including 3.1.0, 3.1.1, and 3.1.2.





[GitHub] [flink] asfgit closed pull request #9580: [FLINK-13930][hive] Support Hive version 3.1.x

2019-09-05 Thread GitBox
asfgit closed pull request #9580: [FLINK-13930][hive] Support Hive version 3.1.x
URL: https://github.com/apache/flink/pull/9580
 
 
   




[GitHub] [flink] flinkbot edited a comment on issue #9604: [FLINK-13845][docs] Drop all the content of removed Checkpointed interface

2019-09-05 Thread GitBox
flinkbot edited a comment on issue #9604: [FLINK-13845][docs] Drop all the 
content of removed Checkpointed interface
URL: https://github.com/apache/flink/pull/9604#issuecomment-527447402
 
 
   
   ## CI report:
   
   * 66ade45fec93a56dde7609dffa0ad9dd241d0713 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/125596416)
   




[GitHub] [flink] kl0u commented on issue #9581: [FLINK-13864][streaming]: Modify the StreamingFileSink Builder interface to allow for easier subclassing of StreamingFileSink

2019-09-05 Thread GitBox
kl0u commented on issue #9581: [FLINK-13864][streaming]: Modify the 
StreamingFileSink Builder interface to allow for easier subclassing of 
StreamingFileSink 
URL: https://github.com/apache/flink/pull/9581#issuecomment-528577524
 
 
   I will have a look first thing in the morning.
   
   In general the changes look good, but I would like to have another pass. 
Also, I think it is worth adding a test that verifies that we can change the 
type of the `BucketID` without problems, essentially testing the 
`withBucketAssigner()` and the corresponding `withBucketAssignerAndPolicy()` 
for the `rowFormat` builder. This is mainly to guarantee that nobody will break 
it in the future. Such a test should have been added earlier, when the initial 
`StreamingFileSink` was written :( 




[GitHub] [flink] zentol commented on issue #9622: [FLINK-13516][test] Bump MiniKdc to 3.2.0 to fix the failure of YARNSessionFIFOSecuredITCase on Java 11

2019-09-05 Thread GitBox
zentol commented on issue #9622: [FLINK-13516][test] Bump MiniKdc to 3.2.0 to 
fix the failure of YARNSessionFIFOSecuredITCase on Java 11
URL: https://github.com/apache/flink/pull/9622#issuecomment-528556899
 
 
   minikdc isn't just used by `flink-yarn-tests` but also by kafka and the 
filesystem connector, along with all users of `SecureTestEnvironment`. Did you 
verify these as well?




[GitHub] [flink] flinkbot edited a comment on issue #9633: [FLINK-13970][coordinate] Remove LifoSetQueue and SetQueue

2019-09-05 Thread GitBox
flinkbot edited a comment on issue #9633: [FLINK-13970][coordinate] Remove 
LifoSetQueue and SetQueue
URL: https://github.com/apache/flink/pull/9633#issuecomment-528481813
 
 
   
   ## CI report:
   
   * 9d6fb73a077e26ec84d3cdf5a9835b0b5b567a0a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/126096288)
   




[GitHub] [flink] zentol commented on a change in pull request #9632: [FLINK-13978] Add azure-pipelines.yml

2019-09-05 Thread GitBox
zentol commented on a change in pull request #9632: [FLINK-13978] Add 
azure-pipelines.yml
URL: https://github.com/apache/flink/pull/9632#discussion_r321372910
 
 

 ##
 File path: azure-pipelines.yml
 ##
 @@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+trigger:
+  branches:
+include:
+- '*' 
+  paths:
+exclude:
+- docs/*
+- '*.md'
+
+resources:
+  containers:
+  # Container with Maven 3.6.1 to have the same environment everywhere.
+  - container: flink-build-container
+image: rmetzger/flink-ci:2
+
+jobs:
+  # use 'Default' pool
+- job: runOnDefaultAgentPool
+  pool:
+name: Default
+  container: flink-build-container
+  timeoutInMinutes: 0
+  steps:
+  # Azure pipelines can only evaluate conditions with the build repo name in 
the steps.
+  # if the repo != "flink-ci/flink", we stop
+  - script: exit 1
+condition: not(eq(variables['Build.Repository.Name'], 'flink-ci/flink'))
+  - script: chown -R user:user /home/user
+  - script: chown -R user:user .
+  - task: Maven@3
+inputs:
+  mavenPomFile: 'pom.xml'
+  mavenOptions: '-Xmx3072m'
+  javaHomeOption: 'JDKVersion'
+  jdkVersionOption: '1.8'
+  jdkArchitectureOption: 'x64'
+  publishJUnitResults: true
+  testResultsFiles: '**/surefire-reports/TEST-*.xml'
+  goals: 'clean install'
+  timeoutInMinutes: 0
+- job: runOnAzure
+  pool:
+vmImage: 'ubuntu-latest'
+  container: flink-build-container
+  timeoutInMinutes: 0
+  steps:
+  # Azure pipelines can only evaluate conditions with the build repo name in 
the steps.
+  # if the repo == "flink-ci/flink", we stop
+  - script: exit 1
+condition: eq(variables['Build.Repository.Name'], 'flink-ci/flink')
+  - script: sudo chown -R user:user /home/user
+  - script: sudo chown -R user:user .
+  - task: Maven@3
 
 Review comment:
   Can we just call into a bash script here that lives somewhere in flink-ci 
(or flink), so that we can change what is being run without having to touch the 
flink repo? (to a certain limit...)




[GitHub] [flink] flinkbot edited a comment on issue #9604: [FLINK-13845][docs] Drop all the content of removed Checkpointed interface

2019-09-05 Thread GitBox
flinkbot edited a comment on issue #9604: [FLINK-13845][docs] Drop all the 
content of removed Checkpointed interface
URL: https://github.com/apache/flink/pull/9604#issuecomment-527447402
 
 
   
   ## CI report:
   
   * 66ade45fec93a56dde7609dffa0ad9dd241d0713 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/125596416)
   




[GitHub] [flink] flinkbot edited a comment on issue #9632: [FLINK-13978] Add azure-pipelines.yml

2019-09-05 Thread GitBox
flinkbot edited a comment on issue #9632: [FLINK-13978] Add azure-pipelines.yml
URL: https://github.com/apache/flink/pull/9632#issuecomment-528440893
 
 
   
   ## CI report:
   
   * d9002418d222d1e7bedea5ed2dab38076fdaeaba : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/126086359)
   




[jira] [Commented] (FLINK-12389) flink codegen set String type for ByteBuffer fields

2019-09-05 Thread Yu Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923694#comment-16923694
 ] 

Yu Yang commented on FLINK-12389:
-

Resolving this issue as `not a bug`, as we were able to resolve the problem 
by passing the right type info. 
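For readers hitting the same error, a minimal illustration of the underlying 
type mismatch (illustrative only, not Flink code): a {{ByteBuffer}} cannot be 
cast to {{String}} and must be decoded explicitly; passing the right type info 
keeps the code generator from emitting such a cast in the first place.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ByteBufferCastSketch {
    public static void main(String[] args) {
        ByteBuffer segmentIds = StandardCharsets.UTF_8.encode("1,2,3");
        // String s = (String) segmentIds;  // does not compile: inconvertible
        //                                  // types, the same mismatch janino
        //                                  // reports in the snippet below
        String s = StandardCharsets.UTF_8.decode(segmentIds).toString();
        System.out.println(s); // prints 1,2,3
    }
}
{code}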

> flink codegen set String type for ByteBuffer fields
> ---
>
> Key: FLINK-12389
> URL: https://issues.apache.org/jira/browse/FLINK-12389
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.8.0
>Reporter: Yu Yang
>Priority: Major
>
> We tried to write a simple Flink SQL program using a "select .. from" 
> statement and encountered a compile exception. 
> *Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 
> 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"*
> Further debugging shows that the following Flink-generated code snippet 
> caused the problem: 
> {code}
>   final java.lang.reflect.Field 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds =
>   org.apache.flink.api.java.typeutils.TypeExtractor.getDeclaredField(
> com.pinterest.utzv2.thrift.RealtimeSpendValue.class, "segmentIds");
> ...
> boolean isNull$5 = (java.nio.ByteBuffer) 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1) == 
> null;
> java.lang.String result$4;
> if (isNull$5) {
>   result$4 = "";
> }
> else {
>   result$4 = (java.lang.String) (java.nio.ByteBuffer) 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1);
> }
>
> {code}
> The following is the stack trace:
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>   ... 17 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:425)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 
> 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"
>   at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>   at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5049)
>   at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>   at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
>   at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
>   at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>   at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>   at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3790)
>   at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:215)
>   at 
> org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3752)
>   at 
> org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3732)
>   at org.codehaus.janino.Java$Assignment.accept(Java.java:4466)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
>   at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
>   at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> 

[jira] [Resolved] (FLINK-12389) flink codegen set String type for ByteBuffer fields

2019-09-05 Thread Yu Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang resolved FLINK-12389.
-
Resolution: Fixed

> flink codegen set String type for ByteBuffer fields
> ---
>
> Key: FLINK-12389
> URL: https://issues.apache.org/jira/browse/FLINK-12389
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.8.0
>Reporter: Yu Yang
>Priority: Major
>
> We tried to write a simple Flink SQL program using a "select .. from" 
> statement and encountered a compile exception. 
> *Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 
> 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"*
> Further debugging shows that the following Flink-generated code snippet 
> caused the problem: 
> {code}
>   final java.lang.reflect.Field 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds =
>   org.apache.flink.api.java.typeutils.TypeExtractor.getDeclaredField(
> com.pinterest.utzv2.thrift.RealtimeSpendValue.class, "segmentIds");
> ...
> boolean isNull$5 = (java.nio.ByteBuffer) 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1) == 
> null;
> java.lang.String result$4;
> if (isNull$5) {
>   result$4 = "";
> }
> else {
>   result$4 = (java.lang.String) (java.nio.ByteBuffer) 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1);
> }
>
> {code}
> The following is the stack trace:
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>   ... 17 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:425)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 
> 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"
>   at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>   at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5049)
>   at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>   at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
>   at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
>   at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>   at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>   at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3790)
>   at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:215)
>   at 
> org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3752)
>   at 
> org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3732)
>   at org.codehaus.janino.Java$Assignment.accept(Java.java:4466)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
>   at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
>   at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>   at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>   at 

[GitHub] [flink] Myasuka commented on issue #9604: [FLINK-13845][docs] Drop all the content of removed Checkpointed interface

2019-09-05 Thread GitBox
Myasuka commented on issue #9604: [FLINK-13845][docs] Drop all the content of 
removed Checkpointed interface
URL: https://github.com/apache/flink/pull/9604#issuecomment-528523385
 
 
   @flinkbot run travis


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8303: [FLINK-12343] [flink-yarn] add file replication config for yarn configuration

2019-09-05 Thread GitBox
flinkbot edited a comment on issue #8303: [FLINK-12343] [flink-yarn] add file 
replication config for yarn configuration
URL: https://github.com/apache/flink/pull/8303#issuecomment-511684151
 
 
   
   ## CI report:
   
   * 6a7ca58b4a04f6dce250045e021702e67e82b893 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/119421914)
   * 4d38a8df0d59734c4b2386689a2f17b9f2b44b12 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/119441376)
   * 9c14836f8639e98d58cf7bb32e38b938b3843994 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/119577044)
   * 76186776c5620598a19234245bbd05dfdfb1c62c : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/120113740)
   * 628ca7b316ad3968c90192a47a84dd01f26e2578 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/122381349)
   * d204a725ff3c8a046cbd1b84e34d9e3ae8aafeac : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/123620485)
   * 143efadbdb6c4681569d5b412a175edfb1633b85 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/123637809)
   * b78b64a82ed2a9a92886095ec42f06d5082ad830 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/123671219)
   * 5145a0b9d6b320456bb971d96b9cc47707c8fd28 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/125476639)
   * 0d4d944c28c59ca1caa6c453c347ec786b40d245 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/125762588)
   * 91552c3804f5e96cc573e6ed48756f2b54c037d4 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/125844084)
   * fbf6d2850b6aa7c303981e6f5b24b0da0956b820 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/126086321)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9625: [FLINK-13968][travis] Check correctness of binary licensing

2019-09-05 Thread GitBox
flinkbot edited a comment on issue #9625: [FLINK-13968][travis] Check 
correctness of binary licensing
URL: https://github.com/apache/flink/pull/9625#issuecomment-528246792
 
 
   
   ## CI report:
   
   * e56dd0731c3aa8932e69cf0383ca6205336d5ddc : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/126011765)
   * ef2c16def1408a512643f61908954b80b84e55c3 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/126066594)
   * 33efe377186fa70ffe41ecb8deedaf3660d16ffb : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/126077026)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9633: [FLINK-13970][coordinate] Remove LifoSetQueue and SetQueue

2019-09-05 Thread GitBox
flinkbot edited a comment on issue #9633: [FLINK-13970][coordinate] Remove 
LifoSetQueue and SetQueue
URL: https://github.com/apache/flink/pull/9633#issuecomment-528481813
 
 
   
   ## CI report:
   
   * 9d6fb73a077e26ec84d3cdf5a9835b0b5b567a0a : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/126096288)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9631: [FLINK-13977] Replace HighAvailabilityServices#getWebMonitorLeaderElectionService with #getClusterRestEndpointLeaderElectionService

2019-09-05 Thread GitBox
flinkbot edited a comment on issue #9631: [FLINK-13977] Replace 
HighAvailabilityServices#getWebMonitorLeaderElectionService with 
#getClusterRestEndpointLeaderElectionService
URL: https://github.com/apache/flink/pull/9631#issuecomment-528394274
 
 
   
   ## CI report:
   
   * acafd45403970fbc9df42b7f2a26d29a381878c6 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/126070255)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-13974) isAssignable function return wrong result

2019-09-05 Thread forideal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923582#comment-16923582
 ] 

forideal edited comment on FLINK-13974 at 9/5/19 5:22 PM:
--

[~Zentol] I use the blink planner.

In the flink planner there is no such function call. The code is:
{code:java}
val (physicalName, idx, tpe) = resolveInputField(name, tableSource)
// validate that mapped fields are of the same type
if (tpe != t) {
  throw new ValidationException(s"Type $t of table field '$name' does not " +
    s"match with type $tpe of the field '$physicalName' of the TableSource return type.")
}
idx
{code}
while in the blink planner:
{code:java}
val (physicalName, idx, tpe) = resolveInputField(name, tableSource)
// validate that mapped fields are of the same type
if (!isAssignable(fromTypeInfoToLogicalType(tpe), t)) {
  throw new ValidationException(s"Type $t of table field '$name' does not " +
    s"match with type $tpe of the field '$physicalName' of the TableSource return type.")
}
idx
{code}

However, neither of them works.



was (Author: forideal):
[~Zentol] I use the blink planner.

In the flink planner there is no such function call. The code is:
{code:java}
val (physicalName, idx, tpe) = resolveInputField(name, tableSource)
// validate that mapped fields are of the same type
if (tpe != t) {
  throw new ValidationException(s"Type $t of table field '$name' does not " +
    s"match with type $tpe of the field '$physicalName' of the TableSource return type.")
}
idx
{code}
while in the blink planner:
{code:java}
val (physicalName, idx, tpe) = resolveInputField(name, tableSource)
// validate that mapped fields are of the same type
if (!isAssignable(fromTypeInfoToLogicalType(tpe), t)) {
  throw new ValidationException(s"Type $t of table field '$name' does not " +
    s"match with type $tpe of the field '$physicalName' of the TableSource return type.")
}
idx
{code}

> isAssignable function return wrong result
> -
>
> Key: FLINK-13974
> URL: https://issues.apache.org/jira/browse/FLINK-13974
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.9.0
>Reporter: forideal
>Priority: Major
> Attachments: image-2019-09-05-20-40-05-041.png
>
>
> !image-2019-09-05-20-40-05-041.png!



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[GitHub] [flink] zentol commented on a change in pull request #9632: [FLINK-13978] Add azure-pipelines.yml

2019-09-05 Thread GitBox
zentol commented on a change in pull request #9632: [FLINK-13978] Add 
azure-pipelines.yml
URL: https://github.com/apache/flink/pull/9632#discussion_r321384493
 
 

 ##
 File path: azure-pipelines.yml
 ##
 @@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+trigger:
+  branches:
+include:
+- '*' 
+  paths:
+exclude:
+- docs/*
+- '*.md'
+
+resources:
+  containers:
+  # Container with Maven 3.6.1 to have the same environment everywhere.
+  - container: flink-build-container
+image: rmetzger/flink-ci:2
+
+jobs:
+  # use 'Default' pool
+- job: runOnDefaultAgentPool
+  pool:
+name: Default
 
 Review comment:
Do I understand correctly that this job runs on free machines, while the 
other job runs on sponsored ones?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9633: [FLINK-13970][coordinate] Remove LifoSetQueue and SetQueue

2019-09-05 Thread GitBox
flinkbot commented on issue #9633: [FLINK-13970][coordinate] Remove 
LifoSetQueue and SetQueue
URL: https://github.com/apache/flink/pull/9633#issuecomment-528481813
 
 
   
   ## CI report:
   
   * 9d6fb73a077e26ec84d3cdf5a9835b0b5b567a0a : UNKNOWN
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9624: [FLINK-13967][licensing] Fully generate binary licensing

2019-09-05 Thread GitBox
flinkbot edited a comment on issue #9624: [FLINK-13967][licensing] Fully 
generate binary licensing
URL: https://github.com/apache/flink/pull/9624#issuecomment-528246748
 
 
   
   ## CI report:
   
   * 3c3290ae04fc1c8e732f50135c34563add5b82a8 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/126011736)
   * 9add8e0d4885b0c715b60808efc533737c851a1c : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/126066557)
   * 3b037f9c712b00eae3d2349407d8f7714ec1b7c4 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/126076990)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on a change in pull request #9632: [FLINK-13978] Add azure-pipelines.yml

2019-09-05 Thread GitBox
zentol commented on a change in pull request #9632: [FLINK-13978] Add 
azure-pipelines.yml
URL: https://github.com/apache/flink/pull/9632#discussion_r321373629
 
 

 ##
 File path: azure-pipelines.yml
 ##
 @@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+trigger:
+  branches:
+include:
+- '*' 
+  paths:
+exclude:
+- docs/*
+- '*.md'
+
+resources:
+  containers:
+  # Container with Maven 3.6.1 to have the same environment everywhere.
 
 Review comment:
Can we reduce this to Maven 3.2.5, to have something comparable to the current 
setup?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on a change in pull request #9632: [FLINK-13978] Add azure-pipelines.yml

2019-09-05 Thread GitBox
zentol commented on a change in pull request #9632: [FLINK-13978] Add 
azure-pipelines.yml
URL: https://github.com/apache/flink/pull/9632#discussion_r321372910
 
 

 ##
 File path: azure-pipelines.yml
 ##
 @@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+trigger:
+  branches:
+include:
+- '*' 
+  paths:
+exclude:
+- docs/*
+- '*.md'
+
+resources:
+  containers:
+  # Container with Maven 3.6.1 to have the same environment everywhere.
+  - container: flink-build-container
+image: rmetzger/flink-ci:2
+
+jobs:
+  # use 'Default' pool
+- job: runOnDefaultAgentPool
+  pool:
+name: Default
+  container: flink-build-container
+  timeoutInMinutes: 0
+  steps:
+  # Azure pipelines can only evaluate conditions with the build repo name in 
the steps.
+  # if the repo != "flink-ci/flink", we stop
+  - script: exit 1
+condition: not(eq(variables['Build.Repository.Name'], 'flink-ci/flink'))
+  - script: chown -R user:user /home/user
+  - script: chown -R user:user .
+  - task: Maven@3
+inputs:
+  mavenPomFile: 'pom.xml'
+  mavenOptions: '-Xmx3072m'
+  javaHomeOption: 'JDKVersion'
+  jdkVersionOption: '1.8'
+  jdkArchitectureOption: 'x64'
+  publishJUnitResults: true
+  testResultsFiles: '**/surefire-reports/TEST-*.xml'
+  goals: 'clean install'
+  timeoutInMinutes: 0
+- job: runOnAzure
+  pool:
+vmImage: 'ubuntu-latest'
+  container: flink-build-container
+  timeoutInMinutes: 0
+  steps:
+  # Azure pipelines can only evaluate conditions with the build repo name in 
the steps.
+  # if the repo == "flink-ci/flink", we stop
+  - script: exit 1
+condition: eq(variables['Build.Repository.Name'], 'flink-ci/flink')
+  - script: sudo chown -R user:user /home/user
+  - script: sudo chown -R user:user .
+  - task: Maven@3
 
 Review comment:
   Can we just call into a bash script here that lives somewhere in flink-ci, 
so that we can change what is being run without having to touch the flink repo 
(up to a certain limit)?
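   
   A hypothetical sketch of what that could look like (the flink-ci repository 
path and script name are made up for illustration):
   
   - script: |
       curl -sSfL https://raw.githubusercontent.com/flink-ci/ci-scripts/master/build_flink.sh -o /tmp/build_flink.sh
       bash /tmp/build_flink.sh
     displayName: Run build script maintained in flink-ci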


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tweise commented on issue #9581: [FLINK-13864][streaming]: Modify the StreamingFileSink Builder interface to allow for easier subclassing of StreamingFileSink

2019-09-05 Thread GitBox
tweise commented on issue #9581: [FLINK-13864][streaming]: Modify the 
StreamingFileSink Builder interface to allow for easier subclassing of 
StreamingFileSink 
URL: https://github.com/apache/flink/pull/9581#issuecomment-528458812
 
 
   @kl0u could you please comment on whether this solution is acceptable? We 
would ideally like to wrap this up and use the same patch in our fork.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #9633: [FLINK-13970][coordinate] Remove LifoSetQueue and SetQueue

2019-09-05 Thread GitBox
flinkbot commented on issue #9633: [FLINK-13970][coordinate] Remove 
LifoSetQueue and SetQueue
URL: https://github.com/apache/flink/pull/9633#issuecomment-528458285
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 9d6fb73a077e26ec84d3cdf5a9835b0b5b567a0a (Thu Sep 05 
16:50:41 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13970) Remove LifoSetQueue and SetQueue

2019-09-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13970:
---
Labels: pull-request-available  (was: )

> Remove LifoSetQueue and SetQueue
> 
>
> Key: FLINK-13970
> URL: https://issues.apache.org/jira/browse/FLINK-13970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>
> Hi [~till.rohrmann] I found a class {{LifoSetQueue}} which is no longer 
> used. IIRC it was once used in {{Scheduler}} and {{Instance}}. Shall we 
> also remove this class, or put it under some directory that collects utils?



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[GitHub] [flink] TisonKun opened a new pull request #9633: [FLINK-13970][coordinate] Remove LifoSetQueue and SetQueue

2019-09-05 Thread GitBox
TisonKun opened a new pull request #9633: [FLINK-13970][coordinate] Remove 
LifoSetQueue and SetQueue
URL: https://github.com/apache/flink/pull/9633
 
 
   ## What is the purpose of the change
   
   Dead code removal.
   
   ## Verifying this change
   
   This change is a code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   
   cc @tillrohrmann @zentol 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13970) Remove LifoSetQueue and SetQueue

2019-09-05 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923602#comment-16923602
 ] 

TisonKun commented on FLINK-13970:
--

With another pass I found that {{SetQueue}} is quite similar to {{LifoSetQueue}}, 
and I'd like to do the removal in one pass. {{SetQueue}} is also no longer used.

> Remove LifoSetQueue and SetQueue
> 
>
> Key: FLINK-13970
> URL: https://issues.apache.org/jira/browse/FLINK-13970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> Hi [~till.rohrmann] I found a class {{LifoSetQueue}} which is no longer 
> used. IIRC it was once used in {{Scheduler}} and {{Instance}}. Shall we 
> also remove this class, or put it under some directory that collects utils?



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (FLINK-13970) Remove LifoSetQueue and SetQueue

2019-09-05 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-13970:
-
Summary: Remove LifoSetQueue and SetQueue  (was: Remove or move 
LifoSetQueue)

> Remove LifoSetQueue and SetQueue
> 
>
> Key: FLINK-13970
> URL: https://issues.apache.org/jira/browse/FLINK-13970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> Hi [~till.rohrmann] I found a class {{LifoSetQueue}} which is no longer 
> used. IIRC it was once used in {{Scheduler}} and {{Instance}}. Shall we 
> also remove this class, or put it under some directory that collects utils?



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321355744
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CopyOnWriteSkipListStateMap}.
+ */
+public class CopyOnWriteSkipListStateMapTest extends TestLogger {
+
+   private TestAllocator spaceAllocator;
+
+   @Before
+   public void setUp() {
+   int maxAllocateSize = 256;
+   spaceAllocator = new TestAllocator(maxAllocateSize);
+   }
+
+   @After
+   public void tearDown() {
+   IOUtils.closeQuietly(spaceAllocator);
+   }
+
+   /**
+* Test initialization of state map.
+*/
+   @Test
+   public void testInitStateMap() {
+   TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+   TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE;
+   TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE;
+   CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>(
+   keySerializer, namespaceSerializer, stateSerializer, spaceAllocator);
+
+   assertTrue(stateMap.isEmpty());
+   assertEquals(0, stateMap.size());
+   assertEquals(0, stateMap.totalSize());
+   assertEquals(0, stateMap.getRequestCount());
+   assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty());
+   assertEquals(0, stateMap.getHighestRequiredSnapshotVersion());
+   assertEquals(0, stateMap.getHighestFinishedSnapshotVersion());
+   assertTrue(stateMap.getSnapshotVersions().isEmpty());
+   assertTrue(stateMap.getPruningValueNodes().isEmpty());
+   assertEquals(0, stateMap.getResourceGuard().getLeaseCount());
+   assertFalse(stateMap.getResourceGuard().isClosed());
+   assertFalse(stateMap.isClosed());
+
+   assertNull(stateMap.get(0, 0L));
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321363043
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CopyOnWriteSkipListStateMap}.
+ */
+public class CopyOnWriteSkipListStateMapTest extends TestLogger {
+
+   private TestAllocator spaceAllocator;
+
+   @Before
+   public void setUp() {
+   int maxAllocateSize = 256;
+   spaceAllocator = new TestAllocator(maxAllocateSize);
+   }
+
+   @After
+   public void tearDown() {
+   IOUtils.closeQuietly(spaceAllocator);
+   }
+
+   /**
+* Test initialization of state map.
+*/
+   @Test
+   public void testInitStateMap() {
+   TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+   TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE;
+   TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE;
+   CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>(
+   keySerializer, namespaceSerializer, stateSerializer, spaceAllocator);
+
+   assertTrue(stateMap.isEmpty());
+   assertEquals(0, stateMap.size());
+   assertEquals(0, stateMap.totalSize());
+   assertEquals(0, stateMap.getRequestCount());
+   assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty());
+   assertEquals(0, stateMap.getHighestRequiredSnapshotVersion());
+   assertEquals(0, stateMap.getHighestFinishedSnapshotVersion());
+   assertTrue(stateMap.getSnapshotVersions().isEmpty());
+   assertTrue(stateMap.getPruningValueNodes().isEmpty());
+   assertEquals(0, stateMap.getResourceGuard().getLeaseCount());
+   assertFalse(stateMap.getResourceGuard().isClosed());
+   assertFalse(stateMap.isClosed());
+
+   assertNull(stateMap.get(0, 0L));
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321351907
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CopyOnWriteSkipListStateMap}.
+ */
+public class CopyOnWriteSkipListStateMapTest extends TestLogger {
+
+   private TestAllocator spaceAllocator;
+
+   @Before
+   public void setUp() {
+   int maxAllocateSize = 256;
+   spaceAllocator = new TestAllocator(maxAllocateSize);
+   }
+
+   @After
+   public void tearDown() {
+   IOUtils.closeQuietly(spaceAllocator);
+   }
+
+   /**
+* Test initialization of state map.
+*/
+   @Test
+   public void testInitStateMap() {
+   TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+   TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE;
+   TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE;
+   CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>(
+   keySerializer, namespaceSerializer, stateSerializer, spaceAllocator);
+
+   assertTrue(stateMap.isEmpty());
+   assertEquals(0, stateMap.size());
+   assertEquals(0, stateMap.totalSize());
+   assertEquals(0, stateMap.getRequestCount());
+   assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty());
+   assertEquals(0, stateMap.getHighestRequiredSnapshotVersion());
+   assertEquals(0, stateMap.getHighestFinishedSnapshotVersion());
+   assertTrue(stateMap.getSnapshotVersions().isEmpty());
+   assertTrue(stateMap.getPruningValueNodes().isEmpty());
+   assertEquals(0, stateMap.getResourceGuard().getLeaseCount());
+   assertFalse(stateMap.getResourceGuard().isClosed());
+   assertFalse(stateMap.isClosed());
+
+   assertNull(stateMap.get(0, 0L));
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321337838
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListUtils.java
 ##
 @@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities for skip list.
+ */
+@SuppressWarnings("WeakerAccess")
+public class SkipListUtils {
+   static final long NIL_NODE = -1;
+   static final long HEAD_NODE = -2;
+   static final long NIL_VALUE_POINTER = -1;
+   static final int MAX_LEVEL = 255;
+   static final int DEFAULT_LEVEL = 32;
+   static final int BYTE_MASK = 0xFF;
+
+   /**
+* Key space schema.
+* - key meta
+* -- int: level & status
+* --   byte 0: level of node in skip list
+* --   byte 1: node status
+* --   byte 2: preserve
+* --   byte 3: preserve
+* -- int: length of key
+* -- long: pointer to the newest value
+* -- long: pointer to next node on level 0
+* -- long[]: array of pointers to next node on different levels 
excluding level 0
+* -- long[]: array of pointers to previous node on different levels 
excluding level 0
+* - byte[]: data of key
+*/
+   static final int KEY_META_OFFSET = 0;
+   static final int KEY_LEN_OFFSET = KEY_META_OFFSET + Integer.BYTES;
+   static final int VALUE_POINTER_OFFSET = KEY_LEN_OFFSET + Integer.BYTES;
+   static final int NEXT_KEY_POINTER_OFFSET = VALUE_POINTER_OFFSET + 
Long.BYTES;
+   static final int LEVEL_INDEX_OFFSET = NEXT_KEY_POINTER_OFFSET + 
Long.BYTES;
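+
+   // Worked example (illustration, not part of the original diff): with
+   // Integer.BYTES = 4 and Long.BYTES = 8 these offsets resolve to 0, 4,
+   // 8, 16 and 24, so a level-0 node's key bytes start at offset 24, and
+   // every additional level adds two long pointers (next and previous),
+   // i.e. 16 more bytes of key meta.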
+
+
+   /**
+* Pre-compute the offset of index for different levels to dismiss the 
duplicated
+* computation at runtime.
+*/
+   private static final int[] INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY = new 
int[MAX_LEVEL + 1];
+
+   /**
+* Pre-compute the length of key meta for different levels to dismiss 
the duplicated
+* computation at runtime.
+*/
+   private static final int[] KEY_META_LEN_BY_LEVEL_ARRAY = new 
int[MAX_LEVEL + 1];
+
+   static {
+   for (int i = 1; i < INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY.length; 
i++) {
+   INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY[i] = 
LEVEL_INDEX_OFFSET + (i - 1) * Long.BYTES;
+   }
+
+   for (int i = 0; i < KEY_META_LEN_BY_LEVEL_ARRAY.length; i++) {
+   KEY_META_LEN_BY_LEVEL_ARRAY[i] = LEVEL_INDEX_OFFSET + 2 
* i * Long.BYTES;
+   }
+   }
+
+   /**
+* Returns the level of the node.
+*
+* @param byteBuffer byte buffer for key space.
+* @param offset offset of key space in the byte buffer.
+*/
+   public static int getLevel(ByteBuffer byteBuffer, int offset) {
+   return ByteBufferUtils.toInt(byteBuffer, offset + 
KEY_META_OFFSET) & BYTE_MASK;
+   }
+
+   /**
+* Returns the status of the node.
+*
+* @param byteBuffer byte buffer for key space.
+* @param offset offset of key space in the byte buffer.
+*/
+   public static byte getNodeStatus(ByteBuffer byteBuffer, int offset) {
+   return (byte) ((ByteBufferUtils.toInt(byteBuffer, offset + 
KEY_META_OFFSET) >>> 8) & BYTE_MASK);
+   }
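+
+   // Illustration (not part of the original diff): the meta int packs the
+   // level and status byte-wise, so a meta value of 0x0103 decodes to
+   // level 3 (lowest byte) and node status 1 (second byte).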
+
+   /**
+* Puts the level and status to the key space.
+*
+* @param byteBuffer byte buffer for key space.
+* @param offset offset of key space in the byte buffer.
+* @param level the level.
+* @param status the status.
+*/
+   public 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321363183
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CopyOnWriteSkipListStateMap}.
+ */
+public class CopyOnWriteSkipListStateMapTest extends TestLogger {
+
+   private TestAllocator spaceAllocator;
+
+   @Before
+   public void setUp() {
+   int maxAllocateSize = 256;
+   spaceAllocator = new TestAllocator(maxAllocateSize);
+   }
+
+   @After
+   public void tearDown() {
+   IOUtils.closeQuietly(spaceAllocator);
+   }
+
+   /**
+* Test initialization of state map.
+*/
+   @Test
+   public void testInitStateMap() {
+   TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+   TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE;
+   TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE;
+   CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>(
+   keySerializer, namespaceSerializer, stateSerializer, spaceAllocator);
+
+   assertTrue(stateMap.isEmpty());
+   assertEquals(0, stateMap.size());
+   assertEquals(0, stateMap.totalSize());
+   assertEquals(0, stateMap.getRequestCount());
+   assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty());
+   assertEquals(0, stateMap.getHighestRequiredSnapshotVersion());
+   assertEquals(0, stateMap.getHighestFinishedSnapshotVersion());
+   assertTrue(stateMap.getSnapshotVersions().isEmpty());
+   assertTrue(stateMap.getPruningValueNodes().isEmpty());
+   assertEquals(0, stateMap.getResourceGuard().getLeaseCount());
+   assertFalse(stateMap.getResourceGuard().isClosed());
+   assertFalse(stateMap.isClosed());
+
+   assertNull(stateMap.get(0, 0L));
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321353063
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CopyOnWriteSkipListStateMap}.
+ */
+public class CopyOnWriteSkipListStateMapTest extends TestLogger {
+
+   private TestAllocator spaceAllocator;
+
+   @Before
+   public void setUp() {
+   int maxAllocateSize = 256;
+   spaceAllocator = new TestAllocator(maxAllocateSize);
+   }
+
+   @After
+   public void tearDown() {
+   IOUtils.closeQuietly(spaceAllocator);
+   }
+
+   /**
+* Test initialization of state map.
+*/
+   @Test
+   public void testInitStateMap() {
+   TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+   TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE;
+   TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE;
+   CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>(
+   keySerializer, namespaceSerializer, stateSerializer, spaceAllocator);
+
+   assertTrue(stateMap.isEmpty());
+   assertEquals(0, stateMap.size());
+   assertEquals(0, stateMap.totalSize());
+   assertEquals(0, stateMap.getRequestCount());
+   assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty());
+   assertEquals(0, stateMap.getHighestRequiredSnapshotVersion());
+   assertEquals(0, stateMap.getHighestFinishedSnapshotVersion());
+   assertTrue(stateMap.getSnapshotVersions().isEmpty());
+   assertTrue(stateMap.getPruningValueNodes().isEmpty());
+   assertEquals(0, stateMap.getResourceGuard().getLeaseCount());
+   assertFalse(stateMap.getResourceGuard().isClosed());
+   assertFalse(stateMap.isClosed());
+
+   assertNull(stateMap.get(0, 0L));
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321354028
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CopyOnWriteSkipListStateMap}.
+ */
+public class CopyOnWriteSkipListStateMapTest extends TestLogger {
+
+   private TestAllocator spaceAllocator;
+
+   @Before
+   public void setUp() {
+   int maxAllocateSize = 256;
+   spaceAllocator = new TestAllocator(maxAllocateSize);
+   }
+
+   @After
+   public void tearDown() {
+   IOUtils.closeQuietly(spaceAllocator);
+   }
+
+   /**
+* Test initialization of state map.
+*/
+   @Test
+   public void testInitStateMap() {
+   TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+   TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE;
+   TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE;
+   CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>(
+   keySerializer, namespaceSerializer, stateSerializer, spaceAllocator);
+
+   assertTrue(stateMap.isEmpty());
+   assertEquals(0, stateMap.size());
+   assertEquals(0, stateMap.totalSize());
+   assertEquals(0, stateMap.getRequestCount());
+   assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty());
+   assertEquals(0, stateMap.getHighestRequiredSnapshotVersion());
+   assertEquals(0, stateMap.getHighestFinishedSnapshotVersion());
+   assertTrue(stateMap.getSnapshotVersions().isEmpty());
+   assertTrue(stateMap.getPruningValueNodes().isEmpty());
+   assertEquals(0, stateMap.getResourceGuard().getLeaseCount());
+   assertFalse(stateMap.getResourceGuard().isClosed());
+   assertFalse(stateMap.isClosed());
+
+   assertNull(stateMap.get(0, 0L));
+   
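For orientation, a minimal sketch of how such a map would be exercised beyond initialization, assuming the usual StateMap-style put/get/remove signatures implied by the get(0, 0L) call above:

    CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>(
        IntSerializer.INSTANCE, LongSerializer.INSTANCE, StringSerializer.INSTANCE, spaceAllocator);

    stateMap.put(1, 0L, "hello");                  // insert state for key 1 in namespace 0
    assertEquals("hello", stateMap.get(1, 0L));    // look up by key and namespace
    stateMap.remove(1, 0L);                        // logical removal of the entry
    assertNull(stateMap.get(1, 0L));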

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321322010
 
 

 ##
 File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListKeySerializer.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.ByteBufferInputStreamWithPos;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/deserializer used for conversion between key/namespace and skip list key.
+ * It is not thread safe.
+ */
+class SkipListKeySerializer<K, N> {
 
 Review comment:
   Generic parameters are not documented.
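
A minimal sketch of the requested documentation, assuming the class is parameterized by the key and namespace types; the fields and constructor are illustrative, not copied from the PR:

    import org.apache.flink.api.common.typeutils.TypeSerializer;

    /**
     * Serializer/deserializer used for conversion between key/namespace and skip list key.
     * It is not thread safe.
     *
     * @param <K> type of the key.
     * @param <N> type of the namespace.
     */
    class SkipListKeySerializer<K, N> {

        private final TypeSerializer<K> keySerializer;
        private final TypeSerializer<N> namespaceSerializer;

        SkipListKeySerializer(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer) {
            this.keySerializer = keySerializer;
            this.namespaceSerializer = namespaceSerializer;
        }
    }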


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321358028
 
 

 File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321359916
 
 

 File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321357522
 
 

 File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321361257
 
 

 File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321359402
 
 

 File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321353802
 
 

 File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321345857
 
 

 ##
 File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Chunk.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A Chunk is a contiguous ByteBuffer or a logically contiguous space.
+ * For example, a 1 GB chunk may be one big file or multiple 4 MB on-heap ByteBuffers.
+ */
+public interface Chunk {
+   /**
+* Try to allocate size bytes from the chunk. spaceSizeInfo will record the occupied space size.
+*
+* @return the offset of the successful allocation, or -1 to indicate not-enough-space
 
 Review comment:
   Same here with the `-1` return value. Why not throwing an exception?
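
For comparison, a hedged sketch of the exception-based contract the reviewer hints at; the exception type and its name are assumptions, not part of the PR:

    /** Thrown when an allocation request cannot be served; the name is illustrative. */
    class NotEnoughSpaceException extends RuntimeException {
        NotEnoughSpaceException(int requestedSize) {
            super("Cannot allocate " + requestedSize + " bytes from this chunk");
        }
    }

    interface Chunk {
        /**
         * Allocates size bytes from the chunk.
         *
         * @return the offset of the allocated space.
         * @throws NotEnoughSpaceException if the chunk does not have enough free space.
         */
        int allocate(int size);
    }

A -1 sentinel keeps the not-enough-space path cheap when the allocator probes one chunk and falls back to another, while an exception makes the contract harder to misuse; which fits better depends on how hot the allocation path is.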


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321345725
 
 

 ##
 File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Chunk.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A Chunk is a contiguous ByteBuffer or a logically contiguous space.
+ * For example, a 1 GB chunk may be one big file or multiple 4 MB on-heap ByteBuffers.
+ */
+public interface Chunk {
+   /**
+* Try to allocate size bytes from the chunk. spaceSizeInfo will record the occupied space size.
 
 Review comment:
   What is `spaceSizeInfo`? This sounds like an implementation detail.
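
One possible rewording that keeps the contract but drops the implementation detail; a sketch, not the PR's actual wording:

    /**
     * Tries to allocate size bytes from this chunk.
     *
     * @param size number of bytes to allocate.
     * @return the offset of the allocated space, or -1 if there is not enough free space.
     */
    int allocate(int size);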


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321356663
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CopyOnWriteSkipListStateMap}.
+ */
+public class CopyOnWriteSkipListStateMapTest extends TestLogger {
+
+   private TestAllocator spaceAllocator;
+
+   @Before
+   public void setUp() {
+   int maxAllocateSize = 256;
+   spaceAllocator = new TestAllocator(maxAllocateSize);
+   }
+
+   @After
+   public void tearDown() {
+   IOUtils.closeQuietly(spaceAllocator);
+   }
+
+   /**
+* Test initialization of state map.
+*/
+   @Test
+   public void testInitStateMap() {
+   TypeSerializer keySerializer = IntSerializer.INSTANCE;
+   TypeSerializer namespaceSerializer = 
LongSerializer.INSTANCE;
+   TypeSerializer stateSerializer = 
StringSerializer.INSTANCE;
+   CopyOnWriteSkipListStateMap stateMap = 
new CopyOnWriteSkipListStateMap<>(
+   keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+
+   assertTrue(stateMap.isEmpty());
+   assertEquals(0, stateMap.size());
+   assertEquals(0, stateMap.totalSize());
+   assertEquals(0, stateMap.getRequestCount());
+   assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty());
+   assertEquals(0, stateMap.getHighestRequiredSnapshotVersion());
+   assertEquals(0, stateMap.getHighestFinishedSnapshotVersion());
+   assertTrue(stateMap.getSnapshotVersions().isEmpty());
+   assertTrue(stateMap.getPruningValueNodes().isEmpty());
+   assertEquals(0, stateMap.getResourceGuard().getLeaseCount());
+   assertFalse(stateMap.getResourceGuard().isClosed());
+   assertFalse(stateMap.isClosed());
+
+   assertNull(stateMap.get(0, 0L));
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321331829
 
 

 ##
 File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListUtils.java
 ##
 @@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities for skip list.
+ */
+@SuppressWarnings("WeakerAccess")
+public class SkipListUtils {
+   static final long NIL_NODE = -1;
+   static final long HEAD_NODE = -2;
+   static final long NIL_VALUE_POINTER = -1;
+   static final int MAX_LEVEL = 255;
+   static final int DEFAULT_LEVEL = 32;
+   static final int BYTE_MASK = 0xFF;
+
+   /**
+* Key space schema.
+* - key meta
+* -- int: level & status
+* --   byte 0: level of node in skip list
+* --   byte 1: node status
+* --   byte 2: preserve
+* --   byte 3: preserve
+* -- int: length of key
+* -- long: pointer to the newest value
+* -- long: pointer to next node on level 0
+* -- long[]: array of pointers to next node on different levels excluding level 0
+* -- long[]: array of pointers to previous node on different levels excluding level 0
+* - byte[]: data of key
+*/
+   static final int KEY_META_OFFSET = 0;
+   static final int KEY_LEN_OFFSET = KEY_META_OFFSET + Integer.BYTES;
+   static final int VALUE_POINTER_OFFSET = KEY_LEN_OFFSET + Integer.BYTES;
+   static final int NEXT_KEY_POINTER_OFFSET = VALUE_POINTER_OFFSET + Long.BYTES;
+   static final int LEVEL_INDEX_OFFSET = NEXT_KEY_POINTER_OFFSET + Long.BYTES;
+
+
+   /**
+* Pre-computed offsets of the next-node index for different levels, to avoid duplicated
+* computation at runtime.
+*/
+   private static final int[] INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY = new int[MAX_LEVEL + 1];
+
+   /**
+* Pre-computed lengths of the key meta for different levels, to avoid duplicated
+* computation at runtime.
+*/
+   private static final int[] KEY_META_LEN_BY_LEVEL_ARRAY = new int[MAX_LEVEL + 1];
+
+   static {
+   for (int i = 1; i < INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY.length; i++) {
+   INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY[i] = LEVEL_INDEX_OFFSET + (i - 1) * Long.BYTES;
+   }
+
+   for (int i = 0; i < KEY_META_LEN_BY_LEVEL_ARRAY.length; i++) {
+   KEY_META_LEN_BY_LEVEL_ARRAY[i] = LEVEL_INDEX_OFFSET + 2 * i * Long.BYTES;
+   }
+   }
+
+   /**
+* Returns the level of the node.
+*
+* @param byteBuffer byte buffer for key space.
+* @param offset offset of key space in the byte buffer.
+*/
+   public static int getLevel(ByteBuffer byteBuffer, int offset) {
+   return ByteBufferUtils.toInt(byteBuffer, offset + KEY_META_OFFSET) & BYTE_MASK;
+   }
+
+   /**
+* Returns the status of the node.
+*
+* @param byteBuffer byte buffer for key space.
+* @param offset offset of key space in the byte buffer.
+*/
+   public static byte getNodeStatus(ByteBuffer byteBuffer, int offset) {
+   return (byte) ((ByteBufferUtils.toInt(byteBuffer, offset + KEY_META_OFFSET) >>> 8) & BYTE_MASK);
+   }
+
+   /**
+* Puts the level and status to the key space.
+*
+* @param byteBuffer byte buffer for key space.
+* @param offset offset of key space in the byte buffer.
+* @param level the level.
+* @param status the status.
+*/
+   public 
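
Based on the getters above, the put side presumably packs the level into byte 0 and the status into byte 1 of the key meta int. A minimal sketch under that assumption (ByteBufferUtils.putInt is assumed to mirror the toInt used above):

    public static void putLevelAndNodeStatus(ByteBuffer byteBuffer, int offset, int level, int status) {
        // byte 0 = level, byte 1 = status, bytes 2-3 stay zero per the key space schema above
        int levelAndStatus = (level & BYTE_MASK) | ((status & BYTE_MASK) << 8);
        ByteBufferUtils.putInt(byteBuffer, offset + KEY_META_OFFSET, levelAndStatus);
    }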

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321356421
 
 

 File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321360968
 
 

 File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321349868
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
(quoted diff context omitted; identical to the CopyOnWriteSkipListStateMapTest.java hunk shown earlier in this digest)

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321363402
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
(quoted diff context omitted; identical to the CopyOnWriteSkipListStateMapTest.java hunk shown earlier in this digest)

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321346394
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Chunk.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A Chunk is a contiguous ByteBuffer, or a logically contiguous space.
+ * For example, a 1 GB chunk may be backed by one big file or by multiple 4 MB on-heap ByteBuffers.
+ */
+public interface Chunk {
+   /**
+* Try to allocate {@code len} bytes from the chunk.
+*
+* @param len number of bytes to allocate.
+* @return the offset of the successful allocation, or -1 to indicate not-enough-space.
+*/
+   int allocate(int len);
+
+   /**
+* Release the space addressed by the given intra-chunk offset.
+*
+* @param interChunkOffset offset within this chunk of the space to release.
+*/
+   void free(int interChunkOffset);
+
+   /**
+* @return Id of this Chunk
+*/
+   int getChunkId();
+
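+   /** Returns this chunk's total capacity, presumably in bytes. */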
+   int getChunkCapacity();
+
+   /**
+* @return the ByteBuffer backing the space at the given chunkOffset.
 
 Review comment:
   I assume that the chunk is not always backed by a single `ByteBuffer`, as 
the class's Javadoc indicates.
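   
   For illustration, here is a minimal sketch of a chunk backed by multiple 
fixed-size ByteBuffer segments (hypothetical code, not from this PR; 
SEGMENT_SIZE, the bump-pointer allocation, and all names are assumptions):
   
   import java.nio.ByteBuffer;
   
   final class SegmentedChunk {
       private static final int SEGMENT_SIZE = 4 * 1024 * 1024; // assumed 4 MB segments
       private final ByteBuffer[] segments;
       private int bump; // naive bump-pointer allocator; free() is not supported here
   
       SegmentedChunk(int capacity) {
           int numSegments = (capacity + SEGMENT_SIZE - 1) / SEGMENT_SIZE;
           this.segments = new ByteBuffer[numSegments];
           for (int i = 0; i < numSegments; i++) {
               segments[i] = ByteBuffer.allocate(SEGMENT_SIZE);
           }
       }
   
       /** Allocates len bytes inside a single segment; returns the intra-chunk offset, or -1 if full. */
       int allocate(int len) {
           int slack = SEGMENT_SIZE - (bump % SEGMENT_SIZE);
           if (slack < len) {
               bump += slack; // skip the fragmented segment tail so the allocation stays contiguous
           }
           long capacity = (long) segments.length * SEGMENT_SIZE;
           if ((long) bump + len > capacity || len > SEGMENT_SIZE) {
               return -1;
           }
           int offset = bump;
           bump += len;
           return offset;
       }
   
       /** Resolves an intra-chunk offset to the ByteBuffer segment that backs it. */
       ByteBuffer getBackingBuffer(int chunkOffset) {
           return segments[chunkOffset / SEGMENT_SIZE];
       }
   }
   
   Resolving an offset then goes through getBackingBuffer(offset) rather than a 
single buffer field, which is the multi-buffer case the Javadoc hints at.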


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321347548
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/SpaceUtils.java
 ##
 @@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import static 
org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_BITS;
+import static 
org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_MARK;
+
+/**
+ * Utilities for decomposing a space address into a chunk id and an intra-chunk offset.
+ */
+public class SpaceUtils {
+
+   /** The chunk id lives in the high 32 bits of a packed address. */
+   public static int getChunkIdByAddress(long address) {
+   return (int) ((address >>> FOUR_BYTES_BITS) & FOUR_BYTES_MARK);
+   }
+
+   /** The intra-chunk offset lives in the low 32 bits of a packed address. */
+   public static int getChunkOffsetByAddress(long address) {
+   return (int) (address & FOUR_BYTES_MARK);
+   }
+}
 
 Review comment:
   What I don't like so much about this class is that it is disconnected from 
`Chunk` and the `Allocator`, where the addresses are generated. I think this is 
a drawback of the procedural approach: code and abstractions are not grouped 
together. You need a lot of contextual knowledge to know that there is a 
`SpaceUtils` that lets you turn a `long` address into the `int` chunk id.
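   
   One way to address this (purely illustrative, not what this PR does) is to 
keep the layout knowledge next to the code that creates addresses, e.g. as a 
small value type; the name SpaceAddress and the 32/32-bit split shown here are 
assumptions derived from the constants above:
   
   final class SpaceAddress {
       private final long packed;
   
       SpaceAddress(int chunkId, int chunkOffset) {
           // chunk id in the high 32 bits, intra-chunk offset in the low 32 bits
           this.packed = ((long) chunkId << 32) | (chunkOffset & 0xFFFFFFFFL);
       }
   
       static SpaceAddress of(long packedAddress) {
           return new SpaceAddress((int) (packedAddress >>> 32), (int) packedAddress);
       }
   
       int chunkId() {
           return (int) (packed >>> 32);
       }
   
       int chunkOffset() {
           return (int) packed;
       }
   
       long toLong() {
           return packed;
       }
   }
   
   Call sites would then read SpaceAddress.of(address).chunkId() instead of 
reaching for a detached SpaceUtils helper, so the address layout is defined in 
exactly one place.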


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321330691
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListUtils.java
 ##
 @@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities for skip list.
+ */
+@SuppressWarnings("WeakerAccess")
+public class SkipListUtils {
+   static final long NIL_NODE = -1;
+   static final long HEAD_NODE = -2;
+   static final long NIL_VALUE_POINTER = -1;
+   static final int MAX_LEVEL = 255;
+   static final int DEFAULT_LEVEL = 32;
+   static final int BYTE_MASK = 0xFF;
+
+   /**
+* Key space schema.
+* - key meta
+* -- int: level & status
+* --   byte 0: level of node in skip list
+* --   byte 1: node status
+* --   byte 2: reserved
+* --   byte 3: reserved
+* -- int: length of key
+* -- long: pointer to the newest value
+* -- long: pointer to next node on level 0
+* -- long[]: array of pointers to next node on different levels 
excluding level 0
+* -- long[]: array of pointers to previous node on different levels 
excluding level 0
+* - byte[]: data of key
+*/
+   static final int KEY_META_OFFSET = 0;
+   static final int KEY_LEN_OFFSET = KEY_META_OFFSET + Integer.BYTES;
+   static final int VALUE_POINTER_OFFSET = KEY_LEN_OFFSET + Integer.BYTES;
+   static final int NEXT_KEY_POINTER_OFFSET = VALUE_POINTER_OFFSET + 
Long.BYTES;
+   static final int LEVEL_INDEX_OFFSET = NEXT_KEY_POINTER_OFFSET + 
Long.BYTES;
+
+
+   /**
+* Pre-computed offsets of the index for different levels, to avoid
+* duplicated computation at runtime.
+*/
+   private static final int[] INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY = new 
int[MAX_LEVEL + 1];
+
+   /**
+* Pre-computed lengths of the key meta for different levels, to avoid
+* duplicated computation at runtime.
+*/
+   private static final int[] KEY_META_LEN_BY_LEVEL_ARRAY = new 
int[MAX_LEVEL + 1];
+
+   static {
+   for (int i = 1; i < INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY.length; 
i++) {
+   INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY[i] = 
LEVEL_INDEX_OFFSET + (i - 1) * Long.BYTES;
+   }
+
+   for (int i = 0; i < KEY_META_LEN_BY_LEVEL_ARRAY.length; i++) {
+   KEY_META_LEN_BY_LEVEL_ARRAY[i] = LEVEL_INDEX_OFFSET + 2 
* i * Long.BYTES;
+   }
+   }
+
+   /**
+* Returns the level of the node.
+*
+* @param byteBuffer byte buffer for key space.
+* @param offset offset of key space in the byte buffer.
+*/
+   public static int getLevel(ByteBuffer byteBuffer, int offset) {
+   return ByteBufferUtils.toInt(byteBuffer, offset + 
KEY_META_OFFSET) & BYTE_MASK;
+   }
+
+   /**
+* Returns the status of the node.
+*
+* @param byteBuffer byte buffer for key space.
+* @param offset offset of key space in the byte buffer.
+*/
+   public static byte getNodeStatus(ByteBuffer byteBuffer, int offset) {
 
 Review comment:
   I guess it would be more expressive to have an enum instead of a `byte` for 
the node status.
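   
   A sketch of what that could look like (illustrative only; the concrete 
status names and byte values are assumptions, not taken from this PR):
   
   enum NodeStatus {
       PUT((byte) 0),
       REMOVE((byte) 1);
   
       private final byte value;
   
       NodeStatus(byte value) {
           this.value = value;
       }
   
       byte getValue() {
           return value;
       }
   
       /** Maps a raw status byte back to the enum constant. */
       static NodeStatus valueOf(byte value) {
           switch (value) {
               case 0:
                   return PUT;
               case 1:
                   return REMOVE;
               default:
                   throw new IllegalArgumentException("Unknown node status byte: " + value);
           }
       }
   }
   
   getNodeStatus(...) could then return NodeStatus.valueOf(statusByte), so 
callers compare against named constants instead of raw bytes.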


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321354880
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
(quoted diff context omitted; identical to the CopyOnWriteSkipListStateMapTest.java hunk shown earlier in this digest)

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321362123
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
(quoted diff context omitted; identical to the CopyOnWriteSkipListStateMapTest.java hunk shown earlier in this digest)

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321333846
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListUtils.java
 ##
 @@ -0,0 +1,797 @@
(quoted diff context omitted; identical to the SkipListUtils.java hunk shown earlier in this digest, continuing a few lines further through getNodeStatus and into putLevelAndNodeStatus)

[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321343055
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListValueSerializer.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.ByteBufferInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/deserializer used for conversion between state and skip list 
value.
+ * It is not thread safe.
+ */
+class SkipListValueSerializer<S> {
 
 Review comment:
   Generic parameter is not documented.
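   
   A minimal fix along these lines (sketch; the type-parameter name <S> is 
assumed):
   
   /**
    * Serializer/deserializer used for conversion between state and skip list value.
    * It is not thread safe.
    *
    * @param <S> type of the state value.
    */
   class SkipListValueSerializer<S> {
       // ...
   }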


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-05 Thread GitBox
tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r321344351
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Allocator.java
 ##
 @@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import java.io.Closeable;
+
+/**
+ * Implementations are responsible for allocating space.
+ */
+public interface Allocator extends Closeable {
+
+   /**
+* Allocate space with the given size.
+*
+* @param size size of space to allocate.
+* @return address of the allocated space, or -1 when allocation fails.
+*/
+   long allocate(int size);
 
 Review comment:
   We don't seem to check the return value for `!= -1` at the call sites of 
this method. Is this intended? What about throwing an `Exception` instead of 
returning the `-1` value?
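   
   A sketch of that alternative (illustrative only; SpaceAllocationException is 
a hypothetical type, not something this PR defines):
   
   interface Allocator extends java.io.Closeable {
   
       /**
        * Allocate space with the given size.
        *
        * @param size size of space to allocate.
        * @return address of the allocated space.
        * @throws SpaceAllocationException if the requested space cannot be allocated.
        */
       long allocate(int size) throws SpaceAllocationException;
   
       /** Release the space at the given address. */
       void free(long address);
   }
   
   /** Hypothetical unchecked exception for failed allocations. */
   class SpaceAllocationException extends RuntimeException {
       SpaceAllocationException(String message) {
           super(message);
       }
   }
   
   An unchecked exception keeps most call sites unchanged while making a failed 
allocation impossible to ignore silently.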


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

