[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372422#comment-16372422
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

GitHub user walterddr opened a pull request:

https://github.com/apache/flink/pull/

[FLINK-8689][table]Add runtime support of distinct filter using MapView for 
codegen


## What is the purpose of the change

- Adds runtime support for distinct filtering using a distinct-aggregation 
delegate backed by MapView. This only fixes the currently broken over-window 
aggregate with a distinct filter; however, the change is meant to be reused by 
other distinct operations on DataStream, such as group window aggregate, group 
aggregate, etc.


## Brief change log

  - Added a new DistinctAggDelegateFunction and DistinctAccumulator that 
encapsulate any real aggregate function (a sketch follows this list).
  - Changed codegen to generate the distinct filter before invoking the 
actual aggregate function.
  - Added further codegen for using the delegates on merge and reset.
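
Below is a minimal, hedged sketch of the delegate pattern (hypothetical 
signatures; the actual classes and generated code in this PR may differ): a 
`MapView` keyed by the aggregated value counts occurrences, and the real 
aggregate function only sees the first occurrence of each value (and, on 
retraction, the disappearance of the last one).

```java
import org.apache.flink.table.api.dataview.MapView;

// Accumulator sketch: a MapView of seen values (value -> occurrence count)
// next to the accumulator of the wrapped "real" aggregate function.
class DistinctAccumulator<E, ACC> {
    MapView<E, Long> distinctValueMap = new MapView<>();
    ACC realAcc;
}

// Shape of the generated distinct filter around the real function's calls.
class DistinctFilterSketch<E, ACC> {

    void accumulate(DistinctAccumulator<E, ACC> acc, E value) throws Exception {
        Long count = acc.distinctValueMap.get(value);
        if (count == null) {
            acc.distinctValueMap.put(value, 1L);
            // realFunction.accumulate(acc.realAcc, value);  // first occurrence only
        } else {
            acc.distinctValueMap.put(value, count + 1L);     // duplicate: just count it
        }
    }

    void retract(DistinctAccumulator<E, ACC> acc, E value) throws Exception {
        Long count = acc.distinctValueMap.get(value);
        if (count == null) {
            return;                                          // nothing to retract
        }
        if (count == 1L) {
            acc.distinctValueMap.remove(value);
            // realFunction.retract(acc.realAcc, value);     // last occurrence gone
        } else {
            acc.distinctValueMap.put(value, count - 1L);
        }
    }
}
```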

## Verifying this change

This change added tests and can be verified as follows:

  - Added an over-window unit test to verify the generated plan before codegen.
  - Added integration tests covering the distinct over-window aggregate 
end-to-end.

## Does this pull request potentially affect one of the following parts:

no

## Documentation

This does not expose any additional external-facing functionality; that will 
come in a separate PR.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/walterddr/flink FLINK-8689

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #


commit cb853e6d1adfbb04819a234f8fbced32a8ffb21f
Author: Rong Rong 
Date:   2018-02-21T21:40:29Z

support distinct agg function delegate using mapview accumulator in runtime 
environment

commit fff7fa48f07b350f6fd4565f58d6a35af88faa50
Author: Rong Rong 
Date:   2018-02-21T21:43:02Z

add over window aggregate and unbounded aggregate ITCase

commit 03fc641ac01af6867d2516263bcea0ac96f1802c
Author: Rong Rong 
Date:   2018-02-22T05:15:26Z

adding in overwindow logical test cases to verify distinct modifier




> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where the *isDistinct* field is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-21 Thread walterddr
GitHub user walterddr opened a pull request:

https://github.com/apache/flink/pull/

[FLINK-8689][table]Add runtime support of distinct filter using MapView for 
codegen


## What is the purpose of the change

- Adds runtime support for distinct filtering using a distinct-aggregation 
delegate backed by MapView. This only fixes the currently broken over-window 
aggregate with a distinct filter; however, the change is meant to be reused by 
other distinct operations on DataStream, such as group window aggregate, group 
aggregate, etc.


## Brief change log

  - Added a new DistinctAggDelegateFunction and DistinctAccumulator that 
encapsulate any real aggregate function.
  - Changed codegen to generate the distinct filter before invoking the 
actual aggregate function.
  - Added further codegen for using the delegates on merge and reset.

## Verifying this change

This change added tests and can be verified as follows:

  - Added an over-window unit test to verify the generated plan before codegen.
  - Added integration tests covering the distinct over-window aggregate 
end-to-end.

## Does this pull request potentially affect one of the following parts:

no

## Documentation

This does not expose any additional external-facing functionality; that will 
come in a separate PR.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/walterddr/flink FLINK-8689

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #


commit cb853e6d1adfbb04819a234f8fbced32a8ffb21f
Author: Rong Rong 
Date:   2018-02-21T21:40:29Z

support distinct agg function delegate using mapview accumulator in runtime 
environment

commit fff7fa48f07b350f6fd4565f58d6a35af88faa50
Author: Rong Rong 
Date:   2018-02-21T21:43:02Z

add over window aggregate and unbounded aggregate ITCase

commit 03fc641ac01af6867d2516263bcea0ac96f1802c
Author: Rong Rong 
Date:   2018-02-22T05:15:26Z

adding in overwindow logical test cases to verify distinct modifier




---


[jira] [Commented] (FLINK-8704) Migrate tests from TestingCluster to MiniClusterResource

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372419#comment-16372419
 ] 

ASF GitHub Bot commented on FLINK-8704:
---

Github user zhangminglei closed the pull request at:

https://github.com/apache/flink/pull/5547


> Migrate tests from TestingCluster to MiniClusterResource
> 
>
> Key: FLINK-8704
> URL: https://issues.apache.org/jira/browse/FLINK-8704
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5547: [FLINK-8704] [tests] Port AccumulatorLiveITCase to...

2018-02-21 Thread zhangminglei
Github user zhangminglei closed the pull request at:

https://github.com/apache/flink/pull/5547


---


[jira] [Commented] (FLINK-8704) Migrate tests from TestingCluster to MiniClusterResource

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372417#comment-16372417
 ] 

ASF GitHub Bot commented on FLINK-8704:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5547
  
@aljoscha Okay, thanks. I will follow up on those issues.


> Migrate tests from TestingCluster to MiniClusterResource
> 
>
> Key: FLINK-8704
> URL: https://issues.apache.org/jira/browse/FLINK-8704
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5547: [FLINK-8704] [tests] Port AccumulatorLiveITCase to...

2018-02-21 Thread zhangminglei
GitHub user zhangminglei reopened a pull request:

https://github.com/apache/flink/pull/5547

[FLINK-8704] [tests] Port AccumulatorLiveITCase to MiniClusterResource

## What is the purpose of the change
This PR contains the first batch of test ports from ```TestingCluster``` to 
```MiniClusterResource```. I will push them one by one. Each test has its own 
commit and is self-contained.

## Verifying this change
This change is covered by existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-8704

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5547


commit 983b4ed8e8591ff5343307220475f2be4c9813f0
Author: zhangminglei 
Date:   2018-02-21T15:11:19Z

[FLINK-8704] [tests] Port AccumulatorLiveITCase to MiniClusterResource




---


[jira] [Commented] (FLINK-8704) Migrate tests from TestingCluster to MiniClusterResource

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372418#comment-16372418
 ] 

ASF GitHub Bot commented on FLINK-8704:
---

GitHub user zhangminglei reopened a pull request:

https://github.com/apache/flink/pull/5547

[FLINK-8704] [tests] Port AccumulatorLiveITCase to MiniClusterResource

## What is the purpose of the change
This PR contains the first batch of test ports from ```TestingCluster``` to 
```MiniClusterResource```. I will push them one by one. Each test has its own 
commit and is self-contained.

## Verifying this change
This change is covered by existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-8704

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5547


commit 983b4ed8e8591ff5343307220475f2be4c9813f0
Author: zhangminglei 
Date:   2018-02-21T15:11:19Z

[FLINK-8704] [tests] Port AccumulatorLiveITCase to MiniClusterResource




> Migrate tests from TestingCluster to MiniClusterResource
> 
>
> Key: FLINK-8704
> URL: https://issues.apache.org/jira/browse/FLINK-8704
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5547: [FLINK-8704] [tests] Port AccumulatorLiveITCase to MiniCl...

2018-02-21 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5547
  
@aljoscha Okay, thanks. I will follow up on those issues.


---


[jira] [Updated] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-02-21 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-8689:
-
Summary: Add runtime support of distinct filter using MapView   (was: Add 
runtime support of distinct filter using MapView for GenerateAggregation)

> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where the *isDistinct* field is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372176#comment-16372176
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
Thanks for the review and suggestions. Your comment on 
`DataStream#process(KeyedProcessFunction)` makes sense; I've removed it. 

(btw, I feel https://github.com/apache/flink/pull/5500 is more urgent than 
this PR. Can you take a look at that one?)


> add KeyedProcessFunction to expose the key in onTimer() and other methods
> -
>
> Key: FLINK-8560
> URL: https://issues.apache.org/jira/browse/FLINK-8560
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Jürgen Thomann
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently it is required to store the key of a keyBy() in the processElement 
> method to have access to it in the OnTimerContext.
> This is not ideal, as you have to check in the processElement method for 
> every element whether the key is already stored, and set it if it is not.
> A possible solution would be adding OnTimerContext#getCurrentKey() or a similar 
> method. Making the key available in the open() method could work as well.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html
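
For context, a minimal sketch of the workaround described above (illustrative 
code against the 1.4-era ProcessFunction API; class and state names are made 
up): the key has to be cached in keyed state inside processElement() just so 
that onTimer() can read it.
{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class KeyCachingFunction extends ProcessFunction<Tuple2<String, Long>, String> {

    private transient ValueState<String> cachedKey; // caches the current key

    @Override
    public void open(Configuration parameters) {
        cachedKey = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cached-key", String.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx,
                               Collector<String> out) throws Exception {
        if (cachedKey.value() == null) {
            cachedKey.update(value.f0); // must check and store on every element
        }
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 1000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        // Before this ticket, cached state is the only way to see the key here.
        out.collect("timer fired for key " + cachedKey.value());
    }
}{code}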



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

2018-02-21 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
Thanks for the review and suggestions. Your comment on 
`DataStream#process(KeyedProcessFunction)` makes sense; I've removed it. 

(btw, I feel https://github.com/apache/flink/pull/5500 is more urgent than 
this PR. Can you take a look at that one?)


---


[jira] [Commented] (FLINK-8632) Generalize SavepointHandlers to be used for other asynchronous operations

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372161#comment-16372161
 ] 

ASF GitHub Bot commented on FLINK-8632:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5451


> Generalize SavepointHandlers to be used for other asynchronous operations
> -
>
> Key: FLINK-8632
> URL: https://issues.apache.org/jira/browse/FLINK-8632
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should generalize the {{SavepointHandlers}} to be usable for other 
> asynchronous operations as well. The basic idea is that one has a trigger 
> handler which triggers an asynchronous operation. This operation returns a 
> future which is completed once the operation is done. The trigger handler 
> returns a trigger id which can be used to check the status of the operation 
> by querying the status handler.
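
As a hedged illustration of that pattern (names and structure below are 
hypothetical, not Flink's actual classes), the trigger handler registers a 
future under a fresh trigger id, and the status handler reports on that future 
until it completes:
{code:java}
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class AsyncOperationHandlersSketch {

    private final ConcurrentHashMap<String, CompletableFuture<String>> operations =
            new ConcurrentHashMap<>();

    // Trigger handler: starts the asynchronous operation, hands back a trigger id.
    String trigger() {
        String triggerId = UUID.randomUUID().toString();
        operations.put(triggerId, CompletableFuture.supplyAsync(() -> "result"));
        return triggerId;
    }

    // Status handler: clients poll with the trigger id until the future completes.
    String status(String triggerId) {
        CompletableFuture<String> future = operations.get(triggerId);
        if (future == null) {
            return "UNKNOWN";
        }
        return future.isDone() ? "COMPLETED: " + future.join() : "IN_PROGRESS";
    }
}{code}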



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8705) Integrate Remote(Stream)Environment with Flip-6 cluster

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372155#comment-16372155
 ] 

ASF GitHub Bot commented on FLINK-8705:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5527


> Integrate Remote(Stream)Environment with Flip-6 cluster
> ---
>
> Key: FLINK-8705
> URL: https://issues.apache.org/jira/browse/FLINK-8705
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Allow the {{Remote(Stream)Environment}} to submit jobs to a Flip-6 cluster. 
> This entails that we create the correct {{ClusterClient}} to communicate with 
> the respective cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8631) Add support for generic response types to RestClient

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372160#comment-16372160
 ] 

ASF GitHub Bot commented on FLINK-8631:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5450


> Add support for generic response types to RestClient
> 
>
> Key: FLINK-8631
> URL: https://issues.apache.org/jira/browse/FLINK-8631
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to send generic response types to the {{RestClient}}, we have to 
> enhance the {{MessageHeaders}} to also contain a collection of type 
> parameters.
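
As a rough illustration of why a plain {{Class}} is not enough (a sketch built 
on Jackson; the helper below is hypothetical): deserializing a generic response 
requires constructing a parameterized type from the raw class plus its type 
parameters, which is the information the enhanced headers would carry.
{code:java}
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;

class GenericResponseParserSketch {
    static <T> T parse(ObjectMapper mapper, String json,
                       Class<?> rawType, Class<?>... typeParameters) throws Exception {
        // Build e.g. a List<JobID> type from List.class and JobID.class,
        // then deserialize the response body with it.
        JavaType type = mapper.getTypeFactory()
                .constructParametricType(rawType, typeParameters);
        return mapper.readValue(json, type);
    }
}{code}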



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7714) Port JarPlanHandler to new REST endpoint

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372159#comment-16372159
 ] 

ASF GitHub Bot commented on FLINK-7714:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5533


> Port JarPlanHandler to new REST endpoint
> 
>
> Key: FLINK-7714
> URL: https://issues.apache.org/jira/browse/FLINK-7714
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarPlanHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7712) Port JarDeleteHandler to new REST endpoint

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372157#comment-16372157
 ] 

ASF GitHub Bot commented on FLINK-7712:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5529


> Port JarDeleteHandler to new REST endpoint
> --
>
> Key: FLINK-7712
> URL: https://issues.apache.org/jira/browse/FLINK-7712
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarDeleteHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8710) AbstractYarnClusterDescriptor doesn't use pre-defined configs in Hadoop's YarnConfiguration

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372156#comment-16372156
 ] 

ASF GitHub Bot commented on FLINK-8710:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5522


> AbstractYarnClusterDescriptor doesn't use pre-defined configs in Hadoop's 
> YarnConfiguration
> ---
>
> Key: FLINK-8710
> URL: https://issues.apache.org/jira/browse/FLINK-8710
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.5.0
>
>
> {{AbstractYarnClusterDescriptor}} should use Hadoop's 
> {{YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB}} rather than raw 
> string "yarn.scheduler.minimum-allocation-mb"
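
A minimal sketch of what the described change could look like (illustrative 
only, not the actual patch):
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

class MinAllocationSketch {
    // Read the minimum allocation via Hadoop's predefined constant (and its
    // default) instead of the raw string "yarn.scheduler.minimum-allocation-mb".
    static int minimumAllocationMb(Configuration hadoopConf) {
        return hadoopConf.getInt(
                YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
                YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
    }
}{code}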



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8711) Flink with YARN uses wrong SlotsPerTaskManager

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372158#comment-16372158
 ] 

ASF GitHub Bot commented on FLINK-8711:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5532


> Flink with YARN uses wrong SlotsPerTaskManager
> --
>
> Key: FLINK-8711
> URL: https://issues.apache.org/jira/browse/FLINK-8711
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.3.2, 1.5.0
>Reporter: Aleksandr Filichkin
>Assignee: Till Rohrmann
>Priority: Critical
>
> I see wrong behavior for Flink on YARN.
> I tried to set up SlotsPerTaskManager using "-ys 2", but it used only 1 slot.
> I found the code at
> [https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L387]
> For example, when I have "-yn 7 -ys 2 -p 2", in the log I see:
>  "The YARN cluster has 14 slots available, but the user requested a 
> parallelism of 2 on YARN. Each of the 7 TaskManagers will get 1 slots."
> Why can't we use -ys with -p?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5533: [FLINK-7714][flip6] Port JarPlanHandler to new RES...

2018-02-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5533


---


[GitHub] flink pull request #5532: [FLINK-8711] [yarn] Remove code which auto-magical...

2018-02-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5532


---


[GitHub] flink pull request #5522: [FLINK-8710] [YARN] AbstractYarnClusterDescriptor ...

2018-02-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5522


---


[GitHub] flink pull request #5527: [FLINK-8705] [flip6] Add Flip-6 support to Remote(...

2018-02-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5527


---


[GitHub] flink pull request #5450: [FLINK-8631] [rest] Add support for generic types ...

2018-02-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5450


---


[GitHub] flink pull request #5451: [FLINK-8632] [flip6] Introduce generalized asynchr...

2018-02-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5451


---


[GitHub] flink pull request #5529: [FLINK-7712][flip6] Port JarDeleteHandler to new R...

2018-02-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5529


---


[jira] [Created] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-02-21 Thread Joshua DeWald (JIRA)
Joshua DeWald created FLINK-8740:


 Summary: Job-level metrics lost during job re-submission in HA mode
 Key: FLINK-8740
 URL: https://issues.apache.org/jira/browse/FLINK-8740
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.4.0
Reporter: Joshua DeWald


When Flink is running in high-availability mode and a leader re-election elects 
the same JobManager, the job is unable to register its job-level metrics due 
to a name collision. 

This may occur even if a different JobManager is elected, but as it is a local 
JobManagerMetricsGroup which emits the error, that is unlikely to be the case.

 

*Expected Behavior*

When a job is forced to re-submit due to JobManager re-election, job-level 
metrics should be available in the new instance of the job (uptime, checkpoint 
size, checkpoint duration, etc.).

*Actual Behavior*

When the job gets re-submitted, it is unable to register job-level metrics due 
to a collision in the JobManagerMetricGroup. This leads to a situation where, 
even though the job is running, the metrics around checkpoints and uptime are 
not available.

*Steps to reproduce*
 # Start up Flink in HA mode using ZooKeeper, single node is fine
 # Submit a job to the cluster
 # Stop and restart ZooKeeper
 # In the JobManager logs you will see the following errors:
{noformat}
79043 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - Name 
collision: Group already contains a Metric with the name 
'totalNumberOfCheckpoints'. Metric will not be reported
79044 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - Name 
collision: Group already contains a Metric with the name 
'numberOfInProgressCheckpoints'. Metric will not be reported
79045 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - Name 
collision: Group already contains a Metric with the name 
'numberOfCompletedCheckpoints'. Metric will not be reported{noformat}

*Proposed Solution*

I suspect that there may be other related issues beyond just the metrics, but a 
code change that seems to fix the issue is, during recovery, to remove the 
previously registered job metrics:
{code:scala}
if (isRecovery) {
  log.info(s"Removing metrics for job $jobId; new metrics will be added during recovery.")
  jobManagerMetricGroup.removeJob(jobId)
}{code}
I'd be happy to submit this in a PR, if that is acceptable, to open up the 
discussion, but I am not sure of the consequences of not closing the previous 
JobManagerMetricGroup, or of perhaps simply not re-registering job-level 
metrics during recovery. The latter would seem to entail informing lower levels 
about the recovery.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8739) Optimize runtime support for distinct filter to reuse same distinct accumulator for filtering

2018-02-21 Thread Rong Rong (JIRA)
Rong Rong created FLINK-8739:


 Summary: Optimize runtime support for distinct filter to reuse 
same distinct accumulator for filtering
 Key: FLINK-8739
 URL: https://issues.apache.org/jira/browse/FLINK-8739
 Project: Flink
  Issue Type: Sub-task
Reporter: Rong Rong






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream

2018-02-21 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-8690:
-
Description: 
Currently, *FlinkLogicalAggregate* does not allow distinct aggregates.

We are propose to reuse distinct aggregate codegen work designed for 
*FlinkLogicalOverAggregate* / *FlinkLogicalWindowAggregate*, to support 
unbounded distinct aggregation as well.

> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate* does not allow distinct aggregates.
> We are propose to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate* / *FlinkLogicalWindowAggregate*, to support 
> unbounded distinct aggregation as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream

2018-02-21 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-8690:
-
Description: 
Currently, *FlinkLogicalAggregate* does not allow distinct aggregates.

We are proposing to reuse distinct aggregate codegen work designed for 
*FlinkLogicalOverAggregate* / *FlinkLogicalWindowAggregate*, to support 
unbounded distinct aggregation as well.

  was:
Currently, *FlinkLogicalAggregate* does not allow distinct aggregates.

We are propose to reuse distinct aggregate codegen work designed for 
*FlinkLogicalOverAggregate* / *FlinkLogicalWindowAggregate*, to support 
unbounded distinct aggregation as well.


> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate* does not allow distinct aggregates.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate* / *FlinkLogicalWindowAggregate*, to support 
> unbounded distinct aggregation as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8689) Add runtime support of distinct filter using MapView for GenerateAggregation

2018-02-21 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-8689:
-
Description: 
This ticket should cover distinct aggregate function support to codegen for 
*AggregateCall*, where the *isDistinct* field is set to true.

This can be verified using the following SQL, which is not currently producing 
correct results.
{code:java}
SELECT
  a,
  SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
CURRENT ROW)
FROM
  MyTable{code}
 

 

  was:
This ticket should cover distinct aggregate function support to codegen for 
`AggregateCall`, where the `isDistinct` field is set to true.

This can be verified using the following SQL, which is not currently producing 
correct results.
{code:java}
SELECT
  a,
  SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
CURRENT ROW)
FROM
  MyTable{code}
 

 


> Add runtime support of distinct filter using MapView for GenerateAggregation
> 
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where the *isDistinct* field is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream

2018-02-21 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-8690:
-
Summary: Update logical rule set to generate FlinkLogicalAggregate 
explicitly allow distinct agg on DataStream  (was: Update logical rule set to 
generate FlinkLogicalAggregate explicitly allow distinct operator )

> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8689) Add runtime support of distinct filter using MapView for GenerateAggregation

2018-02-21 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-8689:
-
Description: 
This should add distinct aggregate function support to codegen for 
`AggregateCall`, where the `isDistinct` field is set to true.

This can be verified using the following SQL, which is not currently producing 
correct results.

 
{code:java}
SELECT
  a,
  SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
CURRENT ROW)
FROM
  MyTable{code}
 

 

> Add runtime support of distinct filter using MapView for GenerateAggregation
> 
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This should add distinct aggregate function support to codegen for 
> `AggregateCall`, where the `isDistinct` field is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
>  
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8689) Add runtime support of distinct filter using MapView for GenerateAggregation

2018-02-21 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-8689:
-
Description: 
This ticket should cover distinct aggregate function support to codegen for 
`AggregateCall`, where the `isDistinct` field is set to true.

This can be verified using the following SQL, which is not currently producing 
correct results.
{code:java}
SELECT
  a,
  SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
CURRENT ROW)
FROM
  MyTable{code}
 

 

  was:
This should add distinct aggregate function support to codegen for 
`AggregateCall`, where the `isDistinct` field is set to true.

This can be verified using the following SQL, which is not currently producing 
correct results.
{code:java}
SELECT
  a,
  SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
CURRENT ROW)
FROM
  MyTable{code}
 

 


> Add runtime support of distinct filter using MapView for GenerateAggregation
> 
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> `AggregateCall`, where the `isDistinct` field is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8689) Add runtime support of distinct filter using MapView for GenerateAggregation

2018-02-21 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-8689:
-
Description: 
This should add distinct aggregate function support to codegen for 
`AggregateCall`, where the `isDistinct` field is set to true.

This can be verified using the following SQL, which is not currently producing 
correct results.
{code:java}
SELECT
  a,
  SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
CURRENT ROW)
FROM
  MyTable{code}
 

 

  was:
This should add distinct aggregate function support to codegen for 
`AggregateCall`, where the `isDistinct` field is set to true.

This can be verified using the following SQL, which is not currently producing 
correct results.

 
{code:java}
SELECT
  a,
  SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
CURRENT ROW)
FROM
  MyTable{code}
 

 


> Add runtime support of distinct filter using MapView for GenerateAggregation
> 
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This should add distinct aggregate function support to codegen for 
> `AggregateCall`, where the `isDistinct` field is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8735) Add savepoint migration ITCase that covers operator state

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371842#comment-16371842
 ] 

ASF GitHub Bot commented on FLINK-8735:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169738273
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class LegacyStatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_2, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_2, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_3, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_3, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 

[jira] [Commented] (FLINK-8718) Non-parallel DataStreamSource does not set max parallelism

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372076#comment-16372076
 ] 

ASF GitHub Bot commented on FLINK-8718:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5545
  
Shouldn't it always be possible to merge n key groups into a single key 
group? Maybe this could be the backwards compatibility path.

Moreover, which sources use keyed state? Don't almost all of the sources 
use operator state if they are stateful?
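
For context on the key-group question (a simplified sketch; the real logic 
lives in org.apache.flink.runtime.state.KeyGroupRangeAssignment and 
additionally murmur-hashes the key's hash code), the key group of a key depends 
only on the max parallelism, which is why merging n key groups down to one is 
conceivable as a compatibility path:
{code:java}
class KeyGroupSketch {
    static int assignToKeyGroup(Object key, int maxParallelism) {
        // With maxParallelism == 1 every key lands in key group 0, so state
        // written under a larger maxParallelism would have to be re-bucketed.
        return Math.abs(key.hashCode()) % maxParallelism;
    }
}{code}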


> Non-parallel DataStreamSource does not set max parallelism
> --
>
> Key: FLINK-8718
> URL: https://issues.apache.org/jira/browse/FLINK-8718
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Streaming
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Major
> Fix For: 1.5.0
>
>
> {{org.apache.flink.streaming.api.datastream.DataStreamSource}} does not set 
> {{maxParallelism}} to 1 if it is non-parallel.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5545: [FLINK-8718][DataStream] Set maxParallelism on non-parall...

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5545
  
Shouldn't it always be possible to merge n key groups into a single key 
group? Maybe this could be the backwards compatibility path.

Moreover, which sources use keyed state? Don't almost all of the sources 
use operator state if they are stateful?


---


[jira] [Commented] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-02-21 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372028#comment-16372028
 ] 

Stephan Ewen commented on FLINK-8707:
-

If I understood the description by [~imogard] correctly, then there are quite 
a few jars (hundreds), but still many fewer than the total number of FDs.

> Excessive amount of files opened by flink task manager
> --
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>Reporter: Alexander Gardner
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, 
> box2-taskmgr-lsof
>
>
> The job manager has less FDs than the task manager.
>  
> Hi
> A support alert indicated that there were a lot of open files for the boxes 
> running Flink.
> There were 4 flink jobs that were dormant but had consumed a number of msgs 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l       ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l    -> returned 129322 open FDs
> $ lsof -p 12154 | wc -l   -> returned 531 FDs
> There were 228 threads running for the task manager.
>  
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l  = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced there were 
> also 244 child processes for the task manager process.
> I noticed a creep of file descriptors in each environment... Are the above 
> figures deemed excessive for the number of FDs in use? I know Flink uses Netty; 
> is it using a separate Selector for reads and writes? 
> Additionally, Flink uses memory-mapped files or direct byte buffers; are these 
> skewing the numbers of FDs shown?
> Example of one child process ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
>  java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
>  java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lastly, I cannot yet identify the reason for the creep in FDs, even though 
> Flink is pretty dormant or has dormant jobs. Production nodes are not 
> experiencing excessive amounts of throughput yet either.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-6489) Rework 'start-local.bat' to 'start-local-cluster.bat'

2018-02-21 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-6489.
---

> Rework 'start-local.bat' to 'start-local-cluster.bat'
> -
>
> Key: FLINK-6489
> URL: https://issues.apache.org/jira/browse/FLINK-6489
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> To get rid of the JobManager local mode, we need to make sure that the 
> {{start-local.bat}} script for windows also starts a separate JobManager and 
> TaskManager process, rather than only the JobManager (and relying on that one 
> to spawn a TaskManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-6489) Rework 'start-local.bat' to 'start-local-cluster.bat'

2018-02-21 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-6489.
-
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed in c68f95c3e5e4fa0bce39915238209b238a8036a9

> Rework 'start-local.bat' to 'start-local-cluster.bat'
> -
>
> Key: FLINK-6489
> URL: https://issues.apache.org/jira/browse/FLINK-6489
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> To get rid of the JobManager local mode, we need to make sure that the 
> {{start-local.bat}} script for windows also starts a separate JobManager and 
> TaskManager process, rather than only the JobManager (and relying on that one 
> to spawn a TaskManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8696) Remove JobManager local mode from the Unix Shell Scripts

2018-02-21 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8696.
---

> Remove JobManager local mode from the Unix Shell Scripts
> 
>
> Key: FLINK-8696
> URL: https://issues.apache.org/jira/browse/FLINK-8696
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> In order to work towards removing the local JobManager mode, the shell 
> scripts need to be changed to not use/assume that mode any more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8696) Remove JobManager local mode from the Unix Shell Scripts

2018-02-21 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8696.
-
Resolution: Fixed

Fixed via d61664ca64bcb82c4e8ddf03a2ed38fe8edafa98

> Remove JobManager local mode from the Unix Shell Scripts
> 
>
> Key: FLINK-8696
> URL: https://issues.apache.org/jira/browse/FLINK-8696
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> In order to work towards removing the local JobManager mode, the shell 
> scripts need to be changed to not use/assume that mode any more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8738) Converge runtime dependency versions for 'scala-lang' and for 'com.typesafe:config'

2018-02-21 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8738.
-
Resolution: Fixed

Fixed in a844cd89207747a27535e8cb7cd8d6f4a332d8ed

> Converge runtime dependency versions for 'scala-lang' and for 
> 'com.typesafe:config'
> ---
>
> Key: FLINK-8738
> URL: https://issues.apache.org/jira/browse/FLINK-8738
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> These dependencies are currently diverged:
> {code}
> Dependency convergence error for com.typesafe:config:1.3.0 paths to 
> dependency are:
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-actor_2.11:2.4.20
> +-com.typesafe:config:1.3.0
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-stream_2.11:2.4.20
> +-com.typesafe:ssl-config-core_2.11:0.2.1
>   +-com.typesafe:config:1.2.0
> {code}
> and
> {code}
> Dependency convergence error for org.scala-lang:scala-library:2.11.12 paths 
> to dependency are:
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-org.scala-lang:scala-library:2.11.12
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-actor_2.11:2.4.20
> +-org.scala-lang:scala-library:2.11.11
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-actor_2.11:2.4.20
> +-org.scala-lang.modules:scala-java8-compat_2.11:0.7.0
>   +-org.scala-lang:scala-library:2.11.7
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-stream_2.11:2.4.20
> +-org.scala-lang:scala-library:2.11.11
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-stream_2.11:2.4.20
> +-com.typesafe:ssl-config-core_2.11:0.2.1
>   +-org.scala-lang:scala-library:2.11.8
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-stream_2.11:2.4.20
> +-com.typesafe:ssl-config-core_2.11:0.2.1
>   +-org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4
> +-org.scala-lang:scala-library:2.11.6
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-protobuf_2.11:2.4.20
> +-org.scala-lang:scala-library:2.11.11
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-slf4j_2.11:2.4.20
> +-org.scala-lang:scala-library:2.11.11
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-org.clapper:grizzled-slf4j_2.11:1.0.2
> +-org.scala-lang:scala-library:2.11.0
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.twitter:chill_2.11:0.7.4
> +-org.scala-lang:scala-library:2.11.7
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6489) Rework 'start-local.bat' to 'start-local-cluster.bat'

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372016#comment-16372016
 ] 

ASF GitHub Bot commented on FLINK-6489:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5528


> Rework 'start-local.bat' to 'start-local-cluster.bat'
> -
>
> Key: FLINK-6489
> URL: https://issues.apache.org/jira/browse/FLINK-6489
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>
> To get rid of the JobManager local mode, we need to make sure that the 
> {{start-local.bat}} script for windows also starts a separate JobManager and 
> TaskManager process, rather than only the JobManager (and relying on that one 
> to spawn a TaskManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6489) Rework 'start-local.bat' to 'start-local-cluster.bat'

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372015#comment-16372015
 ] 

ASF GitHub Bot commented on FLINK-6489:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5528
  
Merged in 74c5570c9fa94d35e47899b0dcdc74a5d18750f6


> Rework 'start-local.bat' to 'start-local-cluster.bat'
> -
>
> Key: FLINK-6489
> URL: https://issues.apache.org/jira/browse/FLINK-6489
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>
> To get rid of the JobManager local mode, we need to make sure that the 
> {{start-local.bat}} script for windows also starts a separate JobManager and 
> TaskManager process, rather than only the JobManager (and relying on that one 
> to spawn a TaskManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8738) Converge runtime dependency versions for 'scala-lang' and for 'com.typesafe:config'

2018-02-21 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8738.
---

> Converge runtime dependency versions for 'scala-lang' and for 
> 'com.typesafe:config'
> ---
>
> Key: FLINK-8738
> URL: https://issues.apache.org/jira/browse/FLINK-8738
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> These dependencies are currently diverged:
> {code}
> Dependency convergence error for com.typesafe:config:1.3.0 paths to 
> dependency are:
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-actor_2.11:2.4.20
> +-com.typesafe:config:1.3.0
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-stream_2.11:2.4.20
> +-com.typesafe:ssl-config-core_2.11:0.2.1
>   +-com.typesafe:config:1.2.0
> {code}
> and
> {code}
> Dependency convergence error for org.scala-lang:scala-library:2.11.12 paths 
> to dependency are:
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-org.scala-lang:scala-library:2.11.12
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-actor_2.11:2.4.20
> +-org.scala-lang:scala-library:2.11.11
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-actor_2.11:2.4.20
> +-org.scala-lang.modules:scala-java8-compat_2.11:0.7.0
>   +-org.scala-lang:scala-library:2.11.7
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-stream_2.11:2.4.20
> +-org.scala-lang:scala-library:2.11.11
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-stream_2.11:2.4.20
> +-com.typesafe:ssl-config-core_2.11:0.2.1
>   +-org.scala-lang:scala-library:2.11.8
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-stream_2.11:2.4.20
> +-com.typesafe:ssl-config-core_2.11:0.2.1
>   +-org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4
> +-org.scala-lang:scala-library:2.11.6
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-protobuf_2.11:2.4.20
> +-org.scala-lang:scala-library:2.11.11
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.typesafe.akka:akka-slf4j_2.11:2.4.20
> +-org.scala-lang:scala-library:2.11.11
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-org.clapper:grizzled-slf4j_2.11:1.0.2
> +-org.scala-lang:scala-library:2.11.0
> and
> +-com.daplatform.flink:txn-api:1.0-SNAPSHOT
>   +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
> +-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
>   +-com.twitter:chill_2.11:0.7.4
> +-org.scala-lang:scala-library:2.11.7
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5528: [FLINK-6489] [FLINK-8696] [shell scripts] Remove JobManag...

2018-02-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5528
  
Merged in 74c5570c9fa94d35e47899b0dcdc74a5d18750f6


---


[GitHub] flink pull request #5528: [FLINK-6489] [FLINK-8696] [shell scripts] Remove J...

2018-02-21 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5528


---


[GitHub] flink pull request #5552: [FLINK-8735] Add new StatefulJobSavepointMigration...

2018-02-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169741935
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+* TODO to generate savepoints for a specific Flink version / backend 
type,
+* TODO change these values accordingly, e.g. to generate for 1.4 with 
RocksDB,
+* TODO set as (MigrationVersion.v1_4, 

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371870#comment-16371870
 ] 

Steve Loughran commented on FLINK-8543:
---

There's no truncate operation in the S3 protocol, so none in the s3a connector, 
nor in the Azure or Swift clients. Not fixable. Flush-after-close can be 
downgraded, but truncate isn't part of the S3 object model of immutable 
objects.
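
The workaround Flink's BucketingSink takes on such stores is to skip the 
truncate and instead record, in a companion {{.valid-length}} file, how many 
bytes of the part file are valid after recovery. A minimal sketch of that 
idea, assuming Hadoop's FileSystem API (class and method names here are 
illustrative, not Flink's actual implementation):

{code:java}
// Sketch only: record the valid length of a part file instead of truncating.
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class ValidLengthMarker {

    private ValidLengthMarker() {}

    /** Writes the number of valid bytes of 'file' into a '.valid-length' companion file. */
    public static void write(FileSystem fs, Path file, long validLength) throws IOException {
        Path marker = new Path(file.getParent(), file.getName() + ".valid-length");
        try (FSDataOutputStream out = fs.create(marker, true)) {
            out.write(Long.toString(validLength).getBytes(StandardCharsets.UTF_8));
        }
    }
}
{code}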

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> 

[GitHub] flink pull request #5552: [FLINK-8735] Add new StatefulJobSavepointMigration...

2018-02-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169734512
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+* TODO to generate savepoints for a specific Flink version / backend 
type,
+* TODO change these values accordingly, e.g. to generate for 1.4 with 
RocksDB,
+* TODO set as (MigrationVersion.v1_4, 

[jira] [Commented] (FLINK-8735) Add savepoint migration ITCase that covers operator state

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371867#comment-16371867
 ] 

ASF GitHub Bot commented on FLINK-8735:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169741385
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+

[GitHub] flink pull request #5552: [FLINK-8735] Add new StatefulJobSavepointMigration...

2018-02-21 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5552

 [FLINK-8735] Add new StatefulJobSavepointMigrationITCase

R: @kl0u, and this is also relevant for your related PR that adds support 
for broadcast state. I think you would have to build on this new one and add 
broadcast state there. 😳 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-8735-new-savepoint-migration-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5552.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5552


commit 13c7fcc16c78f15abe11dfa4a0dbe91e7a96b3d8
Author: Aljoscha Krettek 
Date:   2018-02-21T17:08:13Z

[FLINK-8735] Rename StatefulJobSavepointMigrationITCase

This is preparation for modifying a new ITCase to use modern state
features.

commit 5792c207f427f62aad9f26dd08112a676aab614b
Author: Aljoscha Krettek 
Date:   2018-02-21T17:10:55Z

[FLINK-8735] Add new StatefulJobSavepointMigrationITCase

This new test does not pretend to use legacy state but now instead uses
the more modern operator state varieties.

The binary savepoints for this were generated on the release-1.4 branch.




---


[GitHub] flink pull request #5552: [FLINK-8735] Add new StatefulJobSavepointMigration...

2018-02-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169742676
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+* TODO to generate savepoints for a specific Flink version / backend 
type,
+* TODO change these values accordingly, e.g. to generate for 1.4 with 
RocksDB,
+* TODO set as (MigrationVersion.v1_4, 

[jira] [Commented] (FLINK-8735) Add savepoint migration ITCase that covers operator state

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371795#comment-16371795
 ] 

ASF GitHub Bot commented on FLINK-8735:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5552

 [FLINK-8735] Add new StatefulJobSavepointMigrationITCase

R: @kl0u, and this is also relevant for your related PR that adds support 
for broadcast state. I think you would have to build on this new one and add 
broadcast state there.  

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-8735-new-savepoint-migration-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5552.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5552


commit 13c7fcc16c78f15abe11dfa4a0dbe91e7a96b3d8
Author: Aljoscha Krettek 
Date:   2018-02-21T17:08:13Z

[FLINK-8735] Rename StatefulJobSavepointMigrationITCase

This is preparation for modifying a new ITCase to use modern state
features.

commit 5792c207f427f62aad9f26dd08112a676aab614b
Author: Aljoscha Krettek 
Date:   2018-02-21T17:10:55Z

[FLINK-8735] Add new StatefulJobSavepointMigrationITCase

This new test does not pretend to use legacy state but now instead uses
the more modern operator state varieties.

The binary savepoints for this were generated on the release-1.4 branch.




> Add savepoint migration ITCase that covers operator state
> -
>
> Key: FLINK-8735
> URL: https://issues.apache.org/jira/browse/FLINK-8735
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0, 1.4.2
>
>
> The current {{StatefulJobSavepointMigrationITCase}} does not cover operator 
> state, meaning state accessed using {{OperatorStateStore}}.
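
For context: keyed state is accessed through the {{KeyedStateStore}}, while 
operator (non-keyed) state is obtained from the {{OperatorStateStore}} during 
{{initializeState()}}. A minimal sketch of the access pattern the new ITCase 
needs to cover, assuming Flink's {{CheckpointedFunction}} interface (the state 
name and element type are illustrative):

{code:java}
// Minimal sketch of operator-state access via the OperatorStateStore.
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class OperatorStateAccessExample implements CheckpointedFunction {

    private transient ListState<Long> offsets;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Operator (non-keyed) state comes from the OperatorStateStore and is
        // redistributed in even splits on rescaling.
        offsets = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("offsets", LongSerializer.INSTANCE));
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // The list state registered above is included in checkpoints/savepoints.
    }
}
{code}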



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5554: [FLINK-8729][streaming] Refactor JSONGenerator to ...

2018-02-21 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5554

[FLINK-8729][streaming] Refactor JSONGenerator to use jackson

## What is the purpose of the change

This PR ports the `JSONGenerator` to rely on jackson instead of 
`org.apache.sling`.
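
For reviewers unfamiliar with the jackson API: the refactoring builds the plan 
JSON with jackson's tree model instead of sling's JSON writer. A rough sketch 
of that style, not the actual refactored code:

{code:java}
// Rough illustration of the jackson tree-model style; not the refactored code.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class JacksonStyleExample {

    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode plan = mapper.createObjectNode();
        ArrayNode nodes = plan.putArray("nodes");

        ObjectNode source = nodes.addObject();   // one node per operator
        source.put("id", 1);
        source.put("type", "Source: Custom Source");

        System.out.println(plan.toString());     // {"nodes":[{"id":1,...}]}
    }
}
{code}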

## Brief changelog

* refactor JSONGenerator
* remove the org.apache.sling dependency from flink-streaming-java

## Verifying this change

This change is already covered by existing tests, such as 
`JsonGeneratorTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8729

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5554


commit bbbc3aaced575f7e08141794e756fa25723d0f82
Author: zentol 
Date:   2018-02-21T14:30:16Z

[FLINK-8729][streaming] Refactor JSONGenerator to use jackson




---


[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371882#comment-16371882
 ] 

chris snow commented on FLINK-8543:
---

So all files being accompanied by .valid-length is expected behaviour when 
saving to S3?

If so, unless client applications can understand and use the .valid-length 
file (which I don’t think will be the case), I don’t think this functionality 
makes sense with S3?
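
A client can honour the marker with little code: read the recorded length and 
cap consumption of the part file at that many bytes. A hedged sketch, assuming 
Hadoop's FileSystem plus commons-io (the helper name is made up):

{code:java}
// Hypothetical helper: cap reads of a part file at the recorded valid length.
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class ValidLengthReader {

    private ValidLengthReader() {}

    /** Opens 'file', bounded by the length in 'file.valid-length' when present. */
    public static InputStream open(FileSystem fs, Path file) throws IOException {
        Path marker = new Path(file.getParent(), file.getName() + ".valid-length");
        InputStream in = fs.open(file);
        if (fs.exists(marker)) {
            long validLength = Long.parseLong(
                    IOUtils.toString(fs.open(marker), StandardCharsets.UTF_8).trim());
            return new BoundedInputStream(in, validLength);
        }
        return in;
    }
}
{code}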

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> 

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371976#comment-16371976
 ] 

Steve Loughran commented on FLINK-8543:
---

bq. So all files being accompanied by .valid-length is expected behaviour when 
saving to S3?

That's nothing to do with the s3a connector, so ask the Flink team.

One thing to always remember is: Object Stores are not Filesystems.

Yes, they appear to support the same API, have that same metaphor of 
directories and files, but they are different, and if you try too hard, you 
will discover that things you expect aren't there. This may be one of them.

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
>   

[GitHub] flink issue #5544: [FLINK-8645][configuration] Split classloader.parent-firs...

2018-02-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5544
  
Looks good to me.

Would be nice to have another opinion on the config key names. After all, I 
just suggested my personal opinion...


---


[jira] [Commented] (FLINK-8735) Add savepoint migration ITCase that covers operator state

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371866#comment-16371866
 ] 

ASF GitHub Bot commented on FLINK-8735:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169741935
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+

[jira] [Commented] (FLINK-8735) Add savepoint migration ITCase that covers operator state

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371843#comment-16371843
 ] 

ASF GitHub Bot commented on FLINK-8735:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169734512
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+

[jira] [Created] (FLINK-8738) Converge runtime dependency versions for 'scala-lang' and for 'com.typesafe:config'

2018-02-21 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8738:
---

 Summary: Converge runtime dependency versions for 'scala-lang' and 
for 'com.typesafe:config'
 Key: FLINK-8738
 URL: https://issues.apache.org/jira/browse/FLINK-8738
 Project: Flink
  Issue Type: Bug
  Components: Build System
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.5.0


These dependencies are currently diverged:

{code}
Dependency convergence error for com.typesafe:config:1.3.0 paths to dependency 
are:
+-com.daplatform.flink:txn-api:1.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
+-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
  +-com.typesafe.akka:akka-actor_2.11:2.4.20
+-com.typesafe:config:1.3.0
and
+-com.daplatform.flink:txn-api:1.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
+-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
  +-com.typesafe.akka:akka-stream_2.11:2.4.20
+-com.typesafe:ssl-config-core_2.11:0.2.1
  +-com.typesafe:config:1.2.0
{code}

and

{code}
Dependency convergence error for org.scala-lang:scala-library:2.11.12 paths to 
dependency are:
+-com.daplatform.flink:txn-api:1.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
+-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
  +-org.scala-lang:scala-library:2.11.12
and
+-com.daplatform.flink:txn-api:1.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
+-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
  +-com.typesafe.akka:akka-actor_2.11:2.4.20
+-org.scala-lang:scala-library:2.11.11
and
+-com.daplatform.flink:txn-api:1.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
+-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
  +-com.typesafe.akka:akka-actor_2.11:2.4.20
+-org.scala-lang.modules:scala-java8-compat_2.11:0.7.0
  +-org.scala-lang:scala-library:2.11.7
and
+-com.daplatform.flink:txn-api:1.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
+-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
  +-com.typesafe.akka:akka-stream_2.11:2.4.20
+-org.scala-lang:scala-library:2.11.11
and
+-com.daplatform.flink:txn-api:1.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
+-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
  +-com.typesafe.akka:akka-stream_2.11:2.4.20
+-com.typesafe:ssl-config-core_2.11:0.2.1
  +-org.scala-lang:scala-library:2.11.8
and
+-com.daplatform.flink:txn-api:1.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
+-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
  +-com.typesafe.akka:akka-stream_2.11:2.4.20
+-com.typesafe:ssl-config-core_2.11:0.2.1
  +-org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4
+-org.scala-lang:scala-library:2.11.6
and
+-com.daplatform.flink:txn-api:1.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
+-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
  +-com.typesafe.akka:akka-protobuf_2.11:2.4.20
+-org.scala-lang:scala-library:2.11.11
and
+-com.daplatform.flink:txn-api:1.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
+-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
  +-com.typesafe.akka:akka-slf4j_2.11:2.4.20
+-org.scala-lang:scala-library:2.11.11
and
+-com.daplatform.flink:txn-api:1.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
+-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
  +-org.clapper:grizzled-slf4j_2.11:1.0.2
+-org.scala-lang:scala-library:2.11.0
and
+-com.daplatform.flink:txn-api:1.0-SNAPSHOT
  +-org.apache.flink:flink-streaming-java_2.11:1.5-SNAPSHOT
+-org.apache.flink:flink-runtime_2.11:1.5-SNAPSHOT
  +-com.twitter:chill_2.11:0.7.4
+-org.scala-lang:scala-library:2.11.7
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5552: [FLINK-8735] Add new StatefulJobSavepointMigrationITCase

2018-02-21 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5552
  
The sister PR for `release-1.4` is #5553.


---


[jira] [Commented] (FLINK-8645) Support convenient extension of parent-first ClassLoader patterns

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371807#comment-16371807
 ] 

ASF GitHub Bot commented on FLINK-8645:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5544
  
Looks good to me.

Would be nice to have another opinion on the config key names. After all, I 
just suggested my personal opinion...


> Support convenient extension of parent-first ClassLoader patterns
> -
>
> Key: FLINK-8645
> URL: https://issues.apache.org/jira/browse/FLINK-8645
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>
> The option {{classloader.parent-first-patterns}} defines a list of class 
> patterns that should always be loaded through the parent class-loader. The 
> default value contains all classes that are effectively required to be loaded 
> that way for Flink to function.
> This list cannot be extended in a convenient way, as one would have to 
> manually copy the existing default and append new entries. This makes the 
> configuration brittle in light of version upgrades where we may extend the 
> default, and also obfuscates the configuration a bit.
> I propose to separate this option into 
> {{classloader.parent-first-patterns.base}}, which subsumes the existing 
> option, and {{classloader.parent-first-patterns.append}} which is 
> automatically appended to the base.
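
For illustration, a flink-conf.yaml sketch of the proposed split (the key names follow this proposal; the base pattern list shown here is illustrative, not the actual default):
{code}
# subsumes the existing classloader.parent-first-patterns default (values illustrative)
classloader.parent-first-patterns.base: java.;scala.;org.apache.flink.;org.slf4j

# user entries, automatically appended to the base list
classloader.parent-first-patterns.append: org.example.shared.
{code}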



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8735) Add savepoint migration ITCase that covers operator state

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371796#comment-16371796
 ] 

ASF GitHub Bot commented on FLINK-8735:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5553

 [FLINK-8735] Add new StatefulJobSavepointMigrationITCase (release-1.4)

Sister PR to #5552 for the release-1.4 branch.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-8735-new-savepoint-migration-test-release-14

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5553.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5553


commit 82e6f8d5b8623f97f854ae9936e6754dc09ef5af
Author: Aljoscha Krettek 
Date:   2018-02-21T17:08:13Z

[FLINK-8735] Rename StatefulJobSavepointMigrationITCase

This is preparation for modifying a new ITCase to use modern state
features.

commit bc848e43f8f6c041161d787da1c3131e4365b4c6
Author: Aljoscha Krettek 
Date:   2018-02-21T17:10:55Z

[FLINK-8735] Add new StatefulJobSavepointMigrationITCase

This new test does not pretend to use legacy state but now instead uses
the more modern operator state varieties.




> Add savepoint migration ITCase that covers operator state
> -
>
> Key: FLINK-8735
> URL: https://issues.apache.org/jira/browse/FLINK-8735
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0, 1.4.2
>
>
> The current {{StatefulJobSavepointMigrationITCase}} does not cover operator 
> state, meaning state accessed using {{OperatorStateStore}}.
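
For context, a minimal sketch of state that goes through the {{OperatorStateStore}}, i.e. the kind of state the new ITCase should cover (illustrative code, not the test's actual implementation):
{code:java}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

public class CountingFlatMap extends RichFlatMapFunction<String, String>
        implements CheckpointedFunction {

    private transient ListState<Long> countState; // operator (non-keyed) state
    private long count;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // state registered via the OperatorStateStore, not the keyed state backend
        countState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));
        for (Long c : countState.get()) {
            count += c; // on restore, sum the redistributed partitions
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        countState.clear();
        countState.add(count);
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        count++;
        out.collect(value);
    }
}
{code}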



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8735) Add savepoint migration ITCase that covers operator state

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371869#comment-16371869
 ] 

ASF GitHub Bot commented on FLINK-8735:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169743891
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371855#comment-16371855
 ] 

chris snow commented on FLINK-8543:
---

Does Flink running on EMR using S3 have the same issue?  If not, what AWS S3 
API calls and filesystem implementation are used?

 

IBM COS S3 supports a subset of the most common AWS S3 API operations 
(https://ibm-public-cos.github.io/crs-docs/api-reference#copy-an-object).

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> 

[jira] [Updated] (FLINK-8719) add README for flink-contrib to clarify its purpose

2018-02-21 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8719:

Summary: add README for flink-contrib to clarify its purpose  (was: add 
module description for flink-contrib to clarify its purpose)

> add README for flink-contrib to clarify its purpose
> ---
>
> Key: FLINK-8719
> URL: https://issues.apache.org/jira/browse/FLINK-8719
> Project: Flink
>  Issue Type: Improvement
>  Components: flink-contrib
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.5.0
>
>
> {{flink-contrib}} currently doesn't have any clarification or description of 
> its purpose, which confuses lots of developers. Adding a README with a module 
> description would clarify this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5552: [FLINK-8735] Add new StatefulJobSavepointMigration...

2018-02-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169743891
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+* TODO to generate savepoints for a specific Flink version / backend 
type,
+* TODO change these values accordingly, e.g. to generate for 1.4 with 
RocksDB,
+* TODO set as (MigrationVersion.v1_4, 

[GitHub] flink pull request #5552: [FLINK-8735] Add new StatefulJobSavepointMigration...

2018-02-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169741385
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+* TODO to generate savepoints for a specific Flink version / backend 
type,
+* TODO change these values accordingly, e.g. to generate for 1.4 with 
RocksDB,
+* TODO set as (MigrationVersion.v1_4, 

[jira] [Comment Edited] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371882#comment-16371882
 ] 

chris snow edited comment on FLINK-8543 at 2/21/18 7:42 PM:


So all files being accompanied by .valid-length is expected behavior when 
saving to S3?

If so, unless client applications can understand and use the .valid-length 
(which I don’t think will be the case), I don’t think this functionality makes 
sense with S3?  I.e. am I trying to do something with Flink that it wasn’t 
designed to do?


was (Author: snowch):
So all files being accompanied by .valid-length is expected behavior when 
saving to S3?

If so, unless client applications can understand and use the .valid-length 
(which I don’t think will be the case), I don’t think this functionality makes 
sense with S3?
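
If a downstream consumer has to deal with the marker anyway, a rough sketch of trimming a local copy of a bucket file to its recorded valid length (the helper is hypothetical, and it assumes the marker stores the length as a decimal string, which should be verified against the Flink version in use):
{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ValidLengthTrimmer {
    /** Truncates dataFile to the length recorded in its ".valid-length" companion, if present. */
    static void trimToValidLength(Path dataFile) throws IOException {
        Path marker = dataFile.resolveSibling(dataFile.getFileName() + ".valid-length");
        if (!Files.exists(marker)) {
            return; // no marker: the whole file is valid
        }
        long validLength = Long.parseLong(
                new String(Files.readAllBytes(marker), StandardCharsets.UTF_8).trim());
        try (FileChannel channel = FileChannel.open(dataFile, StandardOpenOption.WRITE)) {
            channel.truncate(validLength); // drop bytes past the last checkpointed offset
        }
    }
}
{code}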

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException 

[jira] [Commented] (FLINK-8729) Migrate JSONGenerator from Sling to Jackson

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371962#comment-16371962
 ] 

ASF GitHub Bot commented on FLINK-8729:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5554

[FLINK-8729][streaming] Refactor JSONGenerator to use jackson

## What is the purpose of the change

This PR ports the `JSONGenerator` to rely on jackson instead of 
`org.apache.sling`.

## Brief changelog

* refactor JSONGenerator
* remove org.apache.sling dependency from flink-streaming-java

## Verifying this change

This change is already covered by existing tests, such as 
`JsonGeneratorTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8729

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5554


commit bbbc3aaced575f7e08141794e756fa25723d0f82
Author: zentol 
Date:   2018-02-21T14:30:16Z

[FLINK-8729][streaming] Refactor JSONGenerator to use jackson




> Migrate JSONGenerator from Sling to Jackson
> ---
>
> Key: FLINK-8729
> URL: https://issues.apache.org/jira/browse/FLINK-8729
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Streaming
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
>Priority: Minor
>  Labels: beginner, easy-fix, starter
>
> The {{org.apache.flink.streaming.api.graph.JSONGenerator}} uses Sling for 
> JSON encoding, adding an extra dependency. All other Flink parts use a 
> specially shaded Jackson dependency.
> Migrating the JSONGenerator would allow us to drop a dependency and make the 
> code more homogeneous.
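
For illustration, a minimal sketch of building plan JSON with Jackson instead of Sling (assuming Flink's shaded Jackson package prefix, {{org.apache.flink.shaded.jackson2}}; the class and field names here are illustrative):
{code:java}
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

public class PlanJsonSketch {
    public static String toJson() throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode json = mapper.createObjectNode(); // plan root
        ArrayNode nodes = json.putArray("nodes");    // one entry per operator
        ObjectNode node = nodes.addObject();
        node.put("id", 1);
        node.put("type", "Source: Custom Source");
        node.put("parallelism", 4);
        return mapper.writeValueAsString(json);
    }
}
{code}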



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8729) Migrate JSONGenerator from Sling to Jackson

2018-02-21 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8729:

Component/s: Build System

> Migrate JSONGenerator from Sling to Jackson
> ---
>
> Key: FLINK-8729
> URL: https://issues.apache.org/jira/browse/FLINK-8729
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Streaming
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
>Priority: Minor
>  Labels: beginner, easy-fix, starter
>
> The {{org.apache.flink.streaming.api.graph.JSONGenerator}} uses Sling for 
> JSON encoding, adding an extra dependency. All other Flink parts use a 
> specially shaded Jackson dependency.
> Migrating the JSONGenerator would allow us to drop a dependency and make the 
> code more homogeneous.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5552: [FLINK-8735] Add new StatefulJobSavepointMigration...

2018-02-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169745602
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class LegacyStatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_2, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_2, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_3, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_3, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+* TODO to generate savepoints for a specific Flink version / backend 
type,
+* TODO change these values accordingly, e.g. to generate for 1.3 with 
RocksDB,
+* TODO set as 

[jira] [Commented] (FLINK-8735) Add savepoint migration ITCase that covers operator state

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371876#comment-16371876
 ] 

ASF GitHub Bot commented on FLINK-8735:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169745602
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class LegacyStatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_2, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_2, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_3, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_3, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 

[GitHub] flink pull request #5552: [FLINK-8735] Add new StatefulJobSavepointMigration...

2018-02-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169740338
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+* TODO to generate savepoints for a specific Flink version / backend 
type,
+* TODO change these values accordingly, e.g. to generate for 1.4 with 
RocksDB,
+* TODO set as (MigrationVersion.v1_4, 

[jira] [Commented] (FLINK-8735) Add savepoint migration ITCase that covers operator state

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371868#comment-16371868
 ] 

ASF GitHub Bot commented on FLINK-8735:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169742676
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+

[jira] [Commented] (FLINK-8735) Add savepoint migration ITCase that covers operator state

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371865#comment-16371865
 ] 

ASF GitHub Bot commented on FLINK-8735:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169740338
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+

[GitHub] flink pull request #5552: [FLINK-8735] Add new StatefulJobSavepointMigration...

2018-02-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5552#discussion_r169738273
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
 ---
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to 
cover
+ * migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class LegacyStatefulJobSavepointMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_2, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_2, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_3, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_3, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_4, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   /**
+* TODO to generate savepoints for a specific Flink version / backend 
type,
+* TODO change these values accordingly, e.g. to generate for 1.3 with 
RocksDB,
--- End diff --


[jira] [Commented] (FLINK-8735) Add savepoint migration ITCase that covers operator state

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371797#comment-16371797
 ] 

ASF GitHub Bot commented on FLINK-8735:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5552
  
The sister PR for `release-1.4` is #5553.


> Add savepoint migration ITCase that covers operator state
> -
>
> Key: FLINK-8735
> URL: https://issues.apache.org/jira/browse/FLINK-8735
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0, 1.4.2
>
>
> The current {{StatefulJobSavepointMigrationITCase}} does not cover operator 
> state, meaning state accessed using {{OperatorStateStore}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5553: [FLINK-8735] Add new StatefulJobSavepointMigration...

2018-02-21 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5553

 [FLINK-8735] Add new StatefulJobSavepointMigrationITCase (release-1.4)

Sister PR to #5552 for the release-1.4 branch.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-8735-new-savepoint-migration-test-release-14

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5553.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5553


commit 82e6f8d5b8623f97f854ae9936e6754dc09ef5af
Author: Aljoscha Krettek 
Date:   2018-02-21T17:08:13Z

[FLINK-8735] Rename StatefulJobSavepointMigrationITCase

This is preparation for modifying a new ITCase to use modern state
features.

commit bc848e43f8f6c041161d787da1c3131e4365b4c6
Author: Aljoscha Krettek 
Date:   2018-02-21T17:10:55Z

[FLINK-8735] Add new StatefulJobSavepointMigrationITCase

This new test does not pretend to use legacy state but now instead uses
the more modern operator state varieties.




---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371780#comment-16371780
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169727315
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some methods that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
--- End diff --

Do we need `FileSystem` if we have `directory` which is of type `Path`?
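
A minimal sketch of what that suggestion could look like, trimmed to two of 
the quoted methods: the `FileSystem` field is dropped and the file system is 
resolved from the path on demand (illustrative only, not the PR's actual 
follow-up):

{code:java}
import org.apache.flink.core.fs.Path;

import javax.annotation.Nonnull;

import java.io.IOException;

public class SnapshotDirectory {

    @Nonnull
    private final Path directory;

    public SnapshotDirectory(@Nonnull Path directory) {
        this.directory = directory;
    }

    public boolean mkdirs() throws IOException {
        // Path#getFileSystem() resolves the file system on demand,
        // so no FileSystem field is required.
        return directory.getFileSystem().mkdirs(directory);
    }

    public boolean exists() throws IOException {
        return directory.getFileSystem().exists(directory);
    }
}
{code}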


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371785#comment-16371785
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169726050
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * This class is a keyed state handle based on a directory. It combines a 
{@link DirectoryStateHandle} and a
+ * {@link KeyGroupRange}.
+ */
+public class DirectoryKeyedStateHandle implements KeyedStateHandle {
+
+   @Nonnull
+   private final DirectoryStateHandle directoryStateHandle;
+
+   @Nonnull
+   private final KeyGroupRange keyGroupRange;
+
+   public DirectoryKeyedStateHandle(
+   @Nonnull DirectoryStateHandle directoryStateHandle,
+   @Nonnull KeyGroupRange keyGroupRange) {
+
+   this.directoryStateHandle = directoryStateHandle;
+   this.keyGroupRange = keyGroupRange;
+   }
+
+   @Nonnull
+   public DirectoryStateHandle getDirectoryStateHandle() {
+   return directoryStateHandle;
+   }
+
+   @Nonnull
+   @Override
+   public KeyGroupRange getKeyGroupRange() {
+   return keyGroupRange;
+   }
+
+   @Override
+   public void discardState() throws Exception {
+   directoryStateHandle.discardState();
+   }
+
+   @Override
+   public long getStateSize() {
+   return directoryStateHandle.getStateSize();
+   }
+
+   @Override
+   public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+   return 
keyGroupRange.getIntersection(keyGroupRange).getNumberOfKeyGroups() > 0 ? this 
: null;
--- End diff --

This looks like a bug. `keyGroupRange.getIntersection(keyGroupRange)` 
should be `keyGroupRange`. I think in general it is a good idea not to shadow 
fields with function parameters.
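
A sketch of the fix the comment implies, with the parameter renamed so it no 
longer shadows the field (the name `otherKeyGroupRange` is made up here):

{code:java}
@Override
public KeyedStateHandle getIntersection(KeyGroupRange otherKeyGroupRange) {
    // Intersect the handle's own range (the field) with the argument.
    return this.keyGroupRange.getIntersection(otherKeyGroupRange).getNumberOfKeyGroups() > 0
        ? this
        : null;
}
{code}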


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5545: [FLINK-8718][DataStream] Set maxParallelism on non-parall...

2018-02-21 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5545
  
I'm not sure we can do this change since it breaks compatibility for 
savepoints because previous versions of Flink had some `maxParallelism` setting 
for sources, which would be incompatible with the new `maxParallelism` of 1. 
Thoughts?


---


[jira] [Commented] (FLINK-8718) Non-parallel DataStreamSource does not set max parallelism

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371789#comment-16371789
 ] 

ASF GitHub Bot commented on FLINK-8718:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5545
  
I'm not sure we can do this change since it breaks compatibility for 
savepoints because previous versions of Flink had some `maxParallelism` setting 
for sources, which would be incompatible with the new `maxParallelism` of 1. 
Thoughts?


> Non-parallel DataStreamSource does not set max parallelism
> --
>
> Key: FLINK-8718
> URL: https://issues.apache.org/jira/browse/FLINK-8718
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Streaming
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Major
> Fix For: 1.5.0
>
>
> {{org.apache.flink.streaming.api.datastream.DataStreamSource}} does not set 
> {{maxParallelism}} to 1 if it is non-parallel.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371786#comment-16371786
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169726507
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * This state handle represents a directory. This class is, for example, 
used to represent the directory of RocksDB's
+ * native checkpoint directories for local recovery.
+ */
+public class DirectoryStateHandle implements StateObject {
+
+   /** Serial version. */
+   private static final long serialVersionUID = 1L;
+
+   /** The path that describes the directory. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the directory described by path. */
+   @Nonnull
+   private final FileSystem fileSystem;
--- End diff --

`FileSystem` is not serializable and should be removed from the 
`DirectoryStateHandle`. `directory` should give you all you need.
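
A hypothetical shape of the handle after that change: only the serializable 
`Path` is stored, and the file system is re-derived where it is needed (the 
`getStateSize()` body is a placeholder, not taken from the PR):

{code:java}
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import javax.annotation.Nonnull;

import java.io.IOException;

public class DirectoryStateHandle implements StateObject {

    private static final long serialVersionUID = 1L;

    /** Path is serializable, so it can safely travel with the handle. */
    @Nonnull
    private final Path directory;

    public DirectoryStateHandle(@Nonnull Path directory) {
        this.directory = directory;
    }

    @Override
    public void discardState() throws IOException {
        // Look up the file system only at discard time, on the machine that runs it.
        FileSystem fileSystem = directory.getFileSystem();
        fileSystem.delete(directory, true);
    }

    @Override
    public long getStateSize() {
        return 0L; // placeholder: directory size is not tracked in this sketch
    }
}
{code}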


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371784#comment-16371784
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169723987
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1552,10 +1556,33 @@ public IncrementalSnapshotStrategy() {
return DoneFuture.nullValue();
}
 
+   SnapshotDirectory snapshotDirectory;
--- End diff --

Could be made `final`


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371783#comment-16371783
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169727620
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some methods that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
+
+   /** This reference tracks the lifecycle state of the snapshot 
directory. */
+   @Nonnull
+   private AtomicReference<State> state;
+
+   public SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem 
fileSystem) {
+   this.directory = directory;
+   this.fileSystem = fileSystem;
+   this.state = new AtomicReference<>(State.ONGOING);
+   }
+
+   public SnapshotDirectory(@Nonnull Path directory) throws IOException {
+   this(directory, directory.getFileSystem());
+   }
+
+   @Nonnull
+   public Path getDirectory() {
+   return directory;
+   }
+
+   public boolean mkdirs() throws IOException {
+   return fileSystem.mkdirs(directory);
+   }
+
+   @Nonnull
+   public FileSystem getFileSystem() {
+   return fileSystem;
+   }
+
+   public boolean exists() throws IOException {
+   return fileSystem.exists(directory);
+   }
+
+   /**
+* List the statuses of the files/directories in the snapshot directory.
+*
+* @return the statuses of the files/directories in the given path.
+* @throws IOException if there is a problem creating the file statuses.
+*/
+   public FileStatus[] listStatus() throws IOException {
+   return fileSystem.listStatus(directory);
+   }
+
+   /**
+* Calling this method completes the snapshot into the snapshot 
directory and creates a corresponding
+* {@link DirectoryStateHandle} that points to the snapshot directory. 
Calling this method will also change the
+* lifecycle state from "ongoing" to "completed". If the state was 
already deleted, an {@link IOException} is
+* thrown.
+*
+* @return a directory state handle that points to the snapshot 
directory.
+* @throws IOException if the state of this snapshot directory object 
is different from "ongoing".
+*/
+   public DirectoryStateHandle completeSnapshotAndGetHandle() throws 
IOException {
+   if 

[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371787#comment-16371787
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169728016
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some methods that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
+
+   /** This reference tracks the lifecycle state of the snapshot 
directory. */
+   @Nonnull
+   private AtomicReference<State> state;
+
+   public SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem 
fileSystem) {
+   this.directory = directory;
+   this.fileSystem = fileSystem;
+   this.state = new AtomicReference<>(State.ONGOING);
+   }
+
+   public SnapshotDirectory(@Nonnull Path directory) throws IOException {
+   this(directory, directory.getFileSystem());
+   }
+
+   @Nonnull
+   public Path getDirectory() {
+   return directory;
+   }
+
+   public boolean mkdirs() throws IOException {
+   return fileSystem.mkdirs(directory);
+   }
+
+   @Nonnull
+   public FileSystem getFileSystem() {
+   return fileSystem;
+   }
+
+   public boolean exists() throws IOException {
+   return fileSystem.exists(directory);
+   }
+
+   /**
+* List the statuses of the files/directories in the snapshot directory.
+*
+* @return the statuses of the files/directories in the given path.
+* @throws IOException if there is a problem creating the file statuses.
+*/
+   public FileStatus[] listStatus() throws IOException {
+   return fileSystem.listStatus(directory);
+   }
+
+   /**
+* Calling this method completes the snapshot into the snapshot 
directory and creates a corresponding
+* {@link DirectoryStateHandle} that points to the snapshot directory. 
Calling this method will also change the
+* lifecycle state from "ongoing" to "completed". If the state was 
already deleted, an {@link IOException} is
+* thrown.
+*
+* @return a directory state handle that points to the snapshot 
directory.
+* @throws IOException if the state of this snapshot directory object 
is different from "ongoing".
+*/
+   public DirectoryStateHandle completeSnapshotAndGetHandle() throws 
IOException {
+   if 

[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371781#comment-16371781
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169724637
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -2123,9 +2162,21 @@ void takeSnapshot() throws Exception {
checkpointId,
sstFiles,
miscFiles,
-   metaStateHandle);
+   metaStateHandle.getJobManagerOwnedSnapshot());
+
+   DirectoryStateHandle directoryStateHandle = 
localBackupDirectory.completeSnapshotAndGetHandle();
+   StreamStateHandle taskLocalSnapshotMetaDataStateHandle 
= metaStateHandle.getTaskLocalSnapshot();
+   IncrementalLocalKeyedStateHandle 
directoryKeyedStateHandle =
+   directoryStateHandle != null && 
taskLocalSnapshotMetaDataStateHandle != null ?
+   new IncrementalLocalKeyedStateHandle(
+   stateBackend.backendUID,
+   checkpointId,
+   directoryStateHandle,
+   stateBackend.keyGroupRange,
+   
taskLocalSnapshotMetaDataStateHandle) :
+   null;
 
-   return new 
SnapshotResult<>(incrementalKeyedStateHandle, null);
+   return new 
SnapshotResult<>(incrementalKeyedStateHandle, directoryKeyedStateHandle);
--- End diff --

I guess you changed the creation of the `SnapshotResult` in a fixup commit 
in your local branch, right? Otherwise we might refactor this in order to get 
rid of the many `nulls` in the lines above.
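
One way such a refactoring could look: static factories on `SnapshotResult` 
that hide the explicit nulls (the factory names are invented for this sketch, 
and it assumes `SnapshotResult` stays generic in the handle type):

{code:java}
public static <T extends StateObject> SnapshotResult<T> of(@Nullable T jobManagerState) {
    // Only a job-manager-owned handle, no task-local copy.
    return new SnapshotResult<>(jobManagerState, null);
}

public static <T extends StateObject> SnapshotResult<T> withLocalState(
        @Nonnull T jobManagerState,
        @Nonnull T localState) {
    return new SnapshotResult<>(jobManagerState, localState);
}
{code}

Callers then pick the factory that matches instead of spelling out `null`.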


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371782#comment-16371782
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169725430
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
 ---
@@ -172,4 +167,30 @@ public CheckpointStreamWithResultProvider create(
return new 
CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
}
+
+   /**
+* Helper method that takes a {@link SnapshotResult} 
and a {@link KeyGroupRangeOffsets} and
+* creates a {@link SnapshotResult} by combining 
the key groups offsets with all the
+* present stream state handles.
+*/
+   static SnapshotResult<KeyedStateHandle> toKeyedStateHandleSnapshotResult(
+   @Nullable SnapshotResult<StreamStateHandle> snapshotResult,
--- End diff --

I think this parameter should not be `Nullable`. If it is null, then we 
don't have to call this method.
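
A hypothetical caller once the parameter is `@Nonnull`: the null check moves 
to the call site (the variable names and the `empty()` factory are assumptions 
of this sketch, not necessarily part of the PR):

{code:java}
SnapshotResult<KeyedStateHandle> keyedResult =
    streamSnapshotResult == null
        ? SnapshotResult.empty()
        : toKeyedStateHandleSnapshotResult(streamSnapshotResult, keyGroupRangeOffsets);
{code}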


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169728016
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some methods that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
+
+   /** This reference tracks the lifecycle state of the snapshot 
directory. */
+   @Nonnull
+   private AtomicReference<State> state;
+
+   public SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem 
fileSystem) {
+   this.directory = directory;
+   this.fileSystem = fileSystem;
+   this.state = new AtomicReference<>(State.ONGOING);
+   }
+
+   public SnapshotDirectory(@Nonnull Path directory) throws IOException {
+   this(directory, directory.getFileSystem());
+   }
+
+   @Nonnull
+   public Path getDirectory() {
+   return directory;
+   }
+
+   public boolean mkdirs() throws IOException {
+   return fileSystem.mkdirs(directory);
+   }
+
+   @Nonnull
+   public FileSystem getFileSystem() {
+   return fileSystem;
+   }
+
+   public boolean exists() throws IOException {
+   return fileSystem.exists(directory);
+   }
+
+   /**
+* List the statuses of the files/directories in the snapshot directory.
+*
+* @return the statuses of the files/directories in the given path.
+* @throws IOException if there is a problem creating the file statuses.
+*/
+   public FileStatus[] listStatus() throws IOException {
+   return fileSystem.listStatus(directory);
+   }
+
+   /**
+* Calling this method completes the snapshot into the snapshot 
directory and creates a corresponding
+* {@link DirectoryStateHandle} that points to the snapshot directory. 
Calling this method will also change the
+* lifecycle state from "ongoing" to "completed". If the state was 
already deleted, an {@link IOException} is
+* thrown.
+*
+* @return a directory state handle that points to the snapshot 
directory.
+* @throws IOException if the state of this snapshot directory object 
is different from "ongoing".
+*/
+   public DirectoryStateHandle completeSnapshotAndGetHandle() throws 
IOException {
+   if (state.compareAndSet(State.ONGOING, State.COMPLETED)) {
+   return new DirectoryStateHandle(directory, fileSystem);
+   } else {
+   throw new IOException("Expected state " + State.ONGOING 
+ " but found state " + 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169726507
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * This state handle represents a directory. This class is, for example, 
used to represent the directory of RocksDB's
+ * native checkpoint directories for local recovery.
+ */
+public class DirectoryStateHandle implements StateObject {
+
+   /** Serial version. */
+   private static final long serialVersionUID = 1L;
+
+   /** The path that describes the directory. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the directory described by path. */
+   @Nonnull
+   private final FileSystem fileSystem;
--- End diff --

`FileSystem` is not serializable and should be removed from the 
`DirectoryStateHandle`. `directory` should give you all you need.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169727620
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some methods that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
+
+   /** This reference tracks the lifecycle state of the snapshot 
directory. */
+   @Nonnull
+   private AtomicReference<State> state;
+
+   public SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem 
fileSystem) {
+   this.directory = directory;
+   this.fileSystem = fileSystem;
+   this.state = new AtomicReference<>(State.ONGOING);
+   }
+
+   public SnapshotDirectory(@Nonnull Path directory) throws IOException {
+   this(directory, directory.getFileSystem());
+   }
+
+   @Nonnull
+   public Path getDirectory() {
+   return directory;
+   }
+
+   public boolean mkdirs() throws IOException {
+   return fileSystem.mkdirs(directory);
+   }
+
+   @Nonnull
+   public FileSystem getFileSystem() {
+   return fileSystem;
+   }
+
+   public boolean exists() throws IOException {
+   return fileSystem.exists(directory);
+   }
+
+   /**
+* List the statuses of the files/directories in the snapshot directory.
+*
+* @return the statuses of the files/directories in the given path.
+* @throws IOException if there is a problem creating the file statuses.
+*/
+   public FileStatus[] listStatus() throws IOException {
+   return fileSystem.listStatus(directory);
+   }
+
+   /**
+* Calling this method completes the snapshot into the snapshot 
directory and creates a corresponding
+* {@link DirectoryStateHandle} that points to the snapshot directory. 
Calling this method will also change the
+* lifecycle state from "ongoing" to "completed". If the state was 
already deleted, an {@link IOException} is
+* thrown.
+*
+* @return a directory state handle that points to the snapshot 
directory.
+* @throws IOException if the state of this snapshot directory object 
is different from "ongoing".
+*/
+   public DirectoryStateHandle completeSnapshotAndGetHandle() throws 
IOException {
+   if (state.compareAndSet(State.ONGOING, State.COMPLETED)) {
+   return new DirectoryStateHandle(directory, fileSystem);
+   } else {
+   throw new IOException("Expected state " + State.ONGOING 
+ " but found state " + 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169723987
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1552,10 +1556,33 @@ public IncrementalSnapshotStrategy() {
return DoneFuture.nullValue();
}
 
+   SnapshotDirectory snapshotDirectory;
--- End diff --

Could be made `final`


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169725430
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
 ---
@@ -172,4 +167,30 @@ public CheckpointStreamWithResultProvider create(
return new 
CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
}
+
+   /**
+* Helper method that takes a {@link SnapshotResult} 
and a {@link KeyGroupRangeOffsets} and
+* creates a {@link SnapshotResult} by combining 
the key groups offsets with all the
+* present stream state handles.
+*/
+   static SnapshotResult<KeyedStateHandle> toKeyedStateHandleSnapshotResult(
+   @Nullable SnapshotResult<StreamStateHandle> snapshotResult,
--- End diff --

I think this parameter should not be `Nullable`. If it is null, then we 
don't have to call this method.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169727315
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state 
snapshot. This class provides some methods that
+ * simplify resource management when dealing with such directories, e.g. 
it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the 
created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete 
the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+   /**
+* Lifecycle stages of a snapshot directory.
+*/
+   enum State {
+   ONGOING, COMPLETED, DELETED
+   }
+
+   /** This path describes the underlying directory for the snapshot. */
+   @Nonnull
+   private final Path directory;
+
+   /** The filesystem that contains the snapshot directory. */
+   @Nonnull
+   private final FileSystem fileSystem;
--- End diff --

Do we need `FileSystem` if we have `directory` which is of type `Path`?


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169724637
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -2123,9 +2162,21 @@ void takeSnapshot() throws Exception {
checkpointId,
sstFiles,
miscFiles,
-   metaStateHandle);
+   metaStateHandle.getJobManagerOwnedSnapshot());
+
+   DirectoryStateHandle directoryStateHandle = 
localBackupDirectory.completeSnapshotAndGetHandle();
+   StreamStateHandle taskLocalSnapshotMetaDataStateHandle 
= metaStateHandle.getTaskLocalSnapshot();
+   IncrementalLocalKeyedStateHandle 
directoryKeyedStateHandle =
+   directoryStateHandle != null && 
taskLocalSnapshotMetaDataStateHandle != null ?
+   new IncrementalLocalKeyedStateHandle(
+   stateBackend.backendUID,
+   checkpointId,
+   directoryStateHandle,
+   stateBackend.keyGroupRange,
+   
taskLocalSnapshotMetaDataStateHandle) :
+   null;
 
-   return new 
SnapshotResult<>(incrementalKeyedStateHandle, null);
+   return new 
SnapshotResult<>(incrementalKeyedStateHandle, directoryKeyedStateHandle);
--- End diff --

I guess you changed the creation of the `SnapshotResult` in a fixup commit 
in your local branch, right? Otherwise we might refactor this in order to get 
rid of the many `nulls` in the lines above.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169726050
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * This class is a keyed state handle based on a directory. It combines a 
{@link DirectoryStateHandle} and a
+ * {@link KeyGroupRange}.
+ */
+public class DirectoryKeyedStateHandle implements KeyedStateHandle {
+
+   @Nonnull
+   private final DirectoryStateHandle directoryStateHandle;
+
+   @Nonnull
+   private final KeyGroupRange keyGroupRange;
+
+   public DirectoryKeyedStateHandle(
+   @Nonnull DirectoryStateHandle directoryStateHandle,
+   @Nonnull KeyGroupRange keyGroupRange) {
+
+   this.directoryStateHandle = directoryStateHandle;
+   this.keyGroupRange = keyGroupRange;
+   }
+
+   @Nonnull
+   public DirectoryStateHandle getDirectoryStateHandle() {
+   return directoryStateHandle;
+   }
+
+   @Nonnull
+   @Override
+   public KeyGroupRange getKeyGroupRange() {
+   return keyGroupRange;
+   }
+
+   @Override
+   public void discardState() throws Exception {
+   directoryStateHandle.discardState();
+   }
+
+   @Override
+   public long getStateSize() {
+   return directoryStateHandle.getStateSize();
+   }
+
+   @Override
+   public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+   return 
keyGroupRange.getIntersection(keyGroupRange).getNumberOfKeyGroups() > 0 ? this 
: null;
--- End diff --

This looks like a bug. `keyGroupRange.getIntersection(keyGroupRange)` 
should be `keyGroupRange`. I think in general it is a good idea not to shadow 
fields with function parameters.


---


[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371773#comment-16371773
 ] 

Aljoscha Krettek commented on FLINK-8543:
-

Yes, this is what I would expect since the Hadoop S3A filesystem does not 
support truncate. Do you know if IBM COS would support truncate? Could you use 
an alternative {{FileSystem}} implementation?
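
For background: `BucketingSink` discovers truncate support at runtime, since 
`FileSystem#truncate(Path, long)` only exists from Hadoop 2.7 on and object 
stores such as S3A reject it at call time. A minimal sketch of such a probe 
(class and method names are made up here):

{code:java}
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.lang.reflect.Method;

public final class TruncateProbe {

    /** Returns the truncate method if this file system really supports it, else null. */
    static Method lookupTruncate(FileSystem fs, Path probeFile) {
        Method truncate;
        try {
            truncate = fs.getClass().getMethod("truncate", Path.class, long.class);
        } catch (NoSuchMethodException e) {
            return null; // Hadoop < 2.7: no truncate at all
        }
        try {
            fs.create(probeFile, true).close(); // write an empty probe file
            truncate.invoke(fs, probeFile, 0L); // S3A-like stores fail here
            return truncate;
        } catch (ReflectiveOperationException | IOException | UnsupportedOperationException e) {
            return null; // truncate exists but is not usable on this store
        } finally {
            try {
                fs.delete(probeFile, false);
            } catch (IOException ignored) {
            }
        }
    }
}
{code}

When the probe fails, `BucketingSink` falls back to writing `.valid-length` 
marker files on restore instead of truncating in-progress files.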

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> 
